之前的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function

Flink的处理函数-LMLPHP

一.基本处理函数(ProcessFunction

1.1 处理函数的功能和使用

转换算子一般针对某种具体操作来定义的,能拿到的信息有限。而使用底层的处理函数,则可以使用处理函数提供的“定时服务”(TimerServer) 来获取到当前当前水位线、事件等更为详细的信息,及注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

1.2 ProcessFunction解析

ProcessFunction的使用就基于 DataStream 调用 .process() 方法传入一个ProcessFunction作为参数,用来定义处理逻辑。

从源码可以看到,ProcessFunction 继承了 AbstractRichFunction(抽象富函数类),两个泛类型参数代表输入类型与输出类型。里面单独定于了两个非常重要的方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

  • processElement():具体的数据处理逻辑,且对于流中的每个元素都会调用一次。三个参数分别为 value当前数据、ctx上下文、out采集器。
  • onTimer():当注册的定时器被触发,会执行该方法。三个参数分别为 timestamp时间戳、ctx上下文、out采集器。Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。

Flink的处理函数-LMLPHP

通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

1.3 处理函数的分类 

DataStream在调用一些转换方法之后,有可能生成新的流类型;例如调用 .KeyBy() 后得到的是KeyedStream,然后调用 .window() 后得到的 WindowedStream。但是对于不同的流类型,都可以直接调用 .process() 方法进行自定义处理,此时传入的参数就叫做处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层API,可彼此之间也会有所差异。

Flink提供了8个不同的处理函数:

(1) ProcessFunction

最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。 

(2) KeyedProcessFunction

对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

(3) ProcessWindowFunction

开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

(4) ProcessAllWindowFunction

同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

(5) CoProcessFunction

合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。

(6) ProcessJoinFunction

间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

(7) BroadcastProcessFunction

广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

(8) KeyedBroadcastProcessFunction

按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。

二.按键分区处理函数(KeyedProcessFunction

上面提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以我们一般是先对流做 KeyBy 分区操作后,再去调用 .process() 定义具体的操作逻辑逻辑;一般传入 KeyedProcessFunction。

2.1 定时器(Timer)和定时服务(TimerService)

在.onTimer()方法中可以实现定时处理的逻辑,当之前注册的定时器被触发,则会调用该方法。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。

通过KeyedProcessFunction提供的上下文可以获取以下等内容:

ds
    .keyBy( t -> t.getId())
    .process(new KeyedProcessFunction<String, WaterSensor, Object>() {

        /**
         * 来一条数据调用一次
         * @param value 当前输入的数据
         * @param ctx 上下文信息
         * @param out 采集器
         * @throws Exception
         */
        @Override
        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {

            // 获取定时服务
            TimerService timerService = ctx.timerService();

            // 注册定时器:以事件时间为基准
            timerService.registerEventTimeTimer(long time);

            // 注册定时器:以处理时间为基准
            timerService.registerProcessingTimeTimer(long time);

            // 当前的处理时间:即系统时间
            timerService.currentProcessingTime();

            // 删除触发时间为time的处事件时间定时器
            timerService.deleteEventTimeTimer(long time);

            // 删除触发时间为time的处理时间定时器
            timerService.deleteEventTimeTimer(long time);

            // 获取当前水位线 ***获取的上一条数据的水位线
            timerService.currentWatermark();

        }    

        /**
         * 时间进展到定时器注册的时间则会调用该方法
         * @param timestamp 当前时间戳
         * @param ctx 上下文信息
         * @param out 采集器
         * @throws Exception
         */
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, Object>.OnTimerContext ctx, Collector<Object> out) throws Exception {

        }
    });

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。

2.2 KeyedProcessFunction案例 

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl());

    // ***定义 WaterMark 策略
    WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L);
    // ***指定 watermark策略
    SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS
            .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

    sensorWithWaterMark
            .keyBy( t -> t.getId())
            .process(new KeyedProcessFunction<String, WaterSensor, Object>() {
            /**
             * 来一条数据调用一次
             * @param value 当前输入的数据
             * @param ctx 上下文信息
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
                // 获取定时服务
                TimerService timerService = ctx.timerService();
                // 当前 Key
                String currentKey = ctx.getCurrentKey();
                // 数据中的事件时间
                Long timestamp = ctx.timestamp();
                // 注册以事件时间为基准的5s定时器
                timerService.registerEventTimeTimer(5000);
                System.out.println("当前 Key 为=" + currentKey + "当前时间为=" + timestamp + "注册了一个5s的定时器");

            }

            /**
             * 时间进展到定时器注册的时间则会调用该方法
             * @param timestamp 当前时间戳
             * @param ctx 上下文信息
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, Object>.OnTimerContext ctx, Collector<Object> out) throws Exception {
                // 当前 Key
                String currentKey = ctx.getCurrentKey();
                System.out.println("当前 Key 为=" + currentKey + "现在时间为" + timestamp + "定时器触发");
            }
        });

    env.execute();

}

 输入:

[root@VM-55-24-centos ~]# nc -lk 1234
S1,1,1
S1,4,4
S2,3,3
S2,5,5
S3,8,8
S3,9,9

输出:

当前 Key 为=S1当前时间为=1000注册了一个5s的定时器
当前 Key 为=S1当前时间为=4000注册了一个5s的定时器
当前 Key 为=S2当前时间为=3000注册了一个5s的定时器
当前 Key 为=S2当前时间为=5000注册了一个5s的定时器
当前 Key 为=S3当前时间为=8000注册了一个5s的定时器
当前 Key 为=S3当前时间为=9000注册了一个5s的定时器 // 触发定时器 9000ms-3000ms-1ms = 5999
当前 Key 为=S1现在时间为5000定时器触发
当前 Key 为=S3现在时间为5000定时器触发
当前 Key 为=S2现在时间为5000定时器触发

2.3 KeyedProcessFunction中的当前Watermark

在实现的 processElement() 中获取当前水位线

sensorWithWaterMark
        .keyBy( t -> t.getId())
        .process(new KeyedProcessFunction<String, WaterSensor, Object>() {
        /**
         * 来一条数据调用一次
         * @param value 当前输入的数据
         * @param ctx 上下文信息
         * @param out 采集器
         * @throws Exception
         */
        @Override
        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
            
            // 获取定时服务
            TimerService timerServiceService = ctx.timerService();

            // 获取当前水位线
            long currentWatermark = timerServiceService.currentWatermark();

            System.out.println("当前数据为=" + value + "当前水位线为=" + currentWatermark );
        }
    });

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,3,3
s1,6,6
s1,8,8

