roman_日积跬步-终至千里


1. Design and Implementation of OperatorChain

The general logic of OperatorChain

 
OperatorChain's Output component: sending data downstream

 
OperatorChain's collect(): collecting processed data

 
OperatorChain's Output interface: it can also emit events such as Watermark and LatencyMarker

 
OperatorChain internally defines several implementation classes of the WatermarkGaugeExposingOutput interface.
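As a rough illustration of why several implementations are useful, the sketch below models a simplified output interface with two variants: one that stores what it receives, and one that broadcasts to several targets. All names here (`SketchOutput`, `ListOutput`, `BroadcastingSketchOutput`) are hypothetical stand-ins, not Flink classes. The real implementations nested in OperatorChain in Flink 1.10, such as `ChainingOutput` and `BroadcastingOutputCollector`, carry more responsibilities than this sketch shows.

```java
import java.util.ArrayList;
import java.util.List;

final class OutputVariantsDemo {
    public static void main(String[] args) {
        ListOutput<String> first = new ListOutput<>();
        ListOutput<String> second = new ListOutput<>();
        List<SketchOutput<String>> targets = new ArrayList<>();
        targets.add(first);
        targets.add(second);

        // The caller only sees the interface, whichever variant sits behind it.
        SketchOutput<String> output = new BroadcastingSketchOutput<>(targets);
        output.collect("a");
        output.emitWatermark(42L);

        System.out.println(first.records + " " + second.records + " " + output.currentWatermark());
    }
}

// Hypothetical, simplified stand-in for an output that carries records and watermarks.
interface SketchOutput<T> {
    void collect(T record);

    void emitWatermark(long timestamp);

    // Exposes the last watermark that passed through, in the spirit of a watermark gauge.
    long currentWatermark();
}

// Variant 1: a terminal output that stores whatever it receives.
final class ListOutput<T> implements SketchOutput<T> {
    final List<T> records = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void emitWatermark(long timestamp) {
        watermark = timestamp;
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}

// Variant 2: forwards every record and watermark to all registered targets.
final class BroadcastingSketchOutput<T> implements SketchOutput<T> {
    private final List<SketchOutput<T>> targets;
    private long watermark = Long.MIN_VALUE;

    BroadcastingSketchOutput(List<SketchOutput<T>> targets) {
        this.targets = targets;
    }

    @Override
    public void collect(T record) {
        for (SketchOutput<T> target : targets) {
            target.collect(record);
        }
    }

    @Override
    public void emitWatermark(long timestamp) {
        watermark = timestamp;
        for (SketchOutput<T> target : targets) {
            target.emitWatermark(timestamp);
        }
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}
```

Because callers depend only on the interface, an operator does not need to know whether its output leads to one chained operator, to several of them, or to the network.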


Example: collecting data and sending it downstream through Output
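A minimal sketch of this flow, under simplifying assumptions: each operator holds a reference to its output, and calling `collect()` on one operator pushes the result straight into the next one, until the last output hands the record to the "network". The names `Downstream`, `MapSketch`, `FilterSketch` and `SinkSketch` are hypothetical and only illustrate the chaining idea; they are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

final class ChainExample {
    // Wires map -> filter -> sink and pushes the input through the chain.
    static List<Integer> run(List<String> input) {
        SinkSketch<Integer> sink = new SinkSketch<>();
        // Each operator receives its output at construction time,
        // so this sketch wires the chain from the tail towards the head.
        FilterSketch<Integer> filter = new FilterSketch<Integer>(n -> n % 2 == 0, sink);
        MapSketch<String, Integer> head = new MapSketch<String, Integer>(Integer::parseInt, filter);
        for (String record : input) {
            head.collect(record);
        }
        return sink.sent;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1", "2", "3", "4")));
    }
}

// Hypothetical minimal collector interface (not Flink's Output).
interface Downstream<T> {
    void collect(T record);
}

// Transforms each record and forwards the result with a plain method call.
final class MapSketch<I, O> implements Downstream<I> {
    private final Function<I, O> fn;
    private final Downstream<O> output;

    MapSketch(Function<I, O> fn, Downstream<O> output) {
        this.fn = fn;
        this.output = output;
    }

    @Override
    public void collect(I record) {
        output.collect(fn.apply(record));
    }
}

// Forwards only the records that satisfy the predicate.
final class FilterSketch<T> implements Downstream<T> {
    private final Predicate<T> predicate;
    private final Downstream<T> output;

    FilterSketch(Predicate<T> predicate, Downstream<T> output) {
        this.predicate = predicate;
        this.output = output;
    }

    @Override
    public void collect(T record) {
        if (predicate.test(record)) {
            output.collect(record);
        }
    }
}

// Stands in for the output at the end of the chain that writes to the network.
final class SinkSketch<T> implements Downstream<T> {
    final List<T> sent = new ArrayList<>();

    @Override
    public void collect(T record) {
        sent.add(record);
    }
}
```

In this sketch no serialization happens between the chained operators; only the final output would need to serialize records for the network.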

 

2. Creating and Initializing OperatorChain

Next, let's look at how OperatorChain is initialized. As the code below shows, the OperatorChain constructor performs the following steps.

public OperatorChain(
      StreamTask<OUT, OP> containingTask,
      RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
   // Get the user code class loader of the current StreamTask
   final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
   // Get the StreamConfig
   final StreamConfig configuration = containingTask.getConfiguration();
   // Get the StreamOperatorFactory
   StreamOperatorFactory<OUT> operatorFactory =
      configuration.getStreamOperatorFactory(userCodeClassloader);
   // Read the chained configs (including the one of this task itself)
   Map<Integer, StreamConfig> chainedConfigs =
      configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
   // Create one RecordWriterOutput per outgoing StreamEdge
   List<StreamEdge> outEdgesInOrder =
      configuration.getOutEdgesInOrder(userCodeClassloader);
   Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
      new HashMap<>(outEdgesInOrder.size());
   this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
   // From here on, the outputs created so far must be closed again if anything fails
   boolean success = false;
   try {
      for (int i = 0; i < outEdgesInOrder.size(); i++) {
         StreamEdge outEdge = outEdgesInOrder.get(i);
         // Create a RecordWriterOutput for each out edge
         RecordWriterOutput<?> streamOutput = createStreamOutput(
            recordWriterDelegate.getRecordWriter(i),
            outEdge,
            chainedConfigs.get(outEdge.getSourceId()),
            containingTask.getEnvironment());
         this.streamOutputs[i] = streamOutput;
         streamOutputMap.put(outEdge, streamOutput);
      }
      // Create the connections between the operators inside the OperatorChain
      List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
      this.chainEntryPoint = createOutputCollector(
         containingTask,
         configuration,
         chainedConfigs,
         userCodeClassloader,
         streamOutputMap,
         allOps,
         containingTask.getMailboxExecutorFactory());
      if (operatorFactory != null) {
         WatermarkGaugeExposingOutput<StreamRecord<OUT>> output =
            getChainEntryPoint();
         // Create the headOperator
         headOperator = StreamOperatorFactoryUtil.createOperator(
            operatorFactory,
            containingTask,
            configuration,
            output);
         headOperator.getMetricGroup().gauge(
            MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
            output.getWatermarkGauge());
      } else {
         headOperator = null;
      }
      // The head operator is added at the end of the operator list
      allOps.add(headOperator);
      this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
      success = true;
   }
   finally {
      // If creation did not succeed, close the RecordWriterOutputs in streamOutputs
      // so that the resources acquired so far are not leaked
      if (!success) {
         for (RecordWriterOutput<?> output : this.streamOutputs) {
            if (output != null) {
               output.close();
            }
         }
      }
   }
}
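The `success` flag combined with the `finally` block is a general cleanup-on-failure pattern: resources acquired before a failure are released, while a fully successful construction leaves them open for later use. The sketch below isolates that pattern under stated assumptions; `WriterStub`, `GuardedInit` and the `failAt` parameter are hypothetical names invented for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

final class GuardedInitDemo {
    public static void main(String[] args) {
        List<WriterStub> created = new ArrayList<>();
        try {
            new GuardedInit(3, 2, created);
        } catch (IllegalStateException e) {
            for (WriterStub writer : created) {
                System.out.println("writer " + writer.id + " closed=" + writer.closed);
            }
        }
    }
}

// Hypothetical resource that only remembers whether it was closed.
final class WriterStub {
    final int id;
    boolean closed;

    WriterStub(int id) {
        this.id = id;
    }

    void close() {
        closed = true;
    }
}

final class GuardedInit {
    final WriterStub[] writers;

    // failAt is a hypothetical knob: the index at which creation throws (-1 means never).
    // Every writer that gets created is also recorded in the caller's list for inspection.
    GuardedInit(int count, int failAt, List<WriterStub> created) {
        this.writers = new WriterStub[count];
        boolean success = false;
        try {
            for (int i = 0; i < count; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure at index " + i);
                }
                writers[i] = new WriterStub(i);
                created.add(writers[i]);
            }
            success = true;
        } finally {
            // Only on failure: close what was created so far, skipping empty slots.
            if (!success) {
                for (WriterStub writer : writers) {
                    if (writer != null) {
                        writer.close();
                    }
                }
            }
        }
    }
}
```

The `null` check matters because a failure partway through the loop leaves the remaining array slots unfilled, just as in the constructor above.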

Summary of OperatorChain's role

 

3. Creating RecordWriterOutput

RecordWriterOutput writes records out to their designated destination over the network.

The logic of OperatorChain.createStreamOutput() is as follows:

private RecordWriterOutput<OUT> createStreamOutput(
      RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
      StreamEdge edge,
      StreamConfig upStreamConfig,
      Environment taskEnvironment) {
   // Get the OutputTag (null if this edge is not a side output)
   OutputTag sideOutputTag = edge.getOutputTag();
   // The TypeSerializer used to serialize the outgoing records
   TypeSerializer outSerializer = null;
   // If the StreamEdge specifies an OutputTag
   if (edge.getOutputTag() != null) {
      // this is a side output: use the serializer registered for that tag
      outSerializer = upStreamConfig.getTypeSerializerSideOut(
            edge.getOutputTag(), taskEnvironment.getUserClassLoader());
   } else {
      // main output
      outSerializer =
            upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
   }
   // Return the newly created RecordWriterOutput instance
   return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
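The serializer is looked up in the configuration of the upstream operator (the constructor passes `chainedConfigs.get(outEdge.getSourceId())`), because that operator produces the records travelling over the edge. The following sketch shows the same selection logic with plain strings standing in for serializers. `EdgeSketch`, `ConfigSketch` and the serializer names are hypothetical and used purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class SerializerSelectionDemo {
    // Picks the serializer name for an edge from the config of the operator producing the records.
    static String select(EdgeSketch edge, Map<Integer, ConfigSketch> chainedConfigs) {
        ConfigSketch upstream = chainedConfigs.get(edge.sourceId);
        if (edge.outputTag != null) {
            // Side output: each tag has its own serializer.
            return upstream.sideSerializers.get(edge.outputTag);
        }
        // Main output.
        return upstream.mainSerializer;
    }

    public static void main(String[] args) {
        ConfigSketch config = new ConfigSketch("LongSerializer");
        config.sideSerializers.put("late-data", "StringSerializer");
        Map<Integer, ConfigSketch> chainedConfigs = new HashMap<>();
        chainedConfigs.put(7, config);

        System.out.println(select(new EdgeSketch(7, null), chainedConfigs));
        System.out.println(select(new EdgeSketch(7, "late-data"), chainedConfigs));
    }
}

// Hypothetical edge: the id of the producing operator plus an optional side-output tag.
final class EdgeSketch {
    final int sourceId;
    final String outputTag; // null means main output

    EdgeSketch(int sourceId, String outputTag) {
        this.sourceId = sourceId;
        this.outputTag = outputTag;
    }
}

// Hypothetical per-operator config holding serializer names instead of real serializers.
final class ConfigSketch {
    final String mainSerializer;
    final Map<String, String> sideSerializers = new HashMap<>();

    ConfigSketch(String mainSerializer) {
        this.mainSerializer = mainSerializer;
    }
}
```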

 

How RecordWriterOutput outputs a StreamRecord

public void collect(StreamRecord<OUT> record) {
   // An output bound to a side-output tag is not responsible for the main output
   if (this.outputTag != null) {
      return;
   }
   pushToRecordWriter(record);
}
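The early return can look surprising at first: an output created for a side-output edge simply ignores records sent to the main output. The sketch below illustrates the routing idea, modeled on this method and on the `collect` overload in RecordWriterOutput that takes an `OutputTag` and applies the mirror-image check. `TaggedOutputSketch` is a hypothetical class, and tags are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

final class TagRoutingDemo {
    public static void main(String[] args) {
        TaggedOutputSketch mainOutput = new TaggedOutputSketch(null);
        TaggedOutputSketch lateOutput = new TaggedOutputSketch("late");
        // Every record is offered to every output; each output decides whether it is responsible.
        for (TaggedOutputSketch out : new TaggedOutputSketch[] {mainOutput, lateOutput}) {
            out.collect("regular-record");
            out.collect("late", "late-record");
        }
        System.out.println(mainOutput.written + " " + lateOutput.written);
    }
}

// Hypothetical output bound either to the main output (tag == null) or to one side-output tag.
final class TaggedOutputSketch {
    private final String outputTag;
    final List<String> written = new ArrayList<>();

    TaggedOutputSketch(String outputTag) {
        this.outputTag = outputTag;
    }

    // Main-output path: ignored by outputs that belong to a side output.
    void collect(String record) {
        if (outputTag != null) {
            return;
        }
        written.add(record);
    }

    // Side-output path: ignored unless this output is bound to exactly that tag.
    void collect(String tag, String record) {
        if (outputTag == null || !outputTag.equals(tag)) {
            return;
        }
        written.add(record);
    }
}
```

With this arrangement the caller does not need a routing table: it can hand every record to all outputs, and each record ends up on exactly the edges that match.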

 

pushToRecordWriter sends the data

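// RecordWriterOutput.pushToRecordWriter()
private <X> void pushToRecordWriter(StreamRecord<X> record) {
   // Point the reusable serialization delegate at the current record
   serializationDelegate.setInstance(record);
   try {
      // Hand the delegate to the RecordWriter, which emits the record
      recordWriter.emit(serializationDelegate);
   }
   catch (Exception e) {
      // Rethrow as an unchecked exception, keeping the original cause
      throw new RuntimeException(e.getMessage(), e);
   }
}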
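The method reuses one delegate object for every record: it sets the current instance, then lets the writer emit it. A minimal sketch of that pattern follows, under the assumption that the writer serializes the record during `emit()`, so the delegate can safely be pointed at the next record afterwards. `DelegateSketch`, `WriterSketch` and `PushSketch` are hypothetical names, and `DataOutputStream.writeUTF` merely stands in for a real serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class PushDemo {
    public static void main(String[] args) throws IOException {
        PushSketch output = new PushSketch();
        output.push("a");
        output.push("bc");
        // Decode the buffers again to show that each record was serialized on its own.
        for (byte[] buffer : output.writer.buffers) {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
            System.out.println(in.readUTF());
        }
    }
}

// Hypothetical reusable wrapper: holds the current record and knows how to serialize it.
final class DelegateSketch {
    private String instance;

    void setInstance(String instance) {
        this.instance = instance;
    }

    void write(DataOutputStream out) throws IOException {
        out.writeUTF(instance);
    }
}

// Hypothetical writer: serializes the delegate's current record into its own buffer on emit.
final class WriterSketch {
    final List<byte[]> buffers = new ArrayList<>();

    void emit(DelegateSketch delegate) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            delegate.write(out);
        }
        buffers.add(bytes.toByteArray());
    }
}

final class PushSketch {
    private final DelegateSketch delegate = new DelegateSketch();
    final WriterSketch writer = new WriterSketch();

    void push(String record) {
        // One delegate object is reused for every record, avoiding a new wrapper per record.
        delegate.setInstance(record);
        try {
            writer.emit(delegate);
        } catch (IOException e) {
            // Same wrapping as in pushToRecordWriter: unchecked exception with the original cause.
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
```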

 
 

03-07 22:04