1. 前言

上一篇文章我们介绍了RocketMQ集群的搭建,这篇文章将主要使用RocketMQ测试下简单消息。

2. 同步消息(生产者)

同步消息的话,消费者发布消息之后必须等集群返回成功之后才会发布下一条消息,消息的发布是同步进行的。

2.1. 测试代码
  1. 创建生产者

    	// 1.创建生产者对象
    	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("feige-producer-group");
    
  2. 指定nameserver

    // 2.指定nameServer
    defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
    

    因为每个nameserver都有所有broker的路由信息,所以只需要指定一个nameserver。

  3. 启动生产者发布消息

    // 3.启动生产者
    		defaultMQProducer.start();
    		//4.创建消息
    		for (int i = 0; i < 100; i++) {
    			// 创建消息,指定topic,以及消息体
    			Message message = new Message("base_topic", ("飞哥测试消息" + i).getBytes());
    			//5.发送消息
    			SendResult send = defaultMQProducer.send(message);
    			System.out.println(send);
    		}
    		// 6.关闭生产者
    		defaultMQProducer.shutdown();
    

创建一个名为 base_topic的topic,虽然集群中还没有这个topic,但是由于前面我们搭建集群的时候指定的可以自动创建topic autoCreateTopicEnable=true 。 然后消息体是:飞哥测试消息xxx。这里打印了集群的响应结果SendResult。

运行结果(部分结果):

SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2250000, offsetMsgId=AC1FB85900002A9F00000000001F70B6, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=0], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2410001, offsetMsgId=AC1FB85900002A9F00000000001F71A1, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=1], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E24D0002, offsetMsgId=AC1FB85900002A9F00000000001F728C, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=2], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2570003, offsetMsgId=AC1FB85900002A9F00000000001F7377, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=3], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2600004, offsetMsgId=AC1FB85900002A9F00000000001F7462, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=0], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2830005, offsetMsgId=AC1FB85900002A9F00000000001F754D, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=1], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E28C0006, offsetMsgId=AC1FB85900002A9F00000000001F7638, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=2], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2970007, offsetMsgId=AC1FB85900002A9F00000000001F7723, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=3], queueOffset=126]
    

这里SendResult 返回结果有几个属性需要说明下:

  1. sendStatus: 发送状态
  2. msgId:消息ID,每个消息都是唯一的
  3. offsetMsgId:偏移消息ID,在队列里的消息唯一ID
  4. messageQueue:用于指定当前这条消息落到哪个队列中,在搭建集群的时候指定一个broker有4个messageQueue。
  5. topic:当前队列所属的主题
  6. brokerName:当前队列所属的broker
  7. queueId:当前队列在broker中序号
  8. queueOffset:当前消息在队列里的偏移量。

从打印的结果可以看出,目前这100条消息是轮流的发送到broker-b中的4个队列中的。关系如下图所示:

【RocketMQ系列四】消息示例-简单消息的实现-LMLPHP

3. 消费者

  1. 创建消费者&指定nameserver

    	// 1.创建消费者
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    		// 2.指定连接nameServer
    		consumer.setNamesrvAddr("172.31.184.89:9876");
    
  2. 订阅一个或者多个topic,这里指定消费base_topic,不做过滤。

    // 3.订阅一个或者多个topic,这里指定消费base_topic,不做过滤
    consumer.subscribe("base_topic", "*");
    
  3. 创建一个回调函数&处理消息

    		// 4.创建一个回调函数
    	consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    			// 5.处理消息
    			for (MessageExt msg : msgs) {
    				System.out.println(msg);
    				System.out.println("收到的消息内容:" + new String(msg.getBody()));
    			}
    			// 返回消费成功的对象
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    		});
    

    创建一个回调监听函数,它是一个长轮询,当有消息产生时,它会监听到并进行消费(ps: broker会把消息推送给消费者)。

  4. 启动消费者

    	// 6.启动消费者
    		consumer.start();
    		System.out.println("消费者已经启动");
    

    运行结果(部分截图):

    【RocketMQ系列四】消息示例-简单消息的实现-LMLPHP

4. 异步消息

异步消息与同步消息的区别就是异步消息不需要等待集群返回发送成功的标识,即可发送下一条消息。主要是发送消息阶段有区别。其他的与同步消息相同。

// 异步消息发送失败重试次数
		defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);

		CountDownLatch2 countDownLatch2 = new CountDownLatch2(100);
		// 4.创建消息
		for (int i = 0; i < 100; i++) {
			// 创建消息,指定topic,以及消息体
			Message message = new Message("base_topic", "TagA", "feige", ("飞哥异步消息测试" + i).getBytes());
			// 5.发送消息
			int index = i;
			defaultMQProducer.send(message, new SendCallback() {
				@Override
				public void onSuccess(SendResult sendResult) {
					countDownLatch2.countDown();
					System.out.printf("%-10d ok,%s,%n", index,sendResult.getMsgId());
				}

				@Override
				public void onException(Throwable e) {
					countDownLatch2.countDown();
					System.out.printf("%-10d fail,%s,%n", index, e);
					e.printStackTrace();
				}
			});

		}
		System.out.println("=====================");
		countDownLatch2.await(10, TimeUnit.SECONDS);

异步消息在调用send方法的时候,需要实现SendCallback 接口。此函数有 onSuccess 方法和onException 方法。onSuccess 方法在消息发送成功的时候会被集群调用,而onException方法则是在消息发送失败的时候被调用。

5. 单向消息

单向消息只管发送不管接收。

//4.创建消息
		for (int i = 0; i < 100; i++) {
			// 创建消息,指定topic,以及消息体
			Message message = new Message("base_topic", ("飞哥同步消息测试:" + i).getBytes());
			//5.发送消息
		    defaultMQProducer.sendOneway(message);
		}

6. 总结

本文详细介绍了简单消息里的同步消息,异步消息和单向消息。他们的区别主要是生产者发布消息时的区别。另外,简单消息的消费是没有顺序的。

10-17 09:21