输出:

当前数据为=WaterSensor{id='s1', ts=1, vc=1}当前水位线为=-9223372036854775808
当前数据为=WaterSensor{id='s1', ts=3, vc=3}当前水位线为=-2001
当前数据为=WaterSensor{id='s1', ts=6, vc=6}当前水位线为=-1
当前数据为=WaterSensor{id='s1', ts=8, vc=8}当前水位线为=2999

可以看到,在process中的当前水位线其实是 上一条数据的事件时间 - 水位线延迟时间 - 1ms。

2.4 KeyedProcessFunction 小结

其他处理函数类似。

三. 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。 

3.1 使用ProcessAllWindowFunction

思路:直接开窗,使用全窗口函数处理窗口内所有的数据,使用HashMap存储,再对map进行统计排序输出。统计十秒内数据,五秒输出一次,其实就是滑动窗口大小为10,滑动步长为5。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl());

    WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L);

    SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS
            .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

    sensorWithWaterMark
            // 滑动窗口,窗口大小10s,滑动步长5s
            .windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
            .process(new MyProcessAllWindowFunction())
            .print();
    env.execute();
}

    /**
     * 自定义全窗口处理函数
     *      全窗口函数:窗口触发时调用一次
     */
    public static class MyProcessAllWindowFunction extends ProcessAllWindowFunction<WaterSensor , String , TimeWindow>{
        @Override
        public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
            // 定义一个hashmap用来存,key=vc,value=count值
            Map<Integer, Integer> vcCountMap = new HashMap<>();
            for (WaterSensor element : elements) {
                Integer vc = element.getVc();
                if (vcCountMap.containsKey(vc)) {
                    vcCountMap.put(vc, vcCountMap.get(vc) + 1);
                } else {
                    vcCountMap.put(vc, 1);
                }
            }

            List<Map.Entry<Integer, Integer>> list = new ArrayList<>(vcCountMap.entrySet());

            // 使用Collections.sort()按value降序排序
            Collections.sort(list, (o1, o2) ->  o2.getValue() - o1.getValue());
            System.out.println(list);
            StringBuffer result = new StringBuffer();
            for (int i = 0; i < Math.min(2 , list.size()); i++) {
                result.append("Top " + (i+1) + ": vc = " + list.get(i).getKey() + ",出现次数 = " + list.get(i).getValue());
                result.append("\n");
            }
            out.collect(result.toString());
        }
    }

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s2,3,1
s1,5,2
s3,6,2
s1,6,1
s2,7,3
s3,8,1
s1,8,3
s2,9,2
s1,10,1
s3,11,2
s1,13,2

