SpringBoot 整合RabbitMQ
常用的交换机
DirectExchange
直连型交换机,根据消息携带的路由键,将消息转发给对应的队列
FanoutExchange
扇形交换机,接收到消息后会忽略路由键,将消息转发到所有与该交换机绑定的队列
TopicExchange
主题交换机,根据消息携带的路由键和交换机与队列绑定键的规则,将消息转发给对应的队列
规则:
*(星号):匹配恰好一个单词(单词之间以"."分隔),例如 topic.* 可以匹配 topic.01,但不能匹配 topic.a.b
#(井号):匹配零个或多个单词,例如 topic.# 可以匹配 topic、topic.01 和 topic.a.b
准备
两个Spring Boot 项目:
rabbitmq-provider(生产者)
rabbitmq-consumer(消费者)
版本号:2.1.7.RELEASE
依赖:
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml:
server:
  port: 9000
spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    host: 192.168.1.45
    port: 5672
    username: admin
    password: admin
    #virtual-host:
    # ------- message confirmation settings (disabled here) --------
    # confirm that the message has been sent to the exchange
    #publisher-confirms: true
    # confirm that the message has been sent to a queue
    #publisher-returns: true
DirectExchange
rabbitmq-provider
1、创建DirectRabbitConfig.java
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { /** * 交换机 */ @Bean public DirectExchange myDirectExchange() { // 参数意义: // name: 名称 // durable: true // autoDelete: 自动删除 return new DirectExchange("myDirectExchange", true, false); } /** * 队列 */ @Bean public Queue myDirectQueue() { return new Queue("myDirectQueue", true); } /** * 绑定 */ @Bean public Binding bindingDirect() { return BindingBuilder.bind(myDirectQueue()) .to(myDirectExchange()) .with("my.direct.routing"); } }
2、控制器提供一个发送消息的方法
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes the fixed text "hello" to {@code myDirectExchange} with routing key
 * {@code my.direct.routing}.
 *
 * @return the literal string "success" once the send call has returned
 */
@RequestMapping("/send")
public String send() {
    final String payload = "hello";
    rabbitTemplate.convertAndSend("myDirectExchange", "my.direct.routing", payload);
    return "success";
}
3、启动项目,使用PostMan调用发送消息接口,观察RabbitMQ控制面板的队列和控制台输出
rabbitmq-consumer
1、创建消息处理者DirectReceiver.java
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component //@RabbitListener(queues = "myDirectQueue") public class DirectReceiver{ @RabbitHandler @RabbitListener(queues = "myDirectQueue") public void process(String msg) { System.out.println(msg); } }
2、启动项目,观察控制台输出
监听的队列,在项目启动之前应该存在,否则会报错
解决方法:先启动生产者发送一次消息,此时生产者项目中的RabbitAdmin会在首次连接RabbitMQ时声明(创建)已配置的交换机、队列和绑定,再启动消费者接收消息
FanoutExchange
创建多个队列绑定到扇形交换机,生产者发送一次消息,可以观察到多个处理者都收到了消息。
rabbitmq-provider
1、创建FanoutRabbitConfig.java
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { // ----- 交换机 ----- @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange", true, false); } // ----- 队列 ----- @Bean public Queue fanoutQueueA() { return new Queue("fanoutQueueA", true); } @Bean public Queue fanoutQueueB() { return new Queue("fanoutQueueB", true); } @Bean public Queue fanoutQueueC() { return new Queue("fanoutQueueC", true); } // ----- 绑定 ----- @Bean public Binding bindingFanoutA() { return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange()); } @Bean public Binding bindingFanoutB() { return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange()); } @Bean public Binding bindingFanoutC() { return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange()); } } 2、控制器提供一个发送消息的方法 @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendByFanout") public String sendByFanout() { String msg = "hello fanout"; rabbitTemplate.convertAndSend("fanoutExchange", null, msg); return "success"; } 3、启动项目,发送消息 rabbitmq-comsumer 1、创建消息处理者 package com.rabbitmq.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectReceiver{ @RabbitHandler @RabbitListener(queues = "fanoutQueueA") public void processA(String msg) { System.out.println("fanoutQueueA " + msg); } @RabbitHandler @RabbitListener(queues = "fanoutQueueB") public void processB(String msg) { System.out.println("fanoutQueueB " + msg); } @RabbitHandler @RabbitListener(queues = "fanoutQueueC") public void processC(String msg) { 
System.out.println("fanoutQueueC " + msg); } } 2、启动项目,观察控制台输出 TopicExchange 创建两个队列,并使用通配符绑定到主题交换机 rabbitmq-provider 1、创建TopicRabbitConfig.java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { // 交换机 @Bean public TopicExchange myTopicExchange() { return new TopicExchange("myTopicExchange", true, false); } // ----- 队列 ----- @Bean public Queue myTopicQueue_01() { return new Queue("myTopicQueue_01", true); } @Bean Queue myTopicQueue_02() { return new Queue("myTopicQueue_02", true); } /** * 绑定路由键为topic.01 */ @Bean public Binding binding_01() { return BindingBuilder.bind(myTopicQueue_01()).to(myTopicExchange()).with("topic.01"); } /** * 绑定路由键为topic.#规则 */ @Bean public Binding binding_02() { return BindingBuilder.bind(myTopicQueue_02()).to(myTopicExchange()).with("topic.#"); } } 2、控制器提供一个发送消息的方法 @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendByTopic") public String sendByTopic() { String msg = "hello topic"; rabbitTemplate.convertAndSend("myTopicExchange", "topic.01", msg + " topic.01"); rabbitTemplate.convertAndSend("myTopicExchange", "topic.xxx", msg + " topic.xxx"); return "success"; } 3、启动项目,发送消息 rabbitmq-consumer 1、创建消息处理者 import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectReceiver{ @RabbitHandler @RabbitListener(queues = "myTopicQueue_01") public void process_01(String msg) { System.out.println("myTopicQueue_01 " + msg); } @RabbitHandler @RabbitListener(queues = "myTopicQueue_02") public void process_02(String msg) { System.out.println("myTopicQueue_02 " + msg); } }
2、启动项目,观察控制台输出
消息确认
1、application.yml文件开启rabbitmq消息确认
#确认消息已发送到交换机
publisher-confirms: true
#消息无法路由到队列时退回给生产者(配合 mandatory 触发 ReturnCallback)
publisher-returns: true
Spring Boot 2.2.0 及以上版本中 publisher-confirms 已被弃用,需改用: publisher-confirm-type: correlated
2、生产者配置回调函数
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 开启Mandatory, 才能触发回调函数,无论消息推送结果如何都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:" + correlationData); System.out.println("ConfirmCallback: "+"确认情况:" + ack); System.out.println("ConfirmCallback: "+"原因:" + cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"消息:" + message); System.out.println("ReturnCallback: "+"回应码:" + replyCode); System.out.println("ReturnCallback: "+"回应信息:" + replyText); System.out.println("ReturnCallback: "+"交换机:" + exchange); System.out.println("ReturnCallback: "+"路由键:" + routingKey); } }); return rabbitTemplate; } }
3、消费者配置监听
import com.rabbitmq.demo.mq.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private MyAckReceiver myAckReceiver; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); // 监听队列名 container.setQueueNames("myDirectQueue"); // 当前消费者数量 container.setConcurrentConsumers(1); // 最大消费者数量 container.setMaxConcurrentConsumers(1); // 手动确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置监听器 container.setMessageListener(myAckReceiver); return container; } } 4、消费者创建监听器 import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; @Component public class MyAckReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息的唯一性ID long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { String msg = message.toString(); System.out.println("消息: " + msg); System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue()); // 手动确认 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 拒绝策略 channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
5、启动生产者发送消息
6、启动消费者
7、观察生产者和消费者控制台输出