Problem Description

Scenario: while developing a Flink job, we had a requirement to first apply a map transformation and then feed the returned DataStream as input into another map. In our case the data flows from the original map into an AsyncDataStream.
The Java code looks roughly like this:

        SingleOutputStreamOperator<xxxxx> task = source.rebalance()
                .map(new XXXXMapFunction())
                .uid("xxxxxx")
                .name("xxxx");
        DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream
                .unorderedWait(task, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS)
                .name("yyyyyy")
                .uid("yyyyyy");
        .......
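
Here YYYYRequestMapFunction stands in for our real async request function, which is not shown. For reference only, the second argument of AsyncDataStream.unorderedWait must be an AsyncFunction (usually a RichAsyncFunction). Below is a minimal, hypothetical sketch of the shape such a class takes, with String placeholders for the input/output types and a simulated request; it is not our actual implementation.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/**
 * Hypothetical sketch only: illustrates the kind of async request function
 * accepted by AsyncDataStream.unorderedWait. The real YYYYRequestMapFunction
 * is not shown in this post.
 */
public class SampleAsyncRequestFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // Run the (simulated) request on another thread and complete the
        // ResultFuture once the response is available.
        CompletableFuture
                .supplyAsync(() -> "response-for-" + input)
                .thenAccept(response -> resultFuture.complete(Collections.singleton(response)));
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // On timeout, emit nothing rather than failing the job.
        resultFuture.complete(Collections.emptyList());
    }
}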

Problem: after development was done and the Flink entry-point main method was run locally, the following error was reported:

Caused by: java.lang.RuntimeException: Assigned key must not be null!
  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)
  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
  at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
  at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)
  at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:298)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:371)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:352)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
  at java.lang.Thread.run(Thread.java:750)

Solution

To work around it, we avoid having the data flow straight from the first map into the second, asynchronous map (the AsyncDataStream operator): we add a processing step in between, even though that step does nothing at all.

        SingleOutputStreamOperator<xxxxx> task = source.rebalance()
                .map(new XXXXMapFunction())
                .uid("xxxxxx")
                .name("xxxx");
        // Add an extra processing step here: task -> process, and process then feeds the next (async) map
        SingleOutputStreamOperator<xxxxx> process = task.process(new DefaultProcessFunction<>());
        DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream
                .unorderedWait(process, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS)
                .name("yyyyyy")
                .uid("yyyyyy");
 

The DefaultProcessFunction used here is a class I wrote myself; it does nothing but pass each element through.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Default pass-through: forwards each element unchanged, so the output of the
 * previous map reaches the next operator without any extra processing.
 * @author smileyan
 * @param <IN> the element type being forwarded
 */
public final class DefaultProcessFunction<IN> extends ProcessFunction<IN, IN> {

    @Override
    public void processElement(IN value, ProcessFunction<IN, IN>.Context ctx, Collector<IN> out) throws Exception {
        out.collect(value);
    }
}
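
If you would rather not keep a separate class around, an anonymous ProcessFunction should serve as the same do-nothing pass-through; I have only verified the named-class version above, and xxxxx again stands for the actual element type:

        SingleOutputStreamOperator<xxxxx> process =
                task.process(new ProcessFunction<xxxxx, xxxxx>() {
                    @Override
                    public void processElement(xxxxx value, Context ctx, Collector<xxxxx> out) {
                        // Forward the element unchanged, exactly like DefaultProcessFunction.
                        out.collect(value);
                    }
                });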

Summary

After running into this problem I searched around but could not find any post describing my situation. After some tinkering I found that the approach above solves it, so I am writing it down here in the hope that it helps anyone hitting the same issue. Thanks for reading!
