背景:

      最近在用kafka做消息中间件,producer从hive中读取消息发送到kafka,后端storm对消息分类发送到elasticsearch建立索引。

问题:

      hive表中总共350万数据,当时整个全量索引结束后发现,最后索引条数总共310万左右。storm日志没有任何错误日志。

排查:

      首先排查storm consumer的问题,由于发现storm日志没有任何异常,所以第一步基本排除建索引程序的问题。storm 消费kafka用的官方storm-kafka包,而且已开启ack,所以基本排除storm端的问题。

    现在怀疑kafka里的数据本身只有310万条数据,写了一个程序扔到了kafka集群上探查了一下,印证了自己的想法。果然,数据只有310万条。现在基本判断问题的在kafka producer上。仔细查看了下producer代码

     "acks" 选项表示kafka 的ack级别:acks=0 意味着producer永远不会等待任何一个来自broker的ack,意味着不需要任何确实,发送及以为着成功。acks=1 意味着在leader replica已经接收到数据后,producer会得到一个ack,这个选项对速度与安全性做一个平衡,但是不需要等其他副本确认,如果发生leader挂了,其他副本还没来得及同步,这时就会发生数据丢失的情况。最后一种数据最安全的情况就是acks=al,l意味着在所有的ISR都接收到数据后,producer才得到一个ack。这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失,但是相应的吞吐量会受到影响。本着对业务对数据可靠性的要求,我选择了最高的可靠级别,这点没毛病。

    "retries"选项大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败,会间隔一段时间重试,这个值设置的就是重试间隔时间。初步怀疑这个值太小,如果磁盘卡顿,网络中断超过三秒,是否会丢数据。所以将这个参数调大到300。

     重新打包上传到storm集群重新跑了一回,数据还是丢了30多万。场面一度尴尬。。问题陷入了僵局。

转机:

    现在的问题已经超过了我的认知,之前从来没出现过如此严重的丢数据的问题。在网上搜的资料大部分都看过。理论上可靠性可以通过副本解决,没有类似于我这个种问题。心想着如果不行,只能更改broker 从page cache同步到硬盘的频率了。鬼使神差下,我更改了下producer的压缩格式,从snappy改到gzip,这次kafka中的消息,竟然只少了2000。同样的参数,只改了下压缩格式。我又查看下了前两次用snapp格式,kafka里的消息数,发现了一个问题,两次用snappy的时候,kafka消息数竟然一模一样。如果不是玄学的问题,理论上如果丢消息,350万条,丢相同条数的信息概率简直太小了。

  现在问题似乎已经很清晰了,gzip压缩率要比snappy高,snappy优势在于压缩速度。压缩率高意味着单条数据要小。现在基本问题定位在单条数据大小的问题。但是为什么producer端没有异常日志呢。查看一下producer发送消息的源码:“Future send(ProducerRecord var1)” producer 发送消息后会发挥一个future,这种模式是异步发送方式,当broker返回异常信息时并不会抛出。,producer.send(producerRecord).get(),加上get(),将异步改同步,打包运行果然发送到30万条左右数据时就已经抛出异常

解决:

  至此问题已经定位到,下一步解决问题,搜了下stackoverflow,参考下最高票回答:

    已完美解决问题。



 

原文参考:https://www.jianshu.com/p/ec93eb4f7733

10-05 14:19