本文原创地址:https://www.cnblogs.com/xijiu/p/14235551.html 转载请注明

赛题

实现一个分布式统计和过滤的链路追踪

  • 赛题背景

    本题目是另外一种采样方式(tail-based Sampling),只要请求的链路追踪数据中任何节点出现重要数据特征(错慢请求),这个请求的所有链路数据都采集。目前开源的链路追踪产品都没有实现tail-based Sampling,主要的挑战是:任何节点出现符合采样条件的链路数据,那就需要把这个请求的所有链路数据采集。即使其他链路数据在这条链路节点数据之前还是之后产生,即使其他链路数据在分布式系统的成百上千台机器上产生。

  • 整体流程

    用户需要实现两个程序,一个是数量流(橙色标记)的处理程序,该机器可以获取数据源的http地址,拉取数据后进行处理,一个是后端程序(蓝色标记),和客户端数据流处理程序通信,将最终数据结果在后端引擎机器上计算。具体描述可直接打开赛题地址 https://tianchi.aliyun.com/competition/entrance/231790/information。此处不再赘述

    《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP

解题

题目分析

可将整体流程粗略分为三大部分

  • 一、front 读取 http stream 至 front 节点
    • 网络io
  • 二、front 节点处理数据
    • cpu处理
  • 三、将 bad traces 同步至 backend,backend 经过汇总计算完成上报
    • 网络传输 + cpu

遵循原则:各部分协调好,可抽象为生成、消费模型,切勿产生数据饥饿;理想效果是stream流完,计算也跟着马上结束

方案一 (trace粒度)

因题目中明确表明某个 trace 的数据出现的位置前后不超过2万上,故每2万行是非常重要的特征

《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP

  • 按行读取字符流
    • BufferedReader.readLine()
  • 分析计算
    • 在某个 trace 首次出现时(p),记录其结束位置 p + 2w,并将其放入待处理队列(queue)中
    • 如果当前 trace 为 badTrace,将其放入 BadTraceSet 中
    • 每处理一行,均从 queue 队列中拿出 firstElement,判断是否与之相等,如果相等,且为 badTrace,那么进入第3步
  • 向 backend 发送数据 注:后续所有涉及网络交互的部门均为 netty 异步交互
    • 将当前 trace 对应的所有数据按照 startTime 排序
    • 将排好序的数据与该 trace 最后结束位置 endPosition 一并发送至 backend 节点
  • backend 通知 front2 发送数据
    • backend 接收到从 front1 发送过来的 trace 数据,向 front2 发送通知,要求其发送该 trace 的全部数据
    • 注:此处交互有多种情况,在步骤5时,会具体说明
  • front2 将数据发送至 backend 此处列举所有可能发生的情况
    • 《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP
    • 场景 1:front1 主动上报 traceA 后,发现 front2 已经上报该 traceA 数据,结束
    • 场景 2:front1 主动上报 traceA 后,front2 未上报,front2 发现该trace在已就绪集合中,排序、上报,结束
    • 场景 3:front1 主动上报 traceA 后,front2 未上报,且 front2 的已就绪集合没有该 trace,在错误集合中发现该 trace,结束 注:因该 trace 存在于 badTraceSet 中,故将来某个时刻 front2 一定会主动上报该 trace
    • 场景 4:front1 主动上报 traceA 后,front2 未上报,且 front2 的已就绪集合没有该 trace,那么等待行数超限后,检查该 trace 是否存在于 badTraceSet 中,如果已存在,结束
    • 场景 5:front1 主动上报 traceA 后,front2 未上报,且 front2 的已就绪集合没有该 trace,那么等待行数超限后,检查该 trace 是否存在于 badTraceSet 中,如果不存在,排序、上报,结束 注:即便是 front2 中不存在该trace的信息,也需要上报
  • 结果计算
    • 在收集到 front1 跟 front2 数据后,对2个有序集合进行 merge sort 后,计算其 MD5

方案分析

此方案的跑分大致在 25s 左右,成绩不甚理想,总结原因大致可分为以下几种

  • 交互场景较为复杂
  • 需要维护一块缓存区域
    • 如果该缓存区域通过行数来失效过期数据的话,那么需要额外的分支计算来判断过期数据,拖慢整体响应时间
    • 如果通过缓存大小来自动失效过期数据的话,那么大小设置很难平衡,如果太小,则可能会失效一些正常数据,导致最终结果不正确,如果太大,又会导致程序反应变慢,带来一系列不可控的问题

基于上述原因,为了充分利用 2万行的数据特征,引入方案二

方案二 (batch粒度)

因题目中明确表明某个trace的数据出现的位置前后不超过2万上,故每2万行数据可作为一个批次存储,过期数据自动删除

