其实Rocketmq的给第三方的插件已经全了,如果大家有兴趣的话请移步https://github.com/apache/rocketmq-externals。本文主要是结合笔者已有的rmq在spark中的应用经验对rocketmq做简单介绍以及经验总结,当然免不了会将rocketmq和如今特别火爆的kafka做一些对比(Ps:为了方便打字rmq后面会是rocketmq的缩写)。

  首先对rocktmq做一些流行的消息队列对比。

  提到mq不得不提消息队列,对应于数据结构里面的“先进先出”的队列。而rocketmq就是应用于大数据时代拥有高吞吐低延迟特性的分布式消息拥有发布订阅功能的队列系统。这样的分布式消息系统主要提供应用解耦、流量消峰、消息分发等功能。本片不会对安装集群做过多的介绍,安装单机版本rmq的教程移步官方文档http://rocketmq.apache.org/docs/quick-start/。rocktmq是阿里研发主要作用于双十一这样的高峰期实时流数据处理,起初是基于activemq,但是随着对吞吐量的要求逐步提高,阿里的开发者们逐渐把眼光向kafka转移,但是kafka并不具备低延迟和高可靠性。因此阿里决定研究这样一个兼并传统的订阅消息系统的发布订阅场景与高并发零误差低延时的传输系统。

  下面这个表是官网在2016年提供的activemq、kafka以及rocketmq的对比图。或许对比有点落后,或许开发者比较的眼光比较偏向于rockemq但是仅作为参考(比如数据的有序性,kafka因为需要要有序性和高并发获得一个平衡只能保证一个partition下的消息通过offset来保持消费有序(当一个主题只有一个Partition的时候就能保持全局消息有序性),rocketmq是通过主题与消息队列的一对一对应的来确保全局有序性的,实际上这两种都是可以保证全局有序性,前提都是失去消息的多线程消费)。

