Problem joining 2 Kafka streams (with a custom timestamp extractor)

Problem description

I'm having problems joining 2 Kafka streams when I extract the date from the fields of my events. The join works fine when I do not define a custom TimestampExtractor, but as soon as I do, the join stops working. My topology is quite simple:

val builder = new StreamsBuilder()

val couponConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroCouponSerde(schemaRegistryHost, schemaRegistryPort))
val couponStream: KStream[String, Coupon] = builder.stream(couponInputTopic, couponConsumedWith)

val purchaseConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroPurchaseSerde(schemaRegistryHost, schemaRegistryPort))
val purchaseStream: KStream[String, Purchase] = builder.stream(purchaseInputTopic, purchaseConsumedWith)

val couponStreamKeyedByProductId: KStream[String, Coupon] = couponStream.selectKey(couponProductIdValueMapper)
val purchaseStreamKeyedByProductId: KStream[String, Purchase] = purchaseStream.selectKey(purchaseProductIdValueMapper)

// Applies the coupon's percentage discount to the purchase amount.
val couponPurchaseValueJoiner = new ValueJoiner[Coupon, Purchase, Purchase]() {

  override def apply(coupon: Coupon, purchase: Purchase): Purchase = {
      val discount = (purchase.getAmount * coupon.getDiscount) / 100
      new Purchase(purchase.getTimestamp, purchase.getProductid, purchase.getProductdescription, purchase.getAmount - discount)
  }
}

val fiveMinuteWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
val outputStream: KStream[String, Purchase] = couponStreamKeyedByProductId.join(purchaseStreamKeyedByProductId,
  couponPurchaseValueJoiner,
  fiveMinuteWindow
  )

outputStream.to(outputTopic)

builder.build()

As I said, this code works like a charm when I do not use a custom TimestampExtractor. When I do, by setting StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG to my custom extractor class (I've double-checked that the class extracts the date properly), the join does not work anymore.
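For reference, here is a minimal sketch of the conversion such an extractor has to perform, assuming the event dates always use the format seen in the test events (for example "Dec 05 2018 09:10:00.000 UTC"). The names EventTime and toEpochMillis are hypothetical and are not the actual extractor class, which is not shown in the question.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical helper for illustration; not the asker's extractor class.
object EventTime {
  // Assumption: every date ends in the literal zone name "UTC",
  // so it is matched as quoted text rather than parsed as a zone.
  private val formatter =
    DateTimeFormatter.ofPattern("MMM dd yyyy HH:mm:ss.SSS 'UTC'", Locale.ENGLISH)

  // Converts an event date string to epoch milliseconds,
  // the unit Kafka Streams uses for record timestamps.
  def toEpochMillis(date: String): Long =
    LocalDateTime.parse(date, formatter).toInstant(ZoneOffset.UTC).toEpochMilli
}

object EventTimeDemo {
  def main(args: Array[String]): Unit = {
    val couponTs   = EventTime.toEpochMillis("Dec 05 2018 09:10:00.000 UTC")
    val purchaseTs = EventTime.toEpochMillis("Dec 05 2018 09:12:00.000 UTC")
    // The two test events are two minutes apart in event time.
    println(purchaseTs - couponTs) // prints 120000
  }
}
```

A custom extractor would return a value like this from its extract method, so it is worth checking that the result is in milliseconds and not in seconds.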

I'm testing the topology by running a unit test and passing the following events to it:

    val coupon1 = new Coupon("Dec 05 2018 09:10:00.000 UTC", "1234", 10F)
    // Purchase within the five minutes after the coupon - The discount should be applied
    val purchase1 = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 25.00F)
    val purchase1WithDiscount = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 22.50F)
    val couponRecordFactory1 = couponRecordFactory.create(couponInputTopic, "c1", coupon1)
    val purchaseRecordFactory1 = purchaseRecordFactory.create(purchaseInputTopic, "p1", purchase1)

    testDriver.pipeInput(couponRecordFactory1)
    testDriver.pipeInput(purchaseRecordFactory1)
    val outputRecord1 = testDriver.readOutput(outputTopic,
      new StringDeserializer(),
      JoinTopologyBuilder.getAvroPurchaseSerde(
        schemaRegistryHost,
        schemaRegistryPort).deserializer())
    OutputVerifier.compareKeyValue(outputRecord1, "1234", purchase1WithDiscount)

I'm not sure whether the step of selecting a new key is discarding the proper date. I have tested a lot of combinations with no luck :(

Any help would be much appreciated!

Recommended answer

I'm not sure about this, because I don't know how much of your code you have tested, but my guess is:

1) Your code works with the default timestamp extractor because it uses the time at which you send each record into the pipe as the record timestamp. So it works because, in your test, you send the data one record after another without a pause.

2) You are using the TopologyTestDriver for your tests! Note that it is very useful for testing your business code and the topology as a unit (what goes in as input and what the corresponding correct output is), but there is no Kafka Streams app running in those tests.

In your case you can play with the advanceWallClockTime(long) method of the TopologyTestDriver class to simulate the system time advancing.

If you want to start the topology, you will have to write an integration test with an embedded Kafka cluster (there is one in the Kafka libraries that works just fine!).
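To make point 1 concrete, here is a small sketch of the timestamp comparison behind a windowed join. It assumes only the documented rule of JoinWindows.of(d): records with the same key are joinable when their timestamps differ by at most d. It ignores everything else Kafka Streams does (state store retention, record ordering), and the names JoinWindowCheck and withinWindow are hypothetical.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper for illustration; not part of the Kafka Streams API.
object JoinWindowCheck {
  // Symmetric window rule: joinable when |leftTs - rightTs| <= windowMs.
  def withinWindow(leftTs: Long, rightTs: Long, windowMs: Long): Boolean =
    math.abs(leftTs - rightTs) <= windowMs

  def main(args: Array[String]): Unit = {
    // Same window size as in the question's topology.
    val windowMs = TimeUnit.MINUTES.toMillis(10)

    // Send-time timestamps: records piped back to back are milliseconds apart.
    val now = System.currentTimeMillis()
    println(withinWindow(now, now + 5, windowMs)) // prints true

    // Event-time timestamps: the distance comes from the event fields instead.
    val couponTs = 0L
    println(withinWindow(couponTs, couponTs + TimeUnit.MINUTES.toMillis(2), windowMs))  // prints true
    println(withinWindow(couponTs, couponTs + TimeUnit.MINUTES.toMillis(12), windowMs)) // prints false
  }
}
```

With send-time timestamps the check passes almost by construction, which is why a test that pipes records without a pause says little about how the join behaves once event time is used.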

Let me know if that helps :-)