《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP

  • 按行读取字符流

    • BufferedReader.readLine()
  • 每2万行数据作为一个batch,并分配唯一的batchId(自增即可),此处涉及大量cpu计算,分为2部分

    • 在每行的 tag 部分寻找error=1http.status_code!=200的数据并将其暂存
    • 将 traceId 相同的 span 数据放入预先开辟好空间的数据结构List<Map<String, List<String>>>中,方便后续 backend 节点拿取数据
    • 注:此处下载数据与处理数据并行执行,交由2个线程处理,一切为了提速
  • 上报 badTraceId

    • 每切割 2万行,统计所有的 badTraceId,与 batchId 一并上报至 backend
    • 因同一个 span 数据可能分布在2个 front 节点中,所以有可能 front1 存在错误数据,front2 却全部正确,2个 front 又不能直接通信,所以此时需要同步至 backend,由 backend 统计全量的 badTraceIds
    • front 收到 backend 的通知后,进行当前批次错误 trace 数据的统计,因当前批次的数据有可能出现在上一个批次跟下一个批次,故一定要等到处理每行数据的线程已经处理完 currentBatchNum+1 个线程后,方能执行操作
  • 通知2个 front 节点发送指定 traceIds 的全量数据

    • backend 主动向2个 front 发送获取指定 traceIds 的全量数据通知
    • front 将 span 数据排好序后上报至 backend
    • backend 执行二路归并排序后,计算整体 span 的 md5 值,反复循环,直至数据流读取完毕 注:因2个 front 节点为2核4g,backend 节点为单核2g,为减少 backend 压力,将部分排序工作下放至 front 节点
  • 计算结果

    • 归并排序,计算最终结果

方案总结

当前方案耗时在20s左右,统计发现字符流的读取耗时15s,其他耗时5s,且监控发现各个缓冲区没有发现饥饿、过剩的情况,所以当前方案的瓶颈还是卡在字符流的读取、以及cpu判断上,所以一套面向字节流处理的方案呼之欲出

  • 跟读BufferedReader源码,发现其将字节流转换字符流做了大量的工作,也是耗时的源头,故需要将当前方案改造为面向字节的操作

方案三 (面向字节)

大层面的设计思想与方案二一致,不过面向字节处理的话,从读取流、截断行、判断是否为bad trace、数据组装等均需为字节操作

  • 好处:预分配内存,面向字节,程序性能提高
  • 弊端:编码复杂,需自定义数据协议

《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP

  • 读取字节流

    • 程序在初始化时,预先分配10个字节数据 byte[],每个数组存放10M数据
    • io 与 cpu 分离,将读取数据任务交个某个独立线程,核心线程处理cpu
  • 数据处理

    • 用固定内存结构 int[20000] 替换之前动态分配内存的数据结构体 List<Map<String, List<String>>>,只记录每行开始的 position
    • 同时将 bad trace id 存入预先分配好的数组中
  • 上报 badTraceId

    • 同方案二
  • 通知2个 front 节点发送指定 traceIds 的全量数据

    • backend 主动向2个 front 发送获取指定 traceIds 的全量数据通知
    • 因在步骤二时,并没有针对 trace 进行数据聚合,所以在搜集数据时,需要遍历int[20000],将符合要求的 trace 数据放入自定义规范的byte[] 注:刚开始设计的(快排+归并排序)的方案效果不明显,且线上的评测环境的2个 front 节点压力很大,再考虑到某个 trace 对应的 span 数据只有几十条,故此处将所有的排序操作都下放给 backend 节点,从而减轻 front 压力
    • 因 span 为变长数据,故自定义规范byte[]存储数据的设计如下
      • 预先分配10Mbyte[],来存储一个批次中的所有 bad trace 对应 span 数据
      • 用2个 byte 存放下一个 span 数据的长度
      • 存储 span 数据
      • 最后返回byte[]及有效长度
  • 计算结果

    • 排序,计算最终结果

线程并发

《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP

  • A-Thread: slot 粒度,读取 http stream 线程
  • B-Thread: block 粒度,处理 slot 中的 block,将 block 数据按行切割、抓取 bad trace id 等
  • C-Thread: block 粒度,响应 backend 拉取数据的请求
阻塞场景
  • A-Thread 读取 http stream 中数据后,将其放入下一个缓存区,如果下一个缓冲区还没有被线程C消费,那么A-Thread 将被阻塞
  • B-Thread 处理数据,如果B-Thread下一个要处理的byte[]数据A线程还未下载完毕,那么B-Thread将被阻塞(io阻塞)
  • C-Thread 为拼接 bad trace 的线程,需要 previous、current、next 3个 batch 都 ready后,才能组织数据,当B-Thread还未处理完next batch 数据时,C-Thread将被阻塞