ActiveMQJava, .NET, C++ etc.Push model, support OpenWire, STOMP, AMQP, MQTT, JMSExclusive Consumer or Exclusive Queues can ensure orderingSupportedNot SupportedSupportedSupportedNot SupportedSupports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDBSupportedSupportedSupported, depending on storage,if using kahadb it requires a ZooKeeper serverNot SupportedThe default configuration is low level, user need to optimize the configuration parametersSupported
KafkaJava, Scala etc.Pull model, support TCPEnsure ordering of messages within a partitionNot SupportedSupported, with async producerNot SupportedSupported, you can use Kafka Streams to filter messagesNot SupportedHigh performance file storageSupported offset indicateNot SupportedSupported, requires a ZooKeeper serverNot SupportedKafka uses key-value pairs format for configuration. These values can be supplied either from a file or programmatically.Supported, use terminal command to expose core metrics
RocketMQJava, C++, GoPull model, support TCP, JMS, OpenMessagingEnsure strict ordering of messages,and can scale out gracefullySupportedSupported, with sync mode to avoid message lossSupportedSupported, property filter expressions based on SQL92SupportedHigh performance and low latency file storageSupported timestamp and offset two indicatesNot SupportedSupported, Master-Slave model, without another kitSupportedWork out of box,user only need to pay attention to a few configurationsSupported, rich web and terminal command to expose core metrics

  上表的对比并不是最新的,对比于2016年。如今,拥有众多粉丝的kafka在上千家公司得到应用,社区的活跃性让kafka做了从架构等方面的优化。这里需要提及两点,目前在kafka官网文档没有看到改进说明。一、kafka作为中间件而言,消费模式只有集群消费,广播消费只存在于同一个主题下不同消费组之间,同一个消费组内的不同消费组进程必须且只能消费某个消息主题下的不同partition,这也造成当消费主题过多时,多个消费者在消费状态下会有过多磁盘IO读取文件操作,造成kafka的延时性远远高于rocketmq;但是作为高并发,一个主题分成多个partition会使得kafka的高吞吐能力远远高于其他中间件。二、消息的重新消费。rmq支持通过指定某个时间点或者offset甚至选择特定消费决策(latest或者earliest)来重置offset的两种方式来重新获取消息,而当前了解是kafka只支持后者一种方式。研究rmq如何实现高并发低延迟的机制请移步http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/

  组成rmq的各个角色介绍。

  Producer:生产者。类似于邮件系统中发消息的角色。

  ProducerGroup:相同角色的生产者分为一组(考虑到生产者的高效率为了避免不必要的消息初始化,一个组内只允许一个生产者实例)。  

  Consumer:消费者。类似于邮件系统中收消息的角色。

  ConsumerGroup:类似于生产者组,相同角色的消费组分为一个组(在集群模式下,同一个消费组内的消费者均衡的分摊队列中的消息,不同消费组内不同消费者可以同时接受相同的消息,这就实现了加载平衡和高容错的目标)。

  Topic:主题。是生产者和消费这之间传输之前确定好的消息类别。生产者发消息之前需要创建Topic,然后消费者想要获取这个Topic下的消息需要订阅这个主题。一个消费者组可以订阅多个主题,只要这个组内的所有消费者订阅的主题保持一致性。

  Message:消息。就是发送信息的载体,里面包含需要发送的具体信息以及必须要带Topic(可以理解这里的Topic就是邮件的地址,生产者作为发信人需要写对的收件人地址,消费者需要登陆对应收件人的邮箱才能收到生产者发送到这个邮箱地址上的邮件)。

  MessageQueue:消息队列。类似于kafka中的partition,只不过这里的分区是逻辑分区不是partition这样的物理分区,因此如果某个topic下的数据量特别多,可以通过分为不同的消息队列来获得高并发量,生产者可以高并发的发送消息,消费者可以高并发的读取消息,此外需要说明的每个队列管理一个offset,这里的offset准确的定义是某个topic下的指定队列里的位置,通过offset可以定位具体的消息,用来指示消费者从offset开始处理。

  Broker:接受来自Produer的消息,存储消息,提供管道给Consumer获取消息。也会存储元数据信息,包括消费组、消费进程的offset以及主题甚至队列的相关信息(HA架构中Broker可以是M/S模式消除单点故障,甚至是多M/S模式可以提供存储量和吞吐量)。

  NameServer:管理Broker的路由信息。Producer和Cosumer需要拿Topics去NameServer中找到对应的Broker的清单(多NameServer可以消除单点故障)。

  MessageModel:集群消费和广播消费。集群消费就是同一个主题下的所有消费者均衡的分摊消息队列中的消息从而做到负载均衡,广播消费是所有消费者都消费这个队列的全量消息。

  讲完了rocktmq,我们再简单介绍sparkstreaming。

  Spark Streaming是提供高吞吐,拥有容错能力的实时数据量处理的基于Spark Core的扩展。输入数据源可以是Kafka、Flume、HDFS以及TCP套接字,并且拥有许多高级算子比如map、reduce、join和window。输出可以是HDFS、数据库或者实时仪表盘。甚至可以在这些数据量上执行机器学习和图论相关的算法。其实,与其说streaming是实时处理,更确切的描述应该是micro-batch的伪实时流数据处理引擎。

 

  在实时性要求不高的场景,是可以秒级的护理该单位时间内的所有数据。具体的接口文档见https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/spark-streaming-rocketmq.md,这里只介绍编写入口函数RocketMqUtils.createMQPullStream时需要重点关注的几个参数。

  forceSpecial:Boolean。默认情况下是false,每个消息队列如果拥有checkpoint就不管我们是否指定offset消费者都会从checkpoint开始消费数据,但是如果设置为true,那么rmq就会从指定的可以获取的offset开始消费,对于没有指定offset的队列默认从最小位移开始消费。

  ConsumerStrategy:ConsumerStrategy。分为earliest、lastest、specificOffset(queueToOffset: ju.Map[MessageQueue, Long])以及specificTime(queueToTime: ju.Map[MessageQueue, String])这四种类型。如果是第一种则是从队列的最小位移开始消费,这时候可能会重复消费之前以及消费过的消息;第二种是从最大位移开始消费也就是会错过消费进程启动前的生产者发的消息;第三种是直接设置指定队列的offset,如果这个offset小于最小位移就直接从该队列的最小位移开始消费,否则直接从指定offset开始消费;第四种就是获取某个时间点转换为时间戳的的offset。对于没有指定offset的队列默认从最小位移开始消费。

  autoCommit:Boolean。是否自动提交offset给rmq服务器。true的情况是一旦接受到就自动提交offset;false的情况是异步提交,消息处理并callback后才会提交offset。

  failOnDataLoss:Boolean。当查询的数据丢失(比如topic被删除或者offset超出范围)是否报异常退出程序还是仅仅日志警告输出;这里如果对数据的丢失特别严格建议设置为true,否则丢了消息也只是日志warn而已。

   这里就对遇到的坑位做一些总结:

  1、找不到Topic。要么是打包少了fastjson这个依赖,要么是nameserver地址写错了或者topic写错了。

  2、数据丢失。以下有两种数据丢失场景。

  第一种情况。生产者发了几条消息给rmq,但是此时消费者的进程还没有启动,启动消费者无法从rmq种获取那几条消息,初始化时日志warn显示"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset"。如果参数设置forceSpecial=true,则会导致每次消费者重新启动不会按照他上次消费的的checkpointoffset开始消费而是按照指定offset来消费或者直接从最小位移开始消费。但是从代码的角度看这个参数的优先级是低于ConsumerStrategy(下面源码种第三段黄色背景标记显示是在选择消费决策为SpecificOffsetStrategy后才会用到参数forceSpecial)。而我此时的ConsumerStrategy=lastest,这样就会让消费者从他能获取的最近的几条消息的maxoffset开始消费(参照第一段黄色标记部分),明显这些消息都会被略过,而后面我们取ConsumerStrategy=earliest(作用于第二段黄色标记部分),这些之前发的消息全都成功接收成功(前提是消息队列里面还存储着这些消息)。

