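I. Introduction

In the previous installment, "Core Concepts and Exchange Types in RabbitMQ", we covered RabbitMQ's core concepts, in particular the different exchange types. This installment walks through configuring each exchange type and then publishing and consuming messages in practice.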


II. The application.yml configuration file

We start with the application's startup configuration file, application.yml:

server:
  port: 8080
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto
        concurrency: 5
        max-concurrency: 20
        prefetch: 5

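III. RabbitMQ exchange and queue configuration

Spring provides a fluent builder API for declaring queues, exchanges, and the bindings between them, which is very convenient. Below we define four types of exchanges and bind the corresponding queues to them.

1. Define four queues

/**
 * Define four queues
 */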
@Configuration
protected static class QueueConfig {

	@Bean
	public Queue queue1() {
		return QueueBuilder.durable("queue-1").build();
	}

	@Bean
	public Queue queue2() {
		return QueueBuilder.durable("queue-2").build();
	}

	@Bean
	public Queue queue3() {
		return QueueBuilder.durable("queue-3").build();
	}

	@Bean
	public Queue queue4() {
		return QueueBuilder.durable("queue-4").build();
	}
}

2. Define a Fanout exchange and its queue bindings

/**
 * Define the Fanout exchange and its bindings
 */
@Configuration
protected static class FanoutExchangeBindingConfig {

	@Bean
	public FanoutExchange fanoutExchange() {
		return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
	}

	/**
	 * Bind multiple queues to the Fanout exchange
	 * @param fanoutExchange
	 * @param queue1
	 * @param queue2
	 * @param queue3
	 * @param queue4
	 * @return
	 */
	@Bean
	public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
		Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
		Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
		Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
		return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
	}

}

3. Define a Direct exchange and its queue binding

@Configuration
protected static class DirectExchangeBindingConfig {

	@Bean
	public DirectExchange directExchange() {
		return ExchangeBuilder.directExchange("direct-exchange").build();
	}

	@Bean
	public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
		return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
	}
}
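
To make the difference between the fanout and direct bindings above concrete, here is a minimal in-memory sketch of the two routing rules in plain Java. It is not how RabbitMQ or Spring is implemented; `ExchangeRoutingSketch`, `Binding`, `routeFanout`, and `routeDirect` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: these types are assumptions, not Spring AMQP classes.
final class ExchangeRoutingSketch {

    /** A queue bound to an exchange with a binding key (unused by fanout). */
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Fanout: every bound queue receives a copy; the routing key is ignored. */
    static List<String> routeFanout(List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            targets.add(b.queue);
        }
        return targets;
    }

    /** Direct: only queues whose binding key equals the routing key receive the message. */
    static List<String> routeDirect(List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            if (b.bindingKey.equals(routingKey)) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> fanoutBindings = List.of(
                new Binding("queue-1", ""), new Binding("queue-2", ""),
                new Binding("queue-3", ""), new Binding("queue-4", ""));
        List<Binding> directBindings = List.of(new Binding("queue-3", "queue3-route-key"));

        System.out.println(routeFanout(fanoutBindings, ""));                 // [queue-1, queue-2, queue-3, queue-4]
        System.out.println(routeDirect(directBindings, "queue3-route-key")); // [queue-3]
        System.out.println(routeDirect(directBindings, "other-key"));        // []
    }
}
```

The last call shows why the producer must publish to direct-exchange with exactly queue3-route-key: under this model, any other key matches no binding.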

4. Define a Topic exchange and its queue bindings

@Configuration
protected static class TopicExchangeBindingConfig {

	@Bean
	public TopicExchange topicExchange() {
		return ExchangeBuilder.topicExchange("topic-exchange").build();
	}

	@Bean
	public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
		Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
		return new Declarables(queue1Binding, queue2Binding);
	}
}

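Here we define a topic exchange named topic-exchange. A topic exchange matches the routing key against binding patterns made of dot-separated words: * matches exactly one word, and # matches zero or more words. With the bindings above, queue-1 (com.order.*) receives keys such as com.order.create, while queue-2 (com.#) receives any key whose first word is com.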
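
The wildcard rules are easy to get wrong, so the following plain-Java sketch models them: routing keys and patterns are split into dot-separated words, * stands for exactly one word, and # stands for zero or more words. This is an illustrative reimplementation, not the broker's actual algorithm; `TopicPatternMatcher` and `matches` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model of topic-exchange matching; not RabbitMQ's own code.
final class TopicPatternMatcher {

    private TopicPatternMatcher() {
    }

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        List<String> patternWords = Arrays.asList(pattern.split("\\.", -1));
        List<String> keyWords = Arrays.asList(routingKey.split("\\.", -1));
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words; try every possible length.
            for (int next = j; next <= k.size(); next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        return (word.equals("*") || word.equals(k.get(j))) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("com.order.*", "com.order.create"));    // true
        System.out.println(matches("com.#", "com.order.create"));          // true
        System.out.println(matches("com.order.*", "com.order.create.v2")); // false
        System.out.println(matches("com.#", "com.order.create.v2"));       // true
        System.out.println(matches("com.#", "com"));                       // true
    }
}
```

Under this model, a message published with com.order.create reaches both queue-1 and queue-2, while com.order.create.v2 reaches only queue-2.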

5. Define a Headers exchange and its queue binding

@Configuration
protected static class HeaderExchangeBinding {

	@Bean
	public HeadersExchange headersExchange() {
		return ExchangeBuilder.headersExchange("headers-exchange").build();
	}

	@Bean
	public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
		return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
	}
}
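
A headers exchange ignores the routing key and compares message headers with the binding's arguments instead. The sketch below models that comparison in plain Java, including the two matching modes a binding can request (all arguments must match, or any one of them). It is a simplified assumption-laden model, not RabbitMQ's implementation; `HeadersMatchSketch` and its `matchAll` flag are hypothetical.

```java
import java.util.Map;
import java.util.Objects;

// Illustrative model of headers-exchange matching; names are assumptions.
final class HeadersMatchSketch {

    private HeadersMatchSketch() {
    }

    /**
     * @param bindingArgs    header name/value pairs required by the binding
     * @param messageHeaders headers carried by the published message
     * @param matchAll       true: every binding argument must match;
     *                       false: one matching argument is enough
     */
    static boolean matches(Map<String, Object> bindingArgs,
                           Map<String, Object> messageHeaders,
                           boolean matchAll) {
        int matched = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            boolean hit = messageHeaders.containsKey(arg.getKey())
                    && Objects.equals(messageHeaders.get(arg.getKey()), arg.getValue());
            if (hit) {
                matched++;
            } else if (matchAll) {
                return false; // one miss is fatal in "all" mode
            }
        }
        return matchAll || matched > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = Map.of("function", "logging");

        System.out.println(matches(binding, Map.of("function", "logging"), true)); // true
        System.out.println(matches(binding, Map.of("function", "audit"), true));   // false
        System.out.println(matches(binding, Map.of(), true));                      // false
    }
}
```

With a single binding argument, as in the queue4 binding above, the two modes behave identically; they only differ once a binding lists several headers.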

IV. RabbitMQ consumer configuration

Spring RabbitMQ supports annotation-driven listener endpoints for receiving messages asynchronously, as shown below:

@Slf4j
@Component
public class RabbitMqConsumer {

	@RabbitListener(queues = "queue-1")
	public void handleMsgFromQueue1(String msg) {
		log.info("Message received from queue-1, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-2")
	public void handleMsgFromQueue2(String msg) {
		log.info("Message received from queue-2, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-3")
	public void handleMsgFromQueue3(String msg) {
		log.info("Message received from queue-3, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-4")
	public void handleMsgFromQueue4(String msg) {
		log.info("Message received from queue-4, message body: {}", msg);
	}
}

V. RabbitMQ producer

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {

	private final RabbitTemplate rabbitTemplate;

	public void sendMsgToFanoutExchange(String body) {
		log.info("开始发送消息到fanout-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
	}

	public void sendMsgToDirectExchange(String body) {
		log.info("开始发送消息到direct-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
	}

	public void sendMsgToTopicExchange(String routingKey, String body) {
		log.info("开始发送消息到topic-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("topic-exchange", routingKey, message);
	}

	public void sendMsgToHeadersExchange(String body) {
		log.info("开始发送消息到headers-exchange, 消息体:{}", body);

		MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
		rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
	}

}


VI. Test cases

A simple Controller is used for testing:

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {

	private final RabbitMqProducer rabbitMqProducer;

	@RequestMapping("/exchange/fanout")
	public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
		rabbitMqProducer.sendMsgToFanoutExchange(body);
		return ResponseEntity.ok("广播消息发送成功");
	}

	@RequestMapping("/exchange/direct")
	public ResponseEntity<String> sendMsgToDirectExchange(String body) {
		rabbitMqProducer.sendMsgToDirectExchange(body);
		return ResponseEntity.ok("消息发送到Direct交换成功");
	}

	@RequestMapping("/exchange/topic")
	public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
		rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
		return ResponseEntity.ok("消息发送到Topic交换机成功");
	}

	@RequestMapping("/exchange/headers")
	public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
		rabbitMqProducer.sendMsgToHeadersExchange(body);
		return ResponseEntity.ok("消息发送到Headers交换机成功");
	}

}

1. Sending to the FanoutExchange

Visit http://localhost:8080/exchange/fanout?body=hello directly. The log shows that the message was broadcast to all four queues.

2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

2. Sending to the DirectExchange

Visit http://localhost:8080/exchange/direct?body=hello. The log shows that the message was routed to queue-3 via the routing key queue3-route-key.

2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello

3. Sending to the TopicExchange

Visit http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create. The message with routing key com.order.create was delivered to both queue-1 and queue-2.

2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

4. Sending to the HeadersExchange

Visit http://localhost:8080/exchange/headers?body=hello. The message carries the header function=logging, so headers-exchange routes it to queue-4.

2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
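
The four URLs above can also be exercised from a small Java client instead of a browser. The sketch below uses the JDK's built-in HttpClient (Java 11+) and assumes the application is running on localhost:8080 as configured in application.yml; `ExchangeEndpointClient`, `buildUri`, and `call` are hypothetical helpers. It only sends a request when started with the argument `send`, otherwise it just prints the URL it would call.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative test client; class and method names are assumptions.
final class ExchangeEndpointClient {

    // Assumed to match server.port in application.yml.
    private static final String BASE_URL = "http://localhost:8080";

    private ExchangeEndpointClient() {
    }

    /** Builds the endpoint URI with URL-encoded query parameters. */
    static URI buildUri(String path, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                    + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return URI.create(BASE_URL + path + "?" + query);
    }

    /** Sends a GET request and returns the response body. */
    static String call(HttpClient client, URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("routingKey", "com.order.create");
        params.put("body", "hello");
        URI uri = buildUri("/exchange/topic", params);
        System.out.println(uri);

        // Only contact the server when explicitly asked to.
        if (args.length > 0 && args[0].equals("send")) {
            System.out.println(call(HttpClient.newHttpClient(), uri));
        }
    }
}
```

Encoding the parameters matters once the body contains spaces or other reserved characters, which a hand-typed browser URL would otherwise mangle.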

VII. Conclusion

In the next installment we will cover two ways of sending and consuming delayed messages with RabbitMQ. Stay tuned.
