RabbitMQ

前置 基本概念 Publisher:生产者,可以向队列或者交换机发送消息 Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。取决于Exchange的类型 Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列 Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符 Headers:头匹配,基于MQ的消息头匹配 Queue:队列,接收消息、缓存消息 Consumer:消费者,订阅队列并消费消息 安装 1 2 3 4 5 6 7 8 9 10 11 docker run \ -e RABBITMQ_DEFAULT_USER=nebula \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network mq-net\ -d \ rabbitmq 15672:RabbitMQ提供的管理控制台的端口 5672:RabbitMQ的消息发送处理接口 spring使用 pom依赖 1 2 3 4 5 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> yaml配置 1 2 3 4 5 6 7 spring: rabbitmq: host: 127.0.0.1 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /nebula_vh # 虚拟主机 username: nebula # 用户名 password: 123456 # 密码 WorkQueue模型配置 1 2 3 4 5 spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 队列收发 publisher 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } } consumer 1 2 3 4 5 6 7 8 9 10 11 @Component public class SpringRabbitListener { // 利用RabbitListener来声明要监听的队列信息 // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。 // 可以看到方法体中接收的就是消息体的内容 @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } } 交换机收发 Fanout交换机 publisher 1 2 3 4 5 6 7 public void testFanoutExchange() { // 交换机名称 String exchangeName = "nebula.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); } consumer 1 2 3 4 5 6 7 8 9 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); } Direct交换机 publisher 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void testSendDirectExchange() { // 交换机名称 String exchangeName = "nebula.direct"; // 消息 String message = "redMessage"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); } public void testSendDirectExchange() { // 交换机名称 String exchangeName = "nebula.direct"; // 消息 String message = "blueMessage"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); } consumer 1 2 3 4 5 6 7 8 9 @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); } Topic交换机 #:匹配一个或多个词 *:匹配一个词 publisher 1 2 3 4 5 6 7 8 public void testSendTopicExchange() { // 交换机名称 String exchangeName = "nebula.topic"; // 消息 String message = "message"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "nebula.news", message); } consumer 1 2 3 4 5 6 7 8 9 @RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); } 配置类声明交换机 Fanout交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class FanoutConfig { //声明交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("nebula.fanout"); } //声明队列 @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } //绑定队列和交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } } Direct交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Configuration public class DirectConfig { //声明交换机 @Bean public DirectExchange directExchange() { return ExchangeBuilder.directExchange("nebula.direct").build(); } //声明队列 @Bean public Queue directQueue1() { return new Queue("direct.queue1"); } //绑定队列和交换机 @Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); } } Topic交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Configuration public class TopicConfig { //声明交换机 @Bean public TopicExchange topicExchange() { return ExchangeBuilder.topicExchange("nebula.topic").build(); } //声明队列 @Bean public Queue topicQueue1() { return new Queue("topic.queue1"); } //绑定队列和交换机 @Bean public Binding bindingQueue1WithRed(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1).to(topicExchange).with("red.#"); } } 注解声明交换机 Fanout交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "nebula.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "nebula.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String msg){ System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】"); } Direct交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "nebula.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "nebula.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); } Topic交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); } 消息转换器 使用jackson转换器 1 2 3 4 <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> </dependency> 添加Bean 1 2 3 4 5 6 7 8 9 //在启动类中添加Bean @Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }

2024-08-09 · 5 分钟 · Nebula