private def computePullFromWhere(mq: MessageQueue): Long = {
    var result = -1L
    val offsetStore = kc.getOffsetStore
    val minOffset = kc.minOffset(mq)
    val checkpointOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)

    consumerStrategy match {
      case LatestStrategy => {
        if (checkpointOffset >= 0) {
          //consider the checkpoint offset first
          if (checkpointOffset < minOffset) {
            reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
            result = kc.maxOffset(mq)
          } else {
            result = checkpointOffset
          }
        } else {
          // First start,no offset
          if (mq.getTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            result = 0
          } else {
            result = kc.maxOffset(mq)
          }
        }
      }
      case EarliestStrategy => {
        if (checkpointOffset >= 0) {
          //consider the checkpoint offset first
          if (checkpointOffset < minOffset) {
            reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
            result = minOffset
          } else {
            result = checkpointOffset
          }
        } else {
          // First start,no offset
          result = minOffset
        }
      }
      case SpecificOffsetStrategy(queueToOffset) => {

        val specificOffset = queueToOffset.get(mq)

        if (checkpointOffset >= 0 && !forceSpecial) {
          if (checkpointOffset < minOffset) {
            reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
            result = minOffset
          } else {
            result = checkpointOffset
          }
        } else {
          specificOffset match {
            case Some(ConsumerStrategy.LATEST) => {
              result = kc.maxOffset(mq)
            }
            case Some(ConsumerStrategy.EARLIEST) => {
              result = kc.minOffset(mq)
            }
            case Some(offset) => {
              if (offset < minOffset) {
                reportDataLoss(s"MessageQueue $mq's specific offset $offset is smaller than minOffset $minOffset")
                result = minOffset
              } else {
                result = offset
              }
            }
            case None => {
              if (checkpointOffset >= 0) {
                //consider the checkpoint offset first
                if (checkpointOffset < minOffset) {
                  reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
                  result = minOffset
                } else {
                  result = checkpointOffset
                }
              } else {
                logWarning(s"MessageQueue $mq's specific offset and checkpointOffset are none, then use the minOffset")
                result = kc.minOffset(mq)
              }
            }
          }
        }
      }
    }
    result
  }

  第二种情况是考虑这样一个场景。生产者P1每天实时发某个主题T的消息给执行streaming任务的消费者C,主题T里面的消息由两个key组成,一个是key1,一个key2,这两个key分别代表消息的两个不同的内容。消费者进程C会分别对这两个Key做不同的处理然后将其分别转化为RDD来做后面的计算,最后分别对计算后的内容包装进成key1和key2的value,并以主题T2的形式发送回rmq让另外一个消费者CC来消费。开发者考虑到key1和key2的计算资源充分利用,就将原来的消费者进程C拆成两个消费者C1和C2来分别处理key1和key2的内容。这个时候,C2进程突然因为其他因素挂了,但是C1进程还在正常消费来自P1的消息,这就意味着C1会正常提交offset给P1,然后继续接受来自P1的消息,但是实际上C1只会处理来自key1的内容,所以CC只能收到来自C1的消息,迟迟等不到C2的消息。最后导致就算重启了C2也接受不到之前的消息(有一种想法是进程C1和C2同属于一个消费组所以C1接收到了消息马上提交offset给P1,这样就算重启C2由于C2的checkpoint早就被C1更改,除非重置offset。这件事情发生在第一个情况即没有将消费决策改为earliest之前所以不好判断是否两个线程的同一个消费组的消费者是否会公用一个offset)。最后的解决方案是将C1和C2分成两个消费者组来处理消息,当时认为这样就相当于两个队列就不会互相干扰对方的offset。所以我在想这里的解决方案固然是分成两个队列处理最好,但是是否给者两个key分为不同的tag处理是否也会生成两个队列?如果不同的tag里面的数据会输入到不同的队列,那么接收消息的时候对于C1和C2也不会收到对方的tag下的消息,比如C1只会接受tag1下的消息,并提交tag1对应队列Q1的offset情况;C2只会接受tag2的消息,并提交tag2的对应队列Q2的offset。后面我会尝试这种方法,请大家有空也可以试验一下,毕竟实践是检验真理的标准。

  针对上次第二种情况的丢数据问题,设计这样一个实验:

  •   设置对照组1,也就是两个不同消费组中消费者C1和C2分别先后接受来自同一个Topic的三个消息(调用了三次send方法),其中C1先消费完生产者P1的消息,然后启动消费进程C2;
  •   设置实验组2,同一个消费组不同消费者进程C1和C2分别先后接受来自同一个Topic的相同Tag的三个消息,消费次序同上;
  •   设置实验组3,同一个消费组不同消费者进程C1和C2分别同时接受来自同一个Topic下的tag1的两个消息和tag2的三个消息;
  •   设置实验组4,同一个消费组不同消费者进程C1和C2分别先后接受来自同一个Topic下的tag1的两个消息和tag2的三个消息。

  实验结果:

  1.   对照组1中的C1和C2分别能成功处理消息(其中C1接收的消息出现两次重复而C2收到的消息有22次),并且接受的三个消息分别来自三个不同的队列queueId=0、1、2,拥有不同的min_offset、max_offset以及queueOffset和commitLogOffset。其中队列0的min_offset=2151、max_offset=2197、queueOffset=2196、commitLogOffset=247968272601、storeSize=884,队列1的min_offset=2148、max_offset=2190、queueOffset=2189、commitLogOffset=247968273485、storeSize=894,队列2的min_offset=2124、max_offset=2163、queueOffset=2162、commitLogOffset=247968274379、storeSize=901;
  2.   对照组1中的C1和C2分别能成功处理消息(其中C1收到3条消息,C2无消息接收),这里确实会存在同一个组内相同Tag中的消费者C1和C2存在先后接受消息的时差,导致C1先消费P1的3条消息提交完所有的offset,最后C2从队列里获取的commitLogOffset是C1消费完以后的offset,此时无新的消息发过来则无法收到新的消息,后面的进一步实验证明了如果C1和C2同时在消费P1的数据则能同时获取3个消息且commitLogOffset相同;
  3.   对照组1中的C1和C2分别能成功处理消息(C1只能接收到tag1的消息,C2只能接受tag2的消息),且这5个消息的commitLogOffset是连续的;
  4.   显而易见C1和C2分别能接受对应的不同的Tag下的消息,虽然消息是同时发送的,且commitLogOffset是唯一的且一直递增的。

  结果分析:

  分析结果1可知。每个send方法就是一个生产线程会产生三个消息队列,并且者三个队列是独立的拥有各自的min_offset、max_offset以及queueOffset,但是commitLogOffset是由生产者统一存储在commitlog文件中的,所有的消费者都需要从commitlog文件中根据上次消费的消息的commitLogOffset+storeSize的得到本次消息的commitLogOffset从而开始消费数据(如结果1中队列0的commitLogOffset=247968272601以及storeSize=884,两者相加等于队列1的commitLogOffset从而开始消费队列1的数据以此类推得到队列2的数据)。两个不同组下的消费者消费消息是独立的;

  分析结果2可知。同一个组中的不同消费者先后获取同一个Tag的相同数据如果被某个消费者消费了一次那么其他消费者只能在该消费者消费的基础上获取下一个数据;

  分析结果3和4可知。但是不同Tag下的消费者消费消息是独立的;

  总的来说目前不知道是rmq集群不稳定还是如何,streaming任务几乎每隔几天就会报错连接不上rmq的nameserver或者连接不上broker。还有一个问题是由于我们的streaming任务每天会初始化数据持久化到内存种,然后每次这个时候都会warn找不到metadata,这个原因也可以研究一下,不知道是否跟unpersist方法有关。

12-13 18:34