输出:

// 注释:[ 0 , 5 ] 的窗口
Top 1: vc = 1,出现次数 = 2 

// 注释:[ 0 , 10 ] 的窗口
Top 1: vc = 1,出现次数 = 4
Top 2: vc = 2,出现次数 = 3

3.2 使用KeyedProcessFunction

上面的方法使用全窗口将所有的数据都放在一个分区内,强行将并行度设置成了1,这是Flink不推荐的做法。

则可以使用KeyedProcessFunction进行优化:

1.对统计字段(vc)进行 KeyBy 分区

2.进行增量聚合,统计vc出现的次数,封装数据(vc,count,窗口标记(窗口结束数据))

3.对标记(窗口)进行分组,对数据进行排序、取TopN

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl())
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
                                            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L)
                    );

    // 对 vc 进行分组,统计窗口内vc出现的次数 将每条数据封装成 vc,count,窗口结束时间
    SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg  = sensorDS
            .keyBy(sensor -> sensor.getVc())
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .aggregate(
                    new VcCountAgg(),
                    new WindowResult()
            );

    // 按照打的标记(windowEndTime)进行分组,保证同一个窗口时间范围的结果一起 , 分组后排序,取 TopN
    windowAgg.keyBy(v -> v.f2)
                    .process(new TopN(2))
                    .print();
    env.execute();
}
/**
 * 增量聚合:累计同分组出现的次数
 */
public static class VcCountAgg implements AggregateFunction<WaterSensor,Integer,Integer> {

    @Override
    public Integer createAccumulator() {
        return 0;
    }

    @Override
    public Integer add(WaterSensor waterSensor, Integer integer) {
        return integer+1;
    }

    @Override
    public Integer getResult(Integer integer) {
        return integer;
    }

    @Override
    public Integer merge(Integer integer, Integer acc1) {
        return null;
    }
}


/**
 * 全窗口函数,窗口内数据全部到达才会执行一次
 * 泛型如下:
 * 第一个:输入类型 = 增量函数的输出  count值,Integer
 * 第二个:输出类型 = Tuple3(vc,count,windowEndTime) ,带上 窗口结束时间 的标签
 * 第三个:key类型 , vc,Integer
 * 第四个:窗口类型
 */
public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {

    @Override
    public void process(Integer vc, ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
        // 获取迭代器的数据(只有一条数据)
        Integer count = elements.iterator().next();
        // 获取窗口结束时间 作为标记
        long endTime = context.window().getEnd();
        // 将 vc、vc对应的count 连带窗口标记 返回
        out.collect(Tuple3.of(vc , count , endTime));
    }
}
/**
 *  处理组内的每一条数据 一条数据触发一次
 */
