之前我们已经实现了一个发送者将消息发送到队列,有多个消费者从队列里面拿数据,但是这样多个消费者是轮询的方式从队列里面拿数据的,每一个消费者拿到的数据都一样多,现在我们想要实现的是能者多劳,咋实现这个呢?
什么是消息确认机制
rabbitmq软件为什么 默认是轮询的了,这个和软件的消息确认机制有一定的关系,那么什么是消息确认机制了?
我们先看消费者端的代码
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 让通道和消息队列进行绑定
channel.queueDeclare("work",false,false,false,null);
//接受消息
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1==="+new String(body));
}
});
}
}
channel.basicConsume("work",true,new DefaultConsumer(channel)
第一个参数的意思: 要接受哪个队列里面的消息
第二个参数:消息确认机制 true false
第三个参数: 接受消息后的回调函数,在这个回调函数里面拿出队列里面的数据
当第二个参数为true的时候,一个消费者从消息队列里面拿出一个消息,这个消息队列就将队列里面的这个消息删除,消息队列是不管这个消费者拿这个消息干什么,也不管这个消息执行完没有,意思就是只要这个消费者拿了消息队列里面的一个消息,那么消息队列就删除队列中的信息。
现在我们的问题
现在我们使用默认的消息确认机制,当一个队列里面有10个消息,现在有两个消费者,那么如果是默认的,那么每一个消费者可以拿到5个消息,但是现在就有一个问题,如果消费者A拿到5个消息,在执行第2个 的时候,这个消费者宕机了,那么其他的3个消息咋办,那就丢失了啊,消息队列只要将消息给了消费者,那么消息队列里面的信息就删除了,现在消费者A也宕机了,其他的3个消息咋办,现在我们想要做的就是将这还没有处理的3个信息给了消费者B ,让消费者B 进行处理,实现能者多劳。
解决问题
1 不使用默认的消息确认机制
channel.basicConsume("work",false,new DefaultConsumer(channel){
第二个参数只要变为false,那么就不会使用默认的确认机制了。
即使我们的消费者已经将消息消费了,但是也不会自动的告诉队列,我已经消费了。
2 设置一个通道里面只是放一个消息
意思就是 一个消费者在一个通道里面只能消费一个消息,
所以,我们要告诉我们的通道,一次只能消费一个消息
源码:
Connection connection = RabbitMqUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
channel.basicQos(1);
// 让通道和消息队列进行绑定
解释源码新增的一句话
channel.basicQos(1);这个的意思是告诉通道,一次只能消费一个消息
让通道和消息队列进行绑定
channel.queueDeclare("work",false,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1==="+new String(body));
channel.basicAck(envelope.getDeliveryTag(),true);
}
});
channel.basicAck(envelope.getDeliveryTag(),false); 手动确认消息