Question

Apache Beam has recently introduced state cells, through StateSpec and the @StateId annotation, with partial support in Apache Flink and Google Cloud Dataflow.

I cannot find any documentation on what happens when this is used with a GlobalWindow. In particular, is there a way to have a "state garbage collection" mechanism that, according to some configuration, gets rid of state for keys that have not been seen for a while, while still maintaining a single all-time state for keys that are seen frequently enough?

Or is the amount of state used in this case going to grow without bound, with no way to ever reclaim state corresponding to keys that have not been seen in a while?

I am also interested in whether a potential solution would be supported in either Apache Flink or Google Cloud Dataflow.

The Flink and direct runners seem to have some code for "state GC", but I am not really sure what it does or whether it is relevant when using a global window.

Recommended answer

State can be automatically garbage collected by a Beam runner at some point after a window expires, that is, once the input watermark exceeds the end of the window by the allowed lateness, so that all further input for that window is droppable. The exact details depend on the runner.

As you correctly determined, the Global window may never expire, in which case this automatic collection of state is never invoked. For bounded data, including drain scenarios, the Global window actually will expire, but for a perpetual unbounded data source it will not.
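The expiry rule above can be sketched as a small toy model in plain Java. This is not Beam code: the class name, the millisecond constants, and the idea that the global window ends one day before the largest representable timestamp are assumptions made only for illustration. It shows why a finished (bounded or drained) input lets the Global window expire while an ever-advancing "now" watermark does not.

```java
final class WindowExpirySketch {
  // Hypothetical constants for this toy model, not values read from Beam.
  static final long ONE_DAY_MILLIS = 86_400_000L;
  // Watermark a runner would report once the input is completely finished.
  static final long END_OF_INPUT_WATERMARK = Long.MAX_VALUE;
  // Assumed end of the global window, placed just before the end of time.
  static final long GLOBAL_WINDOW_END = Long.MAX_VALUE - ONE_DAY_MILLIS;

  // A window is expired once the watermark exceeds windowEnd + allowedLateness.
  static boolean isExpired(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
    // Saturating add so that a window near the end of time does not overflow.
    long gcTime = windowEndMillis > Long.MAX_VALUE - allowedLatenessMillis
        ? Long.MAX_VALUE
        : windowEndMillis + allowedLatenessMillis;
    return watermarkMillis > gcTime;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Fixed one-minute window with 10 s allowed lateness.
    System.out.println(isExpired(60_000L, 10_000L, 65_000L));
    System.out.println(isExpired(60_000L, 10_000L, 70_001L));
    // Global window: unbounded source (watermark near "now") vs. finished input.
    System.out.println(isExpired(GLOBAL_WINDOW_END, 0L, now));
    System.out.println(isExpired(GLOBAL_WINDOW_END, 0L, END_OF_INPUT_WATERMARK));
  }
}
```

In this model only the last global-window check reports expiry, which matches the bounded/drain case described above.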

If you are doing stateful processing on such data in the Global window, you can use user-defined timers (used through @TimerId, @OnTimer, and TimerSpec - I haven't blogged about these yet) to clear state after some timeout of your choosing. If the state represents an aggregation of some sort, then you'll want a timer anyhow to make sure your data is not stranded in state.

Here is a quick example of their use:

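// Note: state and timers are scoped per key, so in practice the input
// type of a stateful DoFn must be a KV<K, V>; Foo stands in for that here.
new DoFn<Foo, Baz>() {

  private static final String MY_TIMER = "my-timer";
  private static final String MY_STATE = "my-state";

  // Per-key, per-window state cell holding a single Bizzle value.
  @StateId(MY_STATE)
  private final StateSpec<ValueState<Bizzle>> myState =
      StateSpecs.value(Bizzle.coder());

  // Event-time timer: it fires as the watermark advances.
  @TimerId(MY_TIMER)
  private final TimerSpec myTimer =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState,
      @TimerId(MY_TIMER) Timer myTimer) {
    bizzleState.write(...);
    // Setting the timer again on each element pushes the timeout forward.
    // (Written as setForNowPlus(...) in the original, pre-release snippet.)
    myTimer.offset(...).setRelative();
  }

  // Runs when the timer fires: emit what was accumulated, then free the state.
  @OnTimer(MY_TIMER)
  public void onMyTimer(
      OnTimerContext context,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
    context.output(... bizzleState.read() ...);
    bizzleState.clear();
  }
}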
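
To make the effect of this pattern concrete, here is a minimal plain-Java simulation of it, with no Beam dependency. Everything in it (the class IdleStateGcSketch, its methods, the use of a per-key count as the "state") is hypothetical and only models the behaviour described above: each element updates its key's state and pushes that key's timer forward, and advancing the watermark fires due timers, which clear the state.

```java
import java.util.HashMap;
import java.util.Map;

final class IdleStateGcSketch {
  private final long timeoutMillis;
  // Per-key state: here simply a running count of elements seen.
  private final Map<String, Long> state = new HashMap<>();
  // Per-key timer: the event time at which the key's state should be cleared.
  private final Map<String, Long> expiryTimer = new HashMap<>();

  IdleStateGcSketch(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Plays the role of @ProcessElement: update state and re-arm the timer.
  void process(String key, long eventTimeMillis) {
    state.merge(key, 1L, Long::sum);
    expiryTimer.put(key, eventTimeMillis + timeoutMillis);
  }

  // Plays the role of the runner advancing the watermark and invoking
  // @OnTimer for every timer that is now due; each firing clears the state.
  void advanceWatermark(long watermarkMillis) {
    expiryTimer.entrySet().removeIf(timer -> {
      if (timer.getValue() <= watermarkMillis) {
        state.remove(timer.getKey());
        return true;
      }
      return false;
    });
  }

  int liveKeys() {
    return state.size();
  }

  Long stateFor(String key) {
    return state.get(key);
  }

  public static void main(String[] args) {
    IdleStateGcSketch gc = new IdleStateGcSketch(1_000L);
    gc.process("frequent", 0L);
    gc.process("rare", 0L);
    gc.process("frequent", 600L);   // pushes the "frequent" timer to 1600
    gc.advanceWatermark(1_200L);    // only the "rare" timer (1000) is due
    System.out.println("live keys: " + gc.liveKeys());
    System.out.println("frequent: " + gc.stateFor("frequent"));
    System.out.println("rare: " + gc.stateFor("rare"));
  }
}
```

In this sketch the frequently seen key keeps its single long-lived state because its timer keeps moving, while the idle key's state is reclaimed once the watermark passes its timeout. That is the behaviour the question asks for; how a real runner schedules and fires timers is runner-specific.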

10-16 23:35