本文介绍了Apache Kafka Streams将KTables物化为一个主题似乎很慢的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用kafka流,并且试图将KTable变成一个主题.

I'm using kafka stream and I'm trying to materialize a KTable into a topic.

它有效,但似乎每30秒左右完成一次.

It works but it seems to be done every 30 secs or so.

Kafka Stream如何/何时决定将KTable的当前状态具体化为主题?

How/When does Kafka Stream decides to materialize the current state of a KTable into a topic ?

有什么办法可以缩短时间,使其更实时"?

Is there any way to shorten this time and to make it more "real-time" ?

这是我正在使用的实际代码

Here is the actual code I'm using

// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);

// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());

// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);

推荐答案

这由commit.interval.ms控制,默认值为30s.此处有更多详细信息: http://docs.confluent.io/current/streams/developer-guide.html

This is controlled by commit.interval.ms, which defaults to 30s. More details here:http://docs.confluent.io/current/streams/developer-guide.html

在这里:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

这篇关于Apache Kafka Streams将KTables物化为一个主题似乎很慢的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-03 01:54