解决思路
  • A-B 同步:10个 slot 分配10个Semaphore,为 A-Thread 与 B-Thread 同步服务,A-Thread 产生数据后,对应 slot 的信号量+1,B-Thread 消费数据之前,需要semaphore.acquire()
  • B-C 同步:通过volatile及纳秒级睡眠Thread.sleep(0, 2)实现高效响应。实际测试,某些场景中,该组合性能超过Semaphore;C-Thread 发现 B-Thread 还未产出 next batch 的数据,那么进入等待状态
  • A-C 同步:同样利用volatile及纳秒级睡眠Thread.sleep(0, 2)

JVM调参

打印gc输出日志时发现,程序会发生3-5次 full gc,导致性能欠佳,分析内存使用场景发现,流式输出的数据模型,在内存中只会存在很短的一段时间便会失效,真正流入老年代的内存是很小的,故应调大新生代占比

java -Dserver.port=$SERVER_PORT -Xms3900m -Xmx3900m -Xmn3500m -jar tailbaseSampling-1.0-SNAPSHOT.jar &

直接分配约 4g 的空间,其中新生代占 3.5g,通过观测 full gc 消失;此举可使评测快2-3s

方案总结

此方案最优成绩跑到了5.7s,性能有了较大提升,总结快的原因如下:

  • 面向字节
  • 内存预分配;避免临时开辟内存
  • 使用轻量级锁
  • 避免程序阻塞或饥饿

奇技淫巧

快速读取字节数组

因java语言设计缘故,凡事读取比 int 小的数据类型,统一转为 int 后操作,试想以下代码

while ((byteNum = input.read(data)) != -1) {
	for (int i = 0; i < byteNum; i++) {
		if (data[i] == 10) {
			count++;
		}
	}
}

大量的字节对比操作,每次对比,均把一个 byte 转换为 4个 byte,效率可想而知

一个典型的提高字节数组对比效率的例子,采用万能的Unsafe,一次性获取8个byte long val = unsafe.getLong(lineByteArr, pos + Unsafe.ARRAY_BYTE_BASE_OFFSET); 然后比较2个 long 值是否相等,提速是成倍增长的,那么怎么用到本次赛题上呢?

span数据是类似这样格式的

193081e285d91b5a|1593760002553450|1e86d0a94dab70d|28b74c9f5e05b2af|508|PromotionCenter|DoGetCommercialStatus|192.168.102.13|http.status_code=200&component=java-web-servlet&span.kind=server&bizErr=4-failGetOrder&http.method=GET

用"|"切割后,倒数第二位是ip,且格式固定为192.168.***.***,如果采用Unsafe,每次读取一个 int 时,势必会落在192.168.中间,有4种可能192.92.12.16.168,故可利用此特性,直接进行 int 判断

int val = unsafe.getInt(data, beginPos + Unsafe.ARRAY_BYTE_BASE_OFFSET);
if (val == 775043377 || val == 825111097 || val == 909192754 || val == 943075630) {
}

此“技巧”提速1-2秒

大循环遍历

提供2种遍历字节数组方式,哪种效率更高

  • 方式1

    byte[] data = new byte[1024 * 1024 * 2];
    int byteNum;
    while ((byteNum = input.read(data)) != -1) {
    	for (int i = 0; i < byteNum; i++) {
    		if (data[i] == 10) {
    			count++;
    		}
    	}
    }
    
  • 方式2

    byte[] data = new byte[1024 * 1024 * 2];
    int byteNum;
    int beginIndex;
    int endIndex;
    int beginPos;
    while ((byteNum = input.read(data)) != -1) {
    	beginIndex = 0;
    	endIndex = byteNum;
    	beginPos = 0;
    	while (beginIndex < endIndex) {
    		int i;
    		for (i = beginPos; i < endIndex; i++) {
    			if (data[i] == 124) {
    				beginPos = i + 1;
    				times++;
    				break;
    			} else {
    				if (data[i] == 10) {
    					count++;
    					beginIndex = i + 1;
    					beginPos = i + 1;
    					break;
    				}
    			}
    		}
    		if (i >= byteNum) {
    			break;
    		}
    	}
    }
    

两种方式达到的效果一样,都是寻找换行符。方式2不同的是,每次找到换行符都 break 掉当前循环,然后从之前位置继续循环。其实这个小点卡了我1个星期,就是将字符流转换为字节流时,性能几乎没有得到提高,换成方式2后,性能至少提高一倍。为什么会呈现这样一种现象,我还没找到相关资料,有知道的同学,还望不吝赐教哈

结束

最终比赛成绩贴上哈

《中间件性能挑战赛--分布式统计和过滤的链路追踪》java 选手分享-LMLPHP

01-05 21:58