写在前面

这里介绍一下RabbitMQ客户端的操作方式

一、项目构建

1.1、 引入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2、yml 配置

spring:
  rabbitmq:
    addresses: ip:5672
    username: root
    password: root
    virtual-host: /vhost
    # 开启confirms回调 P -> Exchange
    publisher-confirms: true
    # 开启returnedMessage回调 Exchange -> Queue
    publisher-returns: true
    # 设置手动确认(ack) Queue 
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true

1.3、producer

  • 先定义回掉函数
 //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
   
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   
            System.err.println("correlationData: " + correlationData);
            String messageId = correlationData.getId();
            if (ack) {
   
                //如果confirm返回成功
            } else {
   
                //失败则进行具体的后续操作:重试 或者补偿等手段
                System.err.println("异常处理...");
            }
        }
    };
  • producer 1
    public void sendOrder(Order order) throws Exception {
   
        // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 消息唯一ID
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend("ls-exchange", "ls.ABC", order, correlationData);
    }
  • producer 2
    public void sendOrder3(Order order) throws Exception {
   
        rabbitTemplate.setConfirmCallback(confirmCallback);
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend("ex1",
                "ls.ABC",
                order,
                message -> {
   
                    message.getMessageProperties().setPriority(1);
                    return message;
                }, correlationData
        );
    }

1.4、consumer

  //配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "ls-queue", durable = "true"),
            exchange = @Exchange(name = "ls-exchange", durable = "true", type = "topic"),
            key = "ls.*"))
    @RabbitHandler//如果有消息过来,在消费的时候调用这个方法
    public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws IOException {
   
        logger.info("---收到消息,开始消费--,订单ID:{},Header:{},channel:{}", order.getId(), headers.toString(), channel.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
 
        channel.basicAck(deliveryTag, false);
    }
 /** * 不定义交换机,收发消息 * * @param order * @param headers * @param channel * @throws IOException */
    @RabbitListener(queues = "defaultExchangeQueue", concurrency = "5-40")
    public void onOrderMessage3(@Payload String order, @Headers Map<String, Object> headers, Channel channel) throws IOException {
   
        //消费者操作
        logger.info("--不定义交换机,开始消费--,订单ID:{},Header:{},channel:{}", order, headers.toString(), channel.toString());
        logger.info("重试次数:{}", headers.get("retry"));
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //ACK,确认一条消息已经被消费
        channel.basicAck(deliveryTag, false);
    }