public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {

    // 存不同窗口的 统计结果,key=windowEnd,value=list数据
    private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;

    // 要取的Top数量
    private int threshold;

    public TopN(int threshold) {
        this.threshold = threshold;
        dataListMap = new HashMap<>();
    }


    @Override
    public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {
        // 进入这个方法,只是一条数据,要排序,得到齐才行 ===》 存起来,不同窗口分开存
        Long windowEnd = value.f2;
        // 将对应的窗口的数据放入map中对应key的list中
        if(dataListMap.containsKey(windowEnd)){
            // 该 vc 存在,则直接添加到数组中
            dataListMap.get(windowEnd).add(value);
        }else{
            // 不包含vc,是该vc的第一条
            List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
            dataList.add(value);
            dataListMap.put(windowEnd , dataList);
        }

        // 注册一个定时器 窗口触发时触发定时器(windowEndTime + 1ms) 输出计算结果
        // 同一个窗口范围,应该同时输出,只不过是一条一条调用processElement方法,只需要延迟1ms即可
        ctx.timerService().registerProcessingTimeTimer(windowEnd + 1);
    }

    // 注册一个定时器 窗口触发时进行排序,取TopN,输出结果
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        // 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopN
        Long windowEnd = ctx.getCurrentKey();
        // 1. 排序
        List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
        dataList.sort((o1, o2) -> o2.f1 - o1.f1);

        // 2. 取TopN
        StringBuilder outStr = new StringBuilder();

        outStr.append("================================\n");
        // 遍历 排序后的 List,取出前 threshold 个, 考虑可能List不够2个的情况  ==》 List中元素的个数 和 2 取最小值
        for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
            Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
            outStr.append("Top" + (i + 1) + "\n");
            outStr.append("vc=" + vcCount.f0 + "\n");
            outStr.append("count=" + vcCount.f1 + "\n");
            outStr.append("窗口结束时间=" + vcCount.f2 + "\n");
            outStr.append("================================\n");
        }

        out.collect(outStr.toString());

    }
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,1
s2,3,3
s1,4,3
s1,5,1
s2,8,1
s1,9,3
s3,11,3
s1,12,3
s1,13,3

输出:

================================
Top1
vc=1
count=2
窗口结束时间=5000
================================
Top2
vc=3
count=2
窗口结束时间=5000
================================

================================
Top1
vc=1
count=4
窗口结束时间=10000
================================
Top2
vc=3
count=3
窗口结束时间=10000
================================

3.3 侧输出流(Side Output

侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。

具体实现上,可以在处理函数中的上下文中调用 .output() 方法即可。

案例:对每个传感器,水位超过10则输出告警信息。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl())
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
                                            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L)
                    );

    // 定义输出流标签
    OutputTag<String> warnTag = new OutputTag<String>("warn-tag", Types.STRING);
    
    SingleOutputStreamOperator<WaterSensor> process = sensorDS
            .keyBy(WaterSensor::getId)
            .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {

                @Override
                public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
                    if (value.getVc() >= 10) {
                        // 使用侧输出流告警
                        ctx.output(warnTag, "当前水位线:" + value.getVc() + ",触发阈值10!");
                    }
                    out.collect(value);
                }
            });

    // 输出主流
    process.print("主流");

    // 输出侧流
    process.getSideOutput(warnTag).printToErr("侧流-Warn");

    env.execute();
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,3
s2,5,7
s1,9,10
s1,5,9
s5,11,11

输出:
 

主流> WaterSensor{id='s1', ts=1, vc=3}
主流> WaterSensor{id='s2', ts=5, vc=7}
侧流-Warn> 当前水位线:10,触发阈值10!
主流> WaterSensor{id='s1', ts=9, vc=10}
主流> WaterSensor{id='s1', ts=5, vc=9}
侧流-Warn> 当前水位线:11,触发阈值10!
主流> WaterSensor{id='s5', ts=11, vc=11}
12-17 11:33