写在前面
这里介绍一下RabbitMQ客户端的操作方式
一、项目构建
1.1、 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2、yml 配置
spring:
rabbitmq:
addresses: ip:5672
username: root
password: root
virtual-host: /vhost
# 开启confirms回调 P -> Exchange
publisher-confirms: true
# 开启returnedMessage回调 Exchange -> Queue
publisher-returns: true
# 设置手动确认(ack) Queue
listener:
simple:
acknowledge-mode: manual
template:
mandatory: true
1.3、producer
- 先定义回掉函数
//回调函数: confirm确认
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
String messageId = correlationData.getId();
if (ack) {
//如果confirm返回成功
} else {
//失败则进行具体的后续操作:重试 或者补偿等手段
System.err.println("异常处理...");
}
}
};
- producer 1
public void sendOrder(Order order) throws Exception {
// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
rabbitTemplate.setConfirmCallback(confirmCallback);
// 消息唯一ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("ls-exchange", "ls.ABC", order, correlationData);
}
- producer 2
public void sendOrder3(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("ex1",
"ls.ABC",
order,
message -> {
message.getMessageProperties().setPriority(1);
return message;
}, correlationData
);
}
1.4、consumer
//配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "ls-queue", durable = "true"),
exchange = @Exchange(name = "ls-exchange", durable = "true", type = "topic"),
key = "ls.*"))
@RabbitHandler//如果有消息过来,在消费的时候调用这个方法
public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws IOException {
logger.info("---收到消息,开始消费--,订单ID:{},Header:{},channel:{}", order.getId(), headers.toString(), channel.toString());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
/** * 不定义交换机,收发消息 * * @param order * @param headers * @param channel * @throws IOException */
@RabbitListener(queues = "defaultExchangeQueue", concurrency = "5-40")
public void onOrderMessage3(@Payload String order, @Headers Map<String, Object> headers, Channel channel) throws IOException {
//消费者操作
logger.info("--不定义交换机,开始消费--,订单ID:{},Header:{},channel:{}", order, headers.toString(), channel.toString());
logger.info("重试次数:{}", headers.get("retry"));
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//ACK,确认一条消息已经被消费
channel.basicAck(deliveryTag, false);
}