roman_日积跬步-终至千里

roman_日积跬步-终至千里

一. RecordWriter封装数据并发送到网络

1. 数据发送到网络的具体流程

RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程,整个过程细节如下。

【Flink网络数据传输(4)】RecordWriter(下)封装数据并发送到网络的过程-LMLPHP

 

2. 源码层面

接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。

【Flink网络数据传输(4)】RecordWriter(下)封装数据并发送到网络的过程-LMLPHP

通过recordWriter.emit(serializationDelegate)方法,将数据元素发送到RecordWriter中进行处理。主要逻辑如下

protected void emit(T record, int targetSubpartition) throws IOException {  
    checkErroneous();  
  
    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);  
  
    if (flushAlways) {  
        targetPartition.flush(targetSubpartition);  
    }  
}



protected void emit(T record, int targetChannel) throws IOException, 
   InterruptedException {
   checkErroneous();
   // 数据序列化
   serializer.serializeRecord(record);
   // 将序列化器中的数据复制到指定分区中
   if (copyFromSerializerToTargetChannel(targetChannel)) {
      // 清空序列化器
      serializer.prune();
   }
}

 

2.1. Serializer的实现逻辑

接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中,最终将数据发送到下游。

a. SpanningRecordSerializer的实现

SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。

 

b. SpanningRecordSerializer中如何对数据元素进行序列化

SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。

public void serializeRecord(T record) throws IOException {
   if (CHECKED) {
      if (dataBuffer.hasRemaining()) {
         throw new IllegalStateException("Pending serialization of previous record.");
      }
   }
   // 首先清理serializationBuffer中的数据
   serializationBuffer.clear();
   // 设定serialization buffer数量
   serializationBuffer.skipBytesToWrite(4);
   // 将record数据写入serializationBuffer
   record.write(serializationBuffer);
   // 获取serializationBuffer的长度信息并记录到serializationBuffer对象中
   int len = serializationBuffer.length() - 4;
   serializationBuffer.setPosition(0);
   serializationBuffer.writeInt(len);
   serializationBuffer.skipBytesToWrite(len);
   // 对serializationBuffer进行wrapp处理,转换成ByteBuffer数据结构
   dataBuffer = serializationBuffer.wrapAsByteBuffer();
}

 

2.2. 将ByteBuffer中间数据写入BufferBuilder

首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑:

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws 
   IOException, InterruptedException {
   // 对序列化器进行Reset操作,初始化initial position
   serializer.reset();
   // 创建BufferBuilder
   boolean pruneTriggered = false;
   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
   // 调用序列化器将数据写入bufferBuilder
   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
   // 如果SerializationResult是完整Buffer
   while (result.isFullBuffer()) {
      // 则完成创建Buffer数据的操作
      finishBufferBuilder(bufferBuilder);
      // 如果是完整记录,则将pruneTriggered置为True
      if (result.isFullRecord()) {
         pruneTriggered = true;
         emptyCurrentBufferBuilder(targetChannel);
         break;
      }
      // 创建新的bufferBuilder,继续复制序列化器中的数据到BufferBuilder中
      bufferBuilder = requestNewBufferBuilder(targetChannel);
      result = serializer.copyToBufferBuilder(bufferBuilder);
   }
   checkState(!serializer.hasSerializedData(), "All data should be written at once");
     // 如果指定的flushAlways,则直接调用flushTargetPartition将数据写入ResultPartition
   if (flushAlways) {
      flushTargetPartition(targetChannel);
   }
   return pruneTriggered;
}

 

二. BufferBuilder申请资源并创建

1. ChannelSelectorRecordWriter创建BufferBuilder

在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。

//1. targetChannel确认数据写入的分区,ID与下游InputGate中的InputChannelID是对应的
//2. 
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, 
   InterruptedException {
	//在ChannelSelectorRecordWriter中维护了
	//bufferBuilders[]数组,用于存储创建好的BufferBuilder对象
   if (bufferBuilders[targetChannel] != null) {
      return bufferBuilders[targetChannel];
   } else {
   //只有在无法从bufferBuilders[]中获取BufferBuilder时,
   //才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。
      return requestNewBufferBuilder(targetChannel);
   }
}

requestNewBufferBuilder()方法逻辑如下

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
   IOException, InterruptedException {
   checkState(bufferBuilders[targetChannel] == null 
              || bufferBuilders[targetChannel].isFinished());
   // 调用targetPartition获取BufferBuilder
   BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
   // 向targetPartition中添加BufferConsumer
   targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),
                                     targetChannel);
   // 将创建好的bufferBuilder添加至数组
   bufferBuilders[targetChannel] = bufferBuilder;
   return bufferBuilder;
}

 

2. BroadcastRecordWriter创建BufferBuilder

在BroadcastRecordWriter内部创建BufferBuilder的过程中,会将创建的bufferConsumer对象添加到所有的ResultSubPartition中,实现将Buffer数据下发至所有InputChannel,如下代码:

public BufferBuilder requestNewBufferBuilder(int targetChannel) 
    throws IOException, InterruptedException {
   checkState(bufferBuilder == null || bufferBuilder.isFinished());
   BufferBuilder builder = targetPartition.getBufferBuilder();
   if (randomTriggered) {
      targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);
   } else {
      try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
         for (int channel = 0; channel < numberOfChannels; channel++) {
            targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);
         }
      }
   }
   bufferBuilder = builder;
   return builder;
}

 

这是从Task的层面了解数据网络传输过程,下篇了解在TaskManager中如何构建底层的网络传输通道。

 

03-08 01:11