消息可靠性投递

connection

Confirm & Return

Confirm

表示生产者将消息投递到Broker时的状态。

/*
 * 开启connectionFactory的publisher-confirms="true" 属性开启
 * 设置confirm监听
 */
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败,失败原因:" + cause);
        }
    }
});
Return

表示消息被broker正常ack后,但发现没有可投递的队列时所表现的状态

/*
 * 开启connectionFactory的publisher-returns="true" 属性开启。
 * 设置setMandatory(true)为true时,消息投递失败才会回调return监听,否则丢弃消息。
 * 设置return监听。
 */
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息投递失败,被broker退回");
    }
});

Consumer ACK

  • spring-rabbitmq-fanout-consumer.xml
<!--
 acknowledge="none" 自动ack
 acknowledge="manual" 手动ack
 acknowledge="auto" 根据异常情况确认(不常用)
 设置手动签收 acknowledge="manual"
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener ref="ackListener" queue-names="spring-fanout-queue-a" />
</rabbit:listener-container>
  • AckListener.java
@Component
public class AckListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("message: " + message);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //  模拟业务异常
            int result = 1/0;
            // 正常签收
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            /*
             * 第三个参数 requeue,表示是否重回队列
             * 需要根据业务场景决定拒签类型
             * 消费失败可以记录失败的消息,做补偿机制
             */
            channel.basicNack(deliveryTag, false, true);
        }

    }
}

消费端限流

spring-rabbitmq-fanout-consumer.xml

<!--
 设置手动签收 acknowledge="manual"
 配置prefetch时,需要消费端的签收模式为手动签收
 prefetch="1" 表示消费端每次拉取一条
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
    <rabbit:listener ref="ackListener" queue-names="spring-fanout-queue-a" />
</rabbit:listener-container>

QosListener.java

@Component
public class QosListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("message: " + message);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}

TTL(Time to Live)

表示消息存活时间,消息到达存活时间,还未被消费时,会被清除掉。RabbitMQ支持对消息和队列设置过期时间。

以队列为单位

通过x-message-ttl参数设置队列消息的过期时间,单位ms

  • xml方式

<!-- ttl队列  -->
<rabbit:queue id="spring-topic-ttl-queue" name="spring-topic-ttl-queue" auto-declare="true">
<rabbit:queue-arguments>
    <entry key="x-message-ttl" value="5000" />
</rabbit:queue-arguments>
</rabbit:queue>
  • java config方式
// 声明队列
@Bean
public Queue topicQueue() {
    return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
}

以消息为单位

通过expiration设置消息的过期时间,单位ms

判断逻辑: 在消息即将消费时,会校验该消息是否过期。

同时设置

x-message-ttl以及expiration两个参数都设置值,则以时间短的为准。

DLX(Dead Letter Exchange)

死信交换机,或者叫死信队列。消息变成Dead Message时,消息会被交由死信交换机处理。

成为Dead Message的条件

  • 队列消息长度到达上限,可通过参数x-max-length设置
  • 消费者Nackrequeuefalse
  • 设置了超时时间的消息或消息队列,消息到达超时时间未被消费

延迟队列

ttl+dlx

  • xml配置
<!-- ttl队列  -->
<rabbit:queue id="spring-topic-ttl-queue" name="spring-topic-ttl-queue" auto-declare="true">
<rabbit:queue-arguments>
  <!-- 消息5s后变成Dead Message -->
  <entry key="x-message-ttl" value="5000" value-type="java.lang.Integer" />
  <!-- 绑定死信交换机 -->
  <entry key="x-dead-letter-exchange" value="spring-topic-ttl-exchange" />
  <!-- 设置Dead Message发送给死信交换机时的routing key -->
  <entry key="x-dead-letter-routing-key" value="com.heoller" />
  <!-- x-max-length设置队列的长度(可选) -->
  <entry key="x-max-length" value="20" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 将ttl队列绑定到交换机 spring-topic-ttl-exchange  -->
<rabbit:topic-exchange id="spring-topic-ttl-exchange" name="spring-topic-ttl-exchange" auto-declare="true">
<rabbit:bindings>
  <rabbit:binding pattern="com.heoller.#" queue="spring-topic-ttl-queue" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 死信交换机(队列) -->
<rabbit:topic-exchange id="spring-topic-dlx-exchange" name="spring-topic-dlx-exchange" auto-declare="true">
<rabbit:bindings>
  <!-- 将死信队列和死信交换机绑定 -->
  <rabbit:binding pattern="*.heoller" queue="spring-topic-dlx-queue" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 死信队列 -->
<rabbit:queue id="spring-topic-dlx-queue" name="spring-topic-dlx-queue" auto-declare="true" />

消息积压处理

积压可能原因

  • 消费者宕机
  • 消费者消费效率低
  • 生产者流量过大

解决方案

  • 添加更多的消费者
  • 消费者将消息存储起来,在消费者本地慢慢处理。

重复消费