消息可靠性投递
Confirm & Return
Confirm
表示生产者将消息投递到Broker时的状态。
/*
* 开启connectionFactory的publisher-confirms="true" 属性开启
* 设置confirm监听
*/
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败,失败原因:" + cause);
}
}
});
Return
表示消息被broker正常ack后,但发现没有可投递的队列时所表现的状态
/*
* 开启connectionFactory的publisher-returns="true" 属性开启。
* 设置setMandatory(true)为true时,消息投递失败才会回调return监听,否则丢弃消息。
* 设置return监听。
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息投递失败,被broker退回");
}
});
Consumer ACK
spring-rabbitmq-fanout-consumer.xml
<!--
acknowledge="none" 自动ack
acknowledge="manual" 手动ack
acknowledge="auto" 根据异常情况确认(不常用)
设置手动签收 acknowledge="manual"
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="spring-fanout-queue-a" />
</rabbit:listener-container>
AckListener.java
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("message: " + message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 模拟业务异常
int result = 1/0;
// 正常签收
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
/*
* 第三个参数 requeue,表示是否重回队列
* 需要根据业务场景决定拒签类型
* 消费失败可以记录失败的消息,做补偿机制
*/
channel.basicNack(deliveryTag, false, true);
}
}
}
消费端限流
spring-rabbitmq-fanout-consumer.xml
<!--
设置手动签收 acknowledge="manual"
配置prefetch时,需要消费端的签收模式为手动签收
prefetch="1" 表示消费端每次拉取一条
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="ackListener" queue-names="spring-fanout-queue-a" />
</rabbit:listener-container>
QosListener.java
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("message: " + message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
}
TTL(Time to Live)
表示消息存活时间,消息到达存活时间,还未被消费时,会被清除掉。RabbitMQ支持对消息和队列设置过期时间。
以队列为单位
通过x-message-ttl
参数设置队列消息的过期时间,单位ms
- xml方式
<!-- ttl队列 -->
<rabbit:queue id="spring-topic-ttl-queue" name="spring-topic-ttl-queue" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="5000" />
</rabbit:queue-arguments>
</rabbit:queue>
- java config方式
// 声明队列
@Bean
public Queue topicQueue() {
return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
}
以消息为单位
通过expiration
设置消息的过期时间,单位ms
。
判断逻辑: 在消息即将消费时,会校验该消息是否过期。
同时设置
x-message-ttl
以及expiration
两个参数都设置值,则以时间短的为准。
DLX(Dead Letter Exchange)
死信交换机,或者叫死信队列。消息变成Dead Message时,消息会被交由死信交换机处理。
成为Dead Message的条件
- 队列消息长度到达上限,可通过参数
x-max-length
设置 - 消费者
Nack
且requeue
为false
- 设置了超时时间的消息或消息队列,消息到达超时时间未被消费
延迟队列
- xml配置
<!-- ttl队列 -->
<rabbit:queue id="spring-topic-ttl-queue" name="spring-topic-ttl-queue" auto-declare="true">
<rabbit:queue-arguments>
<!-- 消息5s后变成Dead Message -->
<entry key="x-message-ttl" value="5000" value-type="java.lang.Integer" />
<!-- 绑定死信交换机 -->
<entry key="x-dead-letter-exchange" value="spring-topic-ttl-exchange" />
<!-- 设置Dead Message发送给死信交换机时的routing key -->
<entry key="x-dead-letter-routing-key" value="com.heoller" />
<!-- x-max-length设置队列的长度(可选) -->
<entry key="x-max-length" value="20" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 将ttl队列绑定到交换机 spring-topic-ttl-exchange -->
<rabbit:topic-exchange id="spring-topic-ttl-exchange" name="spring-topic-ttl-exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="com.heoller.#" queue="spring-topic-ttl-queue" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 死信交换机(队列) -->
<rabbit:topic-exchange id="spring-topic-dlx-exchange" name="spring-topic-dlx-exchange" auto-declare="true">
<rabbit:bindings>
<!-- 将死信队列和死信交换机绑定 -->
<rabbit:binding pattern="*.heoller" queue="spring-topic-dlx-queue" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 死信队列 -->
<rabbit:queue id="spring-topic-dlx-queue" name="spring-topic-dlx-queue" auto-declare="true" />
消息积压处理
积压可能原因
- 消费者宕机
- 消费者消费效率低
- 生产者流量过大
解决方案
- 添加更多的消费者
- 消费者将消息存储起来,在消费者本地慢慢处理。