本文介绍了如何在 KafkaStream 应用程序中获取 partitionId 和 TopicName的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们如何从 KafkaStream 获取主题名称和分区 ID.对于任何其他 Kafka 消费者,我们可以获得主题名称和 partitionId,如下所示:

How do we get topic name and partition id from KafkaStream. For any other Kafka consumer we can get topic name and partitionId like following:

    ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",record.key(), record.value(), record.partition(), record.offset());}

不知道如何在 KafkaStreams 中获取记录引用.

Not sure how to get the record reference in KafkaStreams.

推荐答案

您可以通过处理器 API 中公开的 ProcessorContext 获取输入记录的元数据.您可以通过 transform() 和类似方法将处理器 API 嵌入到 DSL 中.

You can get meta data of input record via the ProcessorContext that is exposed in the Processor API. You can embed the Processor API in the DSL via transform() and similar methods.

查看文档了解详情:https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context

这篇关于如何在 KafkaStream 应用程序中获取 partitionId 和 TopicName的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-01 19:10