我想加入两个流,并且将合并窗口设置为25小时,因为要合并的记录最多可以相隔24小时。

final Long JOIN_WINDOW = TimeUnit.HOURS.toMillis(25);

kstream.join(
  runsheetIdStream,
  (jt,r) -> { jt.setDate(r.getStart_date()); return jt; },
  JoinWindows.of(JOIN_WINDOW),
  Joined.with(Serdes.Long(),jobTransactionSerde,runsheetSerde))

这将引发以下异常:

org.apache.kafka.streams.errors.TopologyException:无效的拓扑:连接窗口KSTREAM-JOINTHIS-0000000016-store的保留期限必须不小于其窗口大小。

如何延长保留期限?

最佳答案

当您使用join并使用JoinWindows.of(JOIN_WINDOW)时,您隐式定义了基础状态存储的元数据。
JoinWindows.of的javadoc中:

指定如果同一键的记录的时间戳在timeDifference内,则该键可以连接,即辅助流中的记录的时间戳比主流中的记录的时间戳早或晚于max timeDifference。

所谓的保留期(又名窗口维护持续时间)是使用until指定的更早(在Kafka Streams 2.1.0之前):

设置窗口维护持续时间(保留时间),以毫秒为单位。该保留时间是保证将窗口维持多长时间的下限。

由于默认情况下,保留期限为1天(目前无法找到参考),因此是例外情况。
从Kafka Streams 2.1.0开始,您应该使用Materialized API:

用于描述应如何实现StateStore。您可以通过一种接受供应商的方法来提供自定义StateStore后端,也可以通过仅提供商店名称来使用默认的RocksDB后端。
Materialized使您可以完全控制用于连接的基础状态存储,并提供withRetention(java.time.Duration retention):

配置窗口和会话存储的保留期。
请注意,保留期必须至少足够长,以包含窗口数据从窗口开始到窗口结束以及整个宽限期的整个生命周期。

10-08 02:58