容器化运行RabbitMQ,可以参考我之前的一篇文章容器化运行RabbitMQ
本篇不讲解RabbitMQ的理论知识,均采用最简配置
那么消息队列,需要一个消息的生产者与消费者
创建一个生产者:引入相关的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application中的配置
server: port: 8091 spring: application: name: producer rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin
生产者的产生的消息,首先是发到交换机上的,再由交换机依据路由键转发到指定的队列上。因此,我们需要
- 创建一个交换机
- 创建一个队列
- 将交换机与队列绑定
实现代码如下:
/** * @author qcy * @create 2020/08/06 15:57:36 */ @Configuration public class MQConfig { //交换机名称 public static final String DIRECT_EXCHANGE_NAME = "directExchange"; //队列名称 public static final String DIRECT_QUEUE_NAME = "directQueue"; //路由键,带有此键的消息,directExchange会将消息转发到directQueue上 public static final String ROUTING_KEY = "direct"; //构造名称为directExchange的直连交换机 @Bean public DirectExchange directExchange() { //使用最简单的构造函数,指定直连交换机的名称 return new DirectExchange(DIRECT_EXCHANGE_NAME); } //构造名称为directQueue的队列 @Bean public Queue queue() { //同样适用最简单的构造函数,指定队列的名称 return new Queue(DIRECT_QUEUE_NAME); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY); } }
通过接口产生消息
/** * @author qcy * @create 2020/08/06 16:07:18 */ @RestController public class MQController { @Resource RabbitTemplate rabbitTemplate; @PostMapping("/sendMsg") public Map<String, Object> sendMsg(@RequestBody Map<String, Object> map) { //将此消息发送到名称为MQConfig.DIRECT_EXCHANGE_NAME的交换机上,路由键是MQConfig.ROUTING_KEY //因此该消息,最终会被转发到名称为MQConfig.DIRECT_QUEUE_NAME的队列上 rabbitTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE_NAME, MQConfig.ROUTING_KEY, map); return map; } }
使用Postman测试
在未发消息前,是没有任何队列与消息的:
调用之后,创建了名称为directQueue的队列,并且其中有一条消息等待消费
现在创建一个消费者
还是需要引入刚才的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yaml和生产者的配置几乎一致
server: port: 8092 spring: application: name: consumer rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin
消费者只需要接收消息,监听队列即可
/** * @author qcy * @create 2020/08/06 16:41:14 */ @Component @RabbitListener(queues = "directQueue") public class MQListener { @RabbitHandler public void handle(Map<String, Object> map) { map.forEach((k, v) -> System.out.println("(" + k + "," + v + ")")); } }
启动消费者后,可以看到消费了消息:
队列中的消息数量又变为了0
至此,一个简单的集成就做好了。其实这边交换机的类型不止Direct一种,还有很多种。
下一篇,将会演示如何确认消息。确认机制在一定程度上,可以解决消息丢失的问题。