一、背景说明

在Flink中对流数据进行去重计算是常有操作,如流量域对独立访客之类的统计,去重思路一般有三个:

  • 基于Hashset来实现去重
    数据存在内存,容量小,服务重启会丢失。
  • 使用状态编程ValueState/MapState实现去重
    常用方式,可以使用内存/文件系统/RocksDB作为状态后端存储。
  • 结合Redis使用布隆过滤器实现去重
    适用对上亿数据量进行去重实现,占用资源少效率高,有小概率误判。

这里以自定义布隆过滤器的方式,实现Flink窗口计算中独立访客的统计,数据集样例如下:

Flink去重统计-基于自定义布隆过滤器-LMLPHP

二、布隆过滤器部分说明

Flink去重统计-基于自定义布隆过滤器-LMLPHP

布隆过滤器简单点说就是哈希算法+bitmap,如上图,对字符串结合多种哈希算法,基于bitmap作为存储,由于只用0/1存储,所以可以大量节省存储空间,也就特别适合在上百亿数据里面做去重这种动作。在后续要进行字符串查找时,对要查找的字符串同样计算这多个哈希算法,根据在bitmap上的位置,可以确认该字符串一定不在或者极大概率在(由于哈希冲突问题会有极小概率误判)。

在本次开发中,使用自定义的布隆过滤器,其中对哈希算法部分做了几点优化:

  • 结合Redis使用,Redis原生支持bitmap
    Flink去重统计-基于自定义布隆过滤器-LMLPHP
  • 对bitmap容量扩容,一般为数据的3-10倍,这里使用2^30,使用2的整数幂,能让后续查找输出使用位与运算,实现比取模查找更高的效率。
myBloomFilter = new MyBloomFilter(1 << 30);
  • 优化哈希算法,这里对要查找的id转为char类型,并行单个剔除后基于Unicode编码乘以质数31再相加,来避免不同字符串计算出同样哈希值的问题。
for (char c : value.toCharArray()){
                result += result * 31 + c;
            }

另外,谷歌提供的工具Guava也包含了布隆过滤器,加入相关依赖即可使用,主要参数如下源码,输入要建立的过滤器容器大小及误判概率即可。

public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions, double fpp) {
        return create(funnel, (long)expectedInsertions, fpp);
    }

三、代码部分

package com.test.UVbloomfilter;

import bean.UserBehavior;
import bean.UserVisitorCount;
import java.sql.Timestamp;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

public class UserVisitorTest {
    public static void main(String[] args) throws Exception {
        //建立环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        //指定时间语义
        WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
                .<UserBehavior>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
            @Override
            public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                return element.getTimestamp() * 1000L;
            }
        });
        //读取数据、映射、过滤
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env
                .readTextFile("input/UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new UserBehavior(Long.parseLong(split[0])
                                , Long.parseLong(split[1])
                                , Integer.parseInt(split[2])
                                , split[3]
                                , Long.parseLong(split[4]));
                    }
                })
                //.filter(data -> "pv".equals(data.getBehavior()))  //lambda表达式写法
                .filter(new FilterFunction<UserBehavior>() {
                    @Override
                    public boolean filter(UserBehavior value) throws Exception {
                        if (value.getBehavior().equals("pv")) {
                            return true;
                        }return false; }})
                .assignTimestampsAndWatermarks(wms);

        //去重按全局去重,故使用行为分组,仅为后续开窗使用、开窗
        WindowedStream<UserBehavior, String, TimeWindow> windowDS = userBehaviorDS.keyBy(UserBehavior::getBehavior)
                .window(TumblingEventTimeWindows.of(Time.hours(1)));

        SingleOutputStreamOperator<UserVisitorCount> processDS = windowDS
                .trigger(new MyTrigger()).process(new UserVisitorWindowFunc());

        processDS.print();
        env.execute();
    }

    //自定义触发器:来一条计算一条(访问Redis一次)
    private static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
        @Override
        public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; //触发计算和清除窗口元素。
        }
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }

    private static class UserVisitorWindowFunc extends ProcessWindowFunction<UserBehavior,UserVisitorCount,String,TimeWindow>  {
        //声明Redis连接
        private Jedis jedis;

        //声明布隆过滤器
        private MyBloomFilter myBloomFilter;

        //声明每个窗口总人数的key
        private String hourUVCountKey;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("hadoop102",6379);
            hourUVCountKey = "HourUV";
            myBloomFilter = new MyBloomFilter(1 << 30); //2^30
        }

        @Override
        public void process(String s, Context context, java.lang.Iterable<UserBehavior> elements, Collector<UserVisitorCount> out) throws Exception {
            //1.取出数据
            UserBehavior userBehavior = elements.iterator().next();
            //2.提取窗口信息
            String windowEnd = new Timestamp(context.window().getEnd()).toString();
            //3.定义当前窗口的BitMap Key
            String bitMapKey = "BitMap_" + windowEnd;
            //4.查询当前的UID是否已经存在于当前的bitMap中
            long offset = myBloomFilter.getOffset(userBehavior.getUserId().toString());
            Boolean exists = jedis.getbit(bitMapKey, offset);

            //5.根据数据是否存在做下一步操作
            if (!exists){
                //将对应offset位置改为1
                jedis.setbit(bitMapKey,offset,true);
                //累加当前窗口的综合
                jedis.hincrBy(hourUVCountKey,windowEnd,1);
            }
            //输出数据
            String hget = jedis.hget(hourUVCountKey, windowEnd);
            out.collect(new UserVisitorCount("UV",windowEnd,Integer.parseInt(hget)));
        }
    }

    private static class MyBloomFilter {
        //减少哈希冲突优化1:增加过滤器容量为数据3-10倍
        //定义布隆过滤器容量,最好传入2的整次幂数据
        private long cap;

        public MyBloomFilter(long cap) {
            this.cap = cap;
        }
        //传入一个字符串,获取在BitMap中的位置
       public long getOffset(String value){
            long result = 0L;

            //减少哈希冲突优化2:优化哈希算法
            //对字符串每个字符的Unicode编码乘以一个质数31再相加
            for (char c : value.toCharArray()){
                result += result * 31 + c;
            }
            //取模,使用位与运算代替取模效率更高
           return  result & (cap - 1);
       }}}

输出结果在Redis查看如下:

Flink去重统计-基于自定义布隆过滤器-LMLPHP

学习交流,有任何问题还请随时评论指出交流。

05-20 15:15