容器化运行RabbitMQ,可以参考我之前的一篇文章容器化运行RabbitMQ

本篇不讲解RabbitMQ的理论知识,均采用最简配置


那么消息队列,需要一个消息的生产者与消费者

创建一个生产者:引入相关的依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

application中的配置

server:
  port: 8091

spring:
  application:
    name: producer

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin

生产者的产生的消息,首先是发到交换机上的,再由交换机依据路由键转发到指定的队列上。因此,我们需要

  • 创建一个交换机
  • 创建一个队列
  • 将交换机与队列绑定

实现代码如下:

/**
 * @author qcy
 * @create 2020/08/06 15:57:36
 */ @Configuration public class MQConfig { //交换机名称 public static final String DIRECT_EXCHANGE_NAME = "directExchange"; //队列名称 public static final String DIRECT_QUEUE_NAME = "directQueue"; //路由键,带有此键的消息,directExchange会将消息转发到directQueue上 public static final String ROUTING_KEY = "direct"; //构造名称为directExchange的直连交换机 @Bean public DirectExchange directExchange() { //使用最简单的构造函数,指定直连交换机的名称 return new DirectExchange(DIRECT_EXCHANGE_NAME);
    } //构造名称为directQueue的队列 @Bean public Queue queue() { //同样适用最简单的构造函数,指定队列的名称 return new Queue(DIRECT_QUEUE_NAME);
    } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);
    }
    
}

通过接口产生消息

/**
 * @author qcy
 * @create 2020/08/06 16:07:18
 */ @RestController public class MQController { @Resource RabbitTemplate rabbitTemplate; @PostMapping("/sendMsg") public Map<String, Object> sendMsg(@RequestBody Map<String, Object> map) { //将此消息发送到名称为MQConfig.DIRECT_EXCHANGE_NAME的交换机上,路由键是MQConfig.ROUTING_KEY //因此该消息,最终会被转发到名称为MQConfig.DIRECT_QUEUE_NAME的队列上 rabbitTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE_NAME, MQConfig.ROUTING_KEY, map); return map;
    }
}

使用Postman测试

在未发消息前,是没有任何队列与消息的:

调用之后,创建了名称为directQueue的队列,并且其中有一条消息等待消费


现在创建一个消费者

还是需要引入刚才的依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 

application.yaml和生产者的配置几乎一致

server:
  port: 8092

spring:
  application:
    name: consumer

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin

消费者只需要接收消息,监听队列即可

/**
 * @author qcy
 * @create 2020/08/06 16:41:14
 */ @Component @RabbitListener(queues = "directQueue") public class MQListener { @RabbitHandler public void handle(Map<String, Object> map) {
        map.forEach((k, v) -> System.out.println("(" + k + "," + v + ")"));
    }
}

启动消费者后,可以看到消费了消息:

队列中的消息数量又变为了0

至此,一个简单的集成就做好了。其实这边交换机的类型不止Direct一种,还有很多种。

下一篇,将会演示如何确认消息。确认机制在一定程度上,可以解决消息丢失的问题。