roman_日积跬步-终至千里

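After a Task thread is started in the TaskManager, StreamTask.invoke() is called to trigger execution of the operators in the current Task. Inside invoke(), restoreInternal() is called, which among other things creates and initializes the operators' state.
invoke() checks the task's running flag to decide whether that restore step still has to run: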

        // Allow invoking method 'invoke' without having to call 'restore' before it.
        if (!isRunning) {
            LOG.debug("Restoring during invoke will be called.");
            restoreInternal();
        }
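
The guard above lets invoke() work whether or not restore() was called first. A minimal sketch of that pattern in plain Java follows; MiniTask and its members are hypothetical stand-ins, not Flink classes, and the early return inside restoreInternal() is an assumption about how a repeated restore is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamTask; it only models the restore guard.
class MiniTask {
    private boolean isRunning = false;
    final List<String> events = new ArrayList<>();

    // May be called explicitly before invoke().
    void restore() {
        restoreInternal();
    }

    void invoke() {
        // Same check as in the snippet above: restore only if it has not happened yet.
        if (!isRunning) {
            restoreInternal();
        }
        events.add("run");
    }

    private void restoreInternal() {
        if (isRunning) {
            return; // assumed behaviour: a second restore attempt is ignored
        }
        events.add("restore");
        isRunning = true;
    }
}
```

Whichever entry point is used first, the state is restored exactly once before the task runs.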

StreamTask calls initializeStateAndOpenOperators() to initialize the state of every operator in the current Task.

RegularOperatorChain.initializeStateAndOpenOperators
public void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    // For each operator in the chain: initialize its state first, then open it.
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
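
The loop above interleaves the two lifecycle calls per operator: each operator has its state initialized and is opened before the next operator is touched. A small sketch of that ordering, using hypothetical MiniOperator and MiniOperatorChain types (a String stands in for the StreamTaskStateInitializer):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator interface; only the two lifecycle calls are modelled.
interface MiniOperator {
    void initializeState(String initializer);
    void open();
}

// Records the order in which its lifecycle methods are called.
class RecordingOperator implements MiniOperator {
    private final String name;
    private final List<String> log;

    RecordingOperator(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void initializeState(String initializer) {
        log.add(name + ".initializeState");
    }

    @Override
    public void open() {
        log.add(name + ".open");
    }
}

class MiniOperatorChain {
    private final List<MiniOperator> operators = new ArrayList<>();

    void add(MiniOperator operator) {
        operators.add(operator);
    }

    // Mirrors the shape of the loop above: state first, then open, per operator.
    void initializeStateAndOpenOperators(String initializer) {
        for (MiniOperator operator : operators) {
            operator.initializeState(initializer);
            operator.open();
        }
    }
}
```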

 
Now that we have located where operator state is initialized, let's look at how it is initialized.

1. Overall state initialization flow

AbstractStreamOperator.initializeState lays out the overall flow of state initialization, as the code and comments below show:

# AbstractStreamOperator.initializeState

public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  
        throws Exception {  
    //1. get the serializer for the state key
    final TypeSerializer<?> keySerializer =  
            config.getStateKeySerializer(getUserCodeClassloader());  
    //2. get containingTask
    final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  
    final CloseableRegistry streamTaskCloseableRegistry =  
            Preconditions.checkNotNull(containingTask.getCancelables());  
   //3. create StreamOperatorStateContext
    final StreamOperatorStateContext context =  
            streamTaskStateManager.streamOperatorStateContext(  
                    getOperatorID(),  
                    getClass().getSimpleName(),  
                    getProcessingTimeService(),  
                    this,  
                    keySerializer,  
                    streamTaskCloseableRegistry,  
                    metrics,  
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(  
                            ManagedMemoryUseCase.STATE_BACKEND,  
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  
                            runtimeContext.getUserCodeClassLoader()),  
                    isUsingCustomRawKeyedState());  
   //4. create stateHandler
    stateHandler =  
            new StreamOperatorStateHandler(  
                    context, getExecutionConfig(), streamTaskCloseableRegistry);  
    timeServiceManager = context.internalTimerServiceManager();  
    //5. initialize OperatorState
    stateHandler.initializeOperatorState(this);  
    //6. set KeyedStateStore in runtimeContext
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

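While a StreamOperator initializes its state, it first obtains from the StreamTask the components needed to create state: for managed state, the KeyedStateBackend and OperatorStateBackend; for raw state, the KeyedStateInputs and OperatorStateInputs.

These state management components are eventually wrapped in a StateInitializationContext and passed to subclasses. AbstractUdfStreamOperator, for example, uses the StateInitializationContext to initialize the state of the user-defined function (UDF).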
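
The split between the final initializeState(StreamTaskStateInitializer) and the overridable initializeState(StateInitializationContext) is a template-method arrangement: the base class fixes the skeleton and subclasses only fill in the hook. A simplified sketch follows; MiniStateContext, MiniAbstractOperator and CountingOperator are hypothetical, and a String stands in for the keyed state store.

```java
import java.util.Optional;

// Hypothetical, heavily simplified context handed to the hook.
class MiniStateContext {
    final boolean restored;
    final Optional<String> keyedStateStore;

    MiniStateContext(boolean restored, Optional<String> keyedStateStore) {
        this.restored = restored;
        this.keyedStateStore = keyedStateStore;
    }
}

abstract class MiniAbstractOperator {
    private Optional<String> keyedStateStore = Optional.empty();

    // Fixed skeleton: build the context, run the hook, publish the keyed store.
    final void initializeState(boolean restored, boolean keyed) {
        MiniStateContext context = new MiniStateContext(
                restored, keyed ? Optional.of("keyed-store") : Optional.empty());
        initializeState(context);
        keyedStateStore = context.keyedStateStore;
    }

    // Hook with an empty default; stateful subclasses override it.
    void initializeState(MiniStateContext context) {}

    Optional<String> getKeyedStateStore() {
        return keyedStateStore;
    }
}

class CountingOperator extends MiniAbstractOperator {
    long count = -1;

    @Override
    void initializeState(MiniStateContext context) {
        // 42 is a made-up value standing in for state read back from a checkpoint.
        count = context.restored ? 42 : 0;
    }
}
```

Marking the outer method final keeps subclasses from skipping the context setup, which is the same reason the real method in the listing above is declared final.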

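2. Creating the StreamOperatorStateContext

Next, let's see how these components are created when the Task instance is initialized and stored in the StreamOperatorStateContext for the operator to use: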

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  
        @Nonnull OperatorID operatorID,  
        @Nonnull String operatorClassName,  
        @Nonnull ProcessingTimeService processingTimeService,  
        @Nonnull KeyContext keyContext,  
        @Nullable TypeSerializer<?> keySerializer,  
        @Nonnull CloseableRegistry streamTaskCloseableRegistry,  
        @Nonnull MetricGroup metricGroup,  
        double managedMemoryFraction,  
        boolean isUsingCustomRawKeyedState)  
        throws Exception {  
    //1. get the task instance info
    TaskInfo taskInfo = environment.getTaskInfo();  
    OperatorSubtaskDescriptionText operatorSubtaskDescription =  
            new OperatorSubtaskDescriptionText(  
                    operatorID,  
                    operatorClassName,  
                    taskInfo.getIndexOfThisSubtask(),  
                    taskInfo.getNumberOfParallelSubtasks());  
  
    final String operatorIdentifierText = operatorSubtaskDescription.toString();  
  
    final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  
            taskStateManager.prioritizedOperatorState(operatorID);  
  
    CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  
    OperatorStateBackend operatorStateBackend = null;  
    CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  
    CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  
    InternalTimeServiceManager<?> timeServiceManager;  
  
    try {  
        // create the keyed state backend
        // -------------- Keyed State Backend --------------  
        keyedStatedBackend =  
                keyedStatedBackend(  
                        keySerializer,  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry,  
                        metricGroup,  
                        managedMemoryFraction);  
        // create the operator state backend
        // -------------- Operator State Backend --------------  
        operatorStateBackend =  
                operatorStateBackend(  
                        operatorIdentifierText,  
                        prioritizedOperatorSubtaskStates,  
                        streamTaskCloseableRegistry);  
        // create the raw state input streams
        // -------------- Raw State Streams --------------  
        rawKeyedStateInputs =  
                rawKeyedStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawKeyedState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  
  
        rawOperatorStateInputs =  
                rawOperatorStateInputs(  
                        prioritizedOperatorSubtaskStates  
                                .getPrioritizedRawOperatorState()  
                                .iterator());  
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  
        // create the internal timer service manager
        // -------------- Internal Timer Service Manager --------------  
        if (keyedStatedBackend != null) {  
  
            // if the operator indicates that it is using custom raw keyed state,  
            // then whatever was written in the raw keyed state snapshot was NOT written
            // by the internal timer services (because there is only ever one user of raw keyed
            // state);
            // in this case, timers should not attempt to restore timers from the raw keyed
            // state.
            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                    (prioritizedOperatorSubtaskStates.isRestored()  
                                    && !isUsingCustomRawKeyedState)  
                            ? rawKeyedStateInputs  
                            : Collections.emptyList();  
  
            timeServiceManager =  
                    timeServiceManagerProvider.create(  
                            keyedStatedBackend,  
                            environment.getUserCodeClassLoader().asClassLoader(),  
                            keyContext,  
                            processingTimeService,  
                            restoredRawKeyedStateTimers);  
        } else {  
            timeServiceManager = null;  
        }  
  
        // -------------- Preparing return value --------------  
  
        return new StreamOperatorStateContextImpl(  
                prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  
                operatorStateBackend,  
                keyedStatedBackend,  
                timeServiceManager,  
                rawOperatorStateInputs,  
                rawKeyedStateInputs);  
    } catch (Exception ex) {  
  
        // ...
}

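Flow summary:

(1) Read the task info and build the operator subtask description that identifies the operator.
(2) Fetch the operator's prioritized restored state from the taskStateManager.
(3) Create the keyed state backend.
(4) Create the operator state backend.
(5) Open the raw keyed and raw operator state inputs and register them with the task's CloseableRegistry.
(6) If a keyed state backend exists, create the InternalTimeServiceManager; otherwise leave it null.
(7) Wrap everything in a StreamOperatorStateContextImpl and return it.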

 
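Summary
StreamTaskStateInitializer.streamOperatorStateContext() covers the whole process of creating the managed state backends and the raw state inputs. StreamOperator implementations obtain these components from the StreamOperatorStateContext and use them to create state of the required type; the state data ends up on the physical medium chosen by the state backend, such as heap memory or RocksDB.

The StateInitializationContext is used by operators and user-defined functions to work with their state.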
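
One detail of streamOperatorStateContext() that is easy to miss is the timer-restore decision: timers are read back from raw keyed state only when the task is restoring and the operator does not claim raw keyed state for itself. The condition can be isolated as below; TimerRestoreSketch is a hypothetical helper, and Strings stand in for the KeyGroupStatePartitionStreamProvider elements.

```java
import java.util.Collections;
import java.util.List;

class TimerRestoreSketch {
    // Returns the raw keyed state inputs the timer service should restore from.
    static List<String> timersToRestore(
            boolean isRestored,
            boolean isUsingCustomRawKeyedState,
            List<String> rawKeyedStateInputs) {
        // Same condition as in the listing above: both parts must hold.
        return (isRestored && !isUsingCustomRawKeyedState)
                ? rawKeyedStateInputs
                : Collections.<String>emptyList();
    }
}
```

For example, timersToRestore(true, true, inputs) yields an empty list: the snapshot content belongs to the operator's custom raw keyed state, so the timer service must not try to interpret it.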

 

3. Interface design of StateInitializationContext

[Figure: StateInitializationContext interface design (image not available)]
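
As a rough textual substitute for the diagram, the hierarchy can be mirrored in plain Java. To the best of my knowledge, in recent Flink versions StateInitializationContext extends FunctionInitializationContext, which in turn extends ManagedInitializationContext; treat that layering, and the exact method set, as an assumption to check against your Flink version. The interfaces below carry a Sketch suffix and use String stand-ins for the state stores and raw input streams, so they are not the real signatures.

```java
import java.util.Collections;
import java.util.OptionalLong;

// Managed state view shared by operators and user functions.
interface ManagedInitializationContextSketch {
    boolean isRestored();
    OptionalLong getRestoredCheckpointId();
    String getOperatorStateStore();
    String getKeyedStateStore();
}

// View handed to user functions: managed state only.
interface FunctionInitializationContextSketch extends ManagedInitializationContextSketch {}

// View handed to operators: managed state plus the raw state inputs.
interface StateInitializationContextSketch extends FunctionInitializationContextSketch {
    Iterable<String> getRawOperatorStateInputs();
    Iterable<String> getRawKeyedStateInputs();
}

class SimpleContext implements StateInitializationContextSketch {
    private final Long restoredCheckpointId; // null means a fresh start

    SimpleContext(Long restoredCheckpointId) {
        this.restoredCheckpointId = restoredCheckpointId;
    }

    public boolean isRestored() { return restoredCheckpointId != null; }
    public OptionalLong getRestoredCheckpointId() {
        return restoredCheckpointId == null
                ? OptionalLong.empty()
                : OptionalLong.of(restoredCheckpointId);
    }
    public String getOperatorStateStore() { return "operator-store"; }
    public String getKeyedStateStore() { return "keyed-store"; }
    public Iterable<String> getRawOperatorStateInputs() { return Collections.emptyList(); }
    public Iterable<String> getRawKeyedStateInputs() { return Collections.emptyList(); }
}

class ContextViews {
    // A user function only needs the narrower view, so the operator's context can be passed as-is.
    static String describe(FunctionInitializationContextSketch ctx) {
        return ctx.isRestored()
                ? "restored from checkpoint " + ctx.getRestoredCheckpointId().getAsLong()
                : "fresh start";
    }
}
```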

 

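4. State initialization example: UDF state initialization

AbstractStreamOperator invokes the initializeState(StateInitializationContext context) hook, which subclasses override, to initialize the operator's state. AbstractUdfStreamOperator serves here as the example of how a concrete operator and its UDF initialize state.

AbstractUdfStreamOperator.initializeState() calls StreamingFunctionUtils.restoreFunctionState() to initialize and restore the state of the user-defined function. In effect, it hands the StateInitializationContext created above to the function.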

public void initializeState(StateInitializationContext context) throws Exception {
   super.initializeState(context);
   StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

Restoring a function's internal state involves the checkpoint implementation; how StreamingFunctionUtils.restoreFunctionState() restores that state will be covered later.
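
Leaving the checkpoint side aside, the dispatch step can be previewed with a sketch. It assumes that the utility unwraps wrapper functions until it finds one that can receive the context; that is my reading of the behaviour, not a copy of the Flink source, and every type below is a hypothetical stand-in (a String plays the role of the context).

```java
// Hypothetical marker for any user function.
interface MiniFunction {}

// Stand-in for a function that wants to initialize state from a context.
interface MiniCheckpointedFunction extends MiniFunction {
    void initializeState(String context);
}

// Stand-in for a function that delegates to an inner function.
interface MiniWrappingFunction extends MiniFunction {
    MiniFunction getWrappedFunction();
}

class StatefulFn implements MiniCheckpointedFunction {
    String seenContext;

    @Override
    public void initializeState(String context) {
        seenContext = context;
    }
}

class Wrapper implements MiniWrappingFunction {
    private final MiniFunction inner;

    Wrapper(MiniFunction inner) {
        this.inner = inner;
    }

    @Override
    public MiniFunction getWrappedFunction() {
        return inner;
    }
}

class FunctionRestoreSketch {
    // Walks down the wrapper chain; returns true if some function received the context.
    static boolean restoreFunctionState(String context, MiniFunction userFunction) {
        MiniFunction current = userFunction;
        while (true) {
            if (current instanceof MiniCheckpointedFunction) {
                ((MiniCheckpointedFunction) current).initializeState(context);
                return true;
            }
            if (current instanceof MiniWrappingFunction) {
                current = ((MiniWrappingFunction) current).getWrappedFunction();
            } else {
                return false; // stateless function: nothing to initialize
            }
        }
    }
}
```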

 
Reference: 《Flink设计与实现:核心原理与源码解析》, 张利兵

02-08 09:09