前置

基本概念

  • Publisher:生产者,可以向队列或者交换机发送消息
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。取决于Exchange的类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
    • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
    • Headers:头匹配,基于MQ的消息头匹配
  • Queue:队列,接收消息、缓存消息
  • Consumer:消费者,订阅队列并消费消息

安装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
docker run \
 -e RABBITMQ_DEFAULT_USER=nebula \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network mq-net\
 -d \
rabbitmq
  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

spring使用

pom依赖

1
2
3
4
5

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yaml配置

1
2
3
4
5
6
7
spring:
  rabbitmq:
    host: 127.0.0.1 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /nebula_vh # 虚拟主机
    username: nebula # 用户名
    password: 123456 # 密码

WorkQueue模型配置

1
2
3
4
5
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

队列收发

publisher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11

@Component
public class SpringRabbitListener {
    // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

交换机收发

Fanout交换机

publisher

1
2
3
4
5
6
7
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "nebula.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

consumer

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

Direct交换机

publisher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "nebula.direct";
    // 消息
    String message = "redMessage";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

public void testSendDirectExchange() {
  // 交换机名称
  String exchangeName = "nebula.direct";
  // 消息
  String message = "blueMessage";
  // 发送消息
  rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

consumer

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic交换机

  • #:匹配一个或多个词
  • *:匹配一个词

publisher

1
2
3
4
5
6
7
8
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "nebula.topic";
    // 消息
    String message = "message";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "nebula.news", message);
}

consumer

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
  System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
  System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

配置类声明交换机

Fanout交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class FanoutConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("nebula.fanout");
    }

    //声明队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    //绑定队列和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
}

Direct交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class DirectConfig {

    //声明交换机
    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("nebula.direct").build();
    }

    //声明队列
    @Bean
    public Queue directQueue1() {
        return new Queue("direct.queue1");
    }

    //绑定队列和交换机
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
}

Topic交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class TopicConfig {

  //声明交换机
  @Bean
  public TopicExchange topicExchange() {
    return ExchangeBuilder.topicExchange("nebula.topic").build();
  }

  //声明队列
  @Bean
  public Queue topicQueue1() {
    return new Queue("topic.queue1");
  }

  //绑定队列和交换机
  @Bean
  public Binding bindingQueue1WithRed(Queue topicQueue1, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue1).to(topicExchange).with("red.#");
  }
}

注解声明交换机

Fanout交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.queue1"),
    exchange = @Exchange(name = "nebula.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg){
    System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.queue2"),
    exchange = @Exchange(name = "nebula.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg){
    System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}

Direct交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "nebula.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "nebula.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

消息转换器

使用jackson转换器

1
2
3
4
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>

添加Bean

1
2
3
4
5
6
7
8
9
//在启动类中添加Bean
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}