FanoutRabbitmqConfig.java
参数解释:
FanoutExchange构造函数的参数:
第一个参数是交换机的名字。
第二个参数表示这个交换机是否是持久的(即重启RabbitMQ后,交换机是否还存在)。
第三个参数表示如果没有队列绑定到这个交换机,交换机是否会被自动删除。
Queue构造函数的参数:
第一个参数是队列的名字。
第二个参数表示这个队列是否是持久的。
整个配置类设置了RabbitMQ的一个Fanout交换机和三个队列,以及这些队列到交换机的绑定。这样,当有消息发送到"fanoutExchange"交换机时,这个消息会被广播到所有绑定到这个交换机的队列上。
package com.example.rabbitmq.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitmqConfig {
//@Bean注解告诉Spring这个方法会返回一个对象,该对象应该被注册为Spring应用上下文中的一个Bean。这个方法创建并返回一个新的FanoutExchange实例,它名为"fanoutExchange",是持久的(true),并且不是自动删除的(false`)。
@Bean
public FanoutExchange newFanoutExchange(){
return new FanoutExchange("fanoutExchange",true,false);
}
@Bean
public Queue newQueueX(){
return new Queue("fanoutQueueX",true);
}
@Bean
public Queue newQueueY(){
return new Queue("fanoutQueueY",true);
}
@Bean
public Queue newQueueZ(){
return new Queue("fanoutQueueZ",true);
}
@Bean
public Binding bingQueueX(){
return BindingBuilder.bind(newQueueX()).to(newFanoutExchange());
}
@Bean
public Binding bingQueueY(){
return BindingBuilder.bind(newQueueY()).to(newFanoutExchange());
}
@Bean
public Binding bingQueueZ(){
return BindingBuilder.bind(newQueueZ()).to(newFanoutExchange());
}
}
FanoutProducer.java
rabbitTemplate.convertAndSend(“fanoutExchange”,null,msg);
调用rabbitTemplate的convertAndSend方法来发送消息。这个方法的第一个参数是交换机的名字(在这个例子中是"fanoutExchange"),第二个参数是路由键(在这个例子中为null,因为在fanout模式下,路由键是不需要的),第三个参数是要发送的消息。
package com.example.rabbitmq.fanout;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("fanout")
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg")
public String sendMsg(){
String msg = "fanout模式下,消息是订单已经生成了";
rabbitTemplate.convertAndSend("fanoutExchange",null,msg);
return "ok";
}
}
FanoutConsumer.java
@RabbitHandler:这个注解表明紧随其后的方法是一个消息处理方法,它会被调用来处理接收到的RabbitMQ消息。
@RabbitListener(queues = “fanoutQueueX”):这个注解表明该方法是一个RabbitMQ的监听器,它会监听名为"fanoutQueueX"的队列。当有消息到达这个队列时,Spring会调用这个方法,并将消息作为参数传递。
public void processQueueX(String msg):定义了一个公开的方法processQueueX,它接收一个字符串参数msg(即接收到的消息),并且没有返回值(void)。
方法体内的代码System.out.println(“接收到的queue x中的消息是:”+msg);会打印接收到的消息,前面带有描述性的文本。
同样的逻辑也适用于processQueueY和processQueueZ方法,只是它们分别监听"fanoutQueueY"和"fanoutQueueZ"队列,并在接收到消息时打印相应的信息。
简而言之,这个FanoutConsumer类定义了三个方法,每个方法都监听一个特定的RabbitMQ队列,并在接收到消息时处理它(在这个例子中,处理就是打印消息到控制台)。
package com.example.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutConsumer {
@RabbitHandler
@RabbitListener(queues = "fanoutQueueX")
public void processQueueX(String msg){
System.out.println("接收到的queue x中的消息是:"+msg);
}
@RabbitHandler
@RabbitListener(queues = "fanoutQueueY")
public void processQueueY(String msg){
System.out.println("接收到的queue y中的消息是:"+msg);
}
@RabbitHandler
@RabbitListener(queues = "fanoutQueueZ")
public void processQueueZ(String msg){
System.out.println("接收到的queue z中的消息是:"+msg);
}
}