文章目录
RabbitMQ-work queue工作队列 和 fair dispatch公平分发
1.work queue 工作队列
轮询分发
1.1消息生产者
package com.ithzk.rabbitmq.work;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Producer for the round-robin work-queue demo.
 *
 * <p>Publishes 50 text messages ("hello 0" .. "hello 49") to the queue
 * {@code test_work_queue} through the default exchange.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Send {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Declares the queue, publishes the messages and releases the connection.
     *
     * @param args unused
     * @throws IOException          if declaring the queue, publishing or closing fails
     * @throws TimeoutException     if a connection/channel operation times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Obtain a connection
        Connection connection = RabbitMQConnectionUtils.getConnection();
        try {
            // Open a channel on the connection
            Channel channel = connection.createChannel();
            // Declare the queue (durable=false, exclusive=false, autoDelete=false, no arguments)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 50; i++) {
                String msg = "hello " + i;
                System.out.println("[WQ] send:" + msg);
                // Encode explicitly as UTF-8: the consumers decode the body with "utf-8",
                // so the producer must not depend on the platform default charset.
                channel.basicPublish("", QUEUE_NAME, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // Pause grows with each message: 0 ms, 20 ms, ... 980 ms
                Thread.sleep(i * 20);
            }
            channel.close();
        } finally {
            // Always release the connection, even when publishing or the sleep fails.
            // Skip the close when the connection is already shut down so that the
            // original failure is not masked by a second exception.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
1.2 消息消费者(2个消费者)
package com.ithzk.rabbitmq.work;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Consumer #1 of the round-robin work-queue demo.
 *
 * <p>Subscribes to {@code test_work_queue} with automatic acknowledgement and
 * spends two seconds on every message it receives.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Recv {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Connects, declares the queue and registers the consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if obtaining the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection first, then a channel on top of it
        Connection conn = RabbitMQConnectionUtils.getConnection();
        Channel ch = conn.createChannel();

        // Make sure the queue exists before consuming from it
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer slowWorker = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[1] Recv msg:" + new String(body, "utf-8"));
                simulateWork();
            }
        };

        // Round-robin dispatch: messages are handed out one per consumer in turn,
        // regardless of how fast each consumer is or whether it has finished;
        // with two consumers each receives half of the messages.
        // Automatic acknowledgement is enabled (second argument = true).
        ch.basicConsume(QUEUE_NAME, true, slowWorker);
    }

    /** Sleeps for two seconds to imitate processing, then reports completion. */
    private static void simulateWork() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        } finally {
            System.out.println("[1] done");
        }
    }
}
package com.ithzk.rabbitmq.work;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Consumer #2 of the round-robin work-queue demo.
 *
 * <p>Subscribes to {@code test_work_queue} with automatic acknowledgement and
 * spends one second on every message it receives.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Recv2 {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Connects, declares the queue and registers the consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if obtaining the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Obtain a connection
        Connection connection = RabbitMQConnectionUtils.getConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();
        // Declare the queue (durable=false, exclusive=false, autoDelete=false, no arguments)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define the consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg:" + msg);
                try {
                    // Imitate one second of processing
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done");
                }
            }
        };
        // Automatic acknowledgement: this consumer never calls basicAck, so manual-ack
        // mode would leave every delivery unacknowledged and the broker would redeliver
        // all of them once this consumer disconnects.
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
    }
}
2. 公平分发
默认情况下,消息队列不管消费者是否已经处理完上一条消息,都会继续向它发送下一条消息(轮询分发)。通过在消费者端设置 basicQos(1) 并关闭自动应答(改为手动 ack)可以避免这种情况:在消费者发送确认信号之前,消息队列不会再向它发送新的消息,也就是说每个消费者一次只处理一条消息,尚未分发的消息会暂时堆积在队列中。
2.1 消息生产者
package com.ithzk.rabbitmq.fairWork;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Producer for the fair-dispatch demo.
 *
 * <p>Publishes 50 text messages ("hello 0" .. "hello 49") to the queue
 * {@code test_work_queue} through the default exchange.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Send {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Declares the queue, publishes the messages and releases the connection.
     *
     * @param args unused
     * @throws IOException          if declaring the queue, publishing or closing fails
     * @throws TimeoutException     if a connection/channel operation times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Obtain a connection
        Connection connection = RabbitMQConnectionUtils.getConnection();
        try {
            // Open a channel on the connection
            Channel channel = connection.createChannel();
            // Declare the queue (durable=false, exclusive=false, autoDelete=false, no arguments)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // No basicQos here: the prefetch limit restricts how many unacknowledged
            // messages the broker delivers to consumers on a channel. This channel only
            // publishes, so fair dispatch is configured by the consumers
            // (basicQos(1) plus manual acknowledgement), not by the producer.
            for (int i = 0; i < 50; i++) {
                String msg = "hello " + i;
                System.out.println("[WQ] send:" + msg);
                // Encode explicitly as UTF-8: the consumers decode the body with "utf-8",
                // so the producer must not depend on the platform default charset.
                channel.basicPublish("", QUEUE_NAME, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // Pause grows with each message: 0 ms, 5 ms, ... 245 ms
                Thread.sleep(i * 5);
            }
            channel.close();
        } finally {
            // Always release the connection, even when publishing or the sleep fails.
            // Skip the close when the connection is already shut down so that the
            // original failure is not masked by a second exception.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
2.2 消息消费者
package com.ithzk.rabbitmq.fairWork;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Consumer #1 of the fair-dispatch demo.
 *
 * <p>Subscribes to {@code test_work_queue} with a prefetch count of 1 and manual
 * acknowledgement, spending two seconds on every message before acknowledging it.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Recv {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Connects, declares the queue, limits prefetch to one message and registers the consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if obtaining the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection first, then a channel on top of it
        Connection conn = RabbitMQConnectionUtils.getConnection();
        final Channel channel = conn.createChannel();

        // Make sure the queue exists before consuming from it
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Ensure only one message is dispatched to this consumer at a time
        channel.basicQos(1);

        DefaultConsumer worker = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[1] Recv msg:" + new String(body, "utf-8"));
                try {
                    pause();
                } finally {
                    System.out.println("[1] done");
                    // Manual ack for this single delivery (multiple = false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Fair dispatch: the faster consumer picks up more messages.
        // Fair dispatch requires automatic acknowledgement to be turned off,
        // so the consumer is registered in manual-ack mode (second argument = false).
        channel.basicConsume(QUEUE_NAME, false, worker);
    }

    /** Sleeps for two seconds to imitate processing; an interruption is only printed. */
    private static void pause() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package com.ithzk.rabbitmq.fairWork;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Consumer #2 of the fair-dispatch demo.
 *
 * <p>Subscribes to {@code test_work_queue} with a prefetch count of 1 and manual
 * acknowledgement, spending one second on every message before acknowledging it.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Recv2 {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Connects, declares the queue, limits prefetch to one message and registers the consumer.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if obtaining the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection first, then a channel on top of it
        Connection conn = RabbitMQConnectionUtils.getConnection();
        final Channel channel = conn.createChannel();

        // Make sure the queue exists before consuming from it
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Ensure only one message is dispatched to this consumer at a time
        channel.basicQos(1);

        DefaultConsumer worker = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[2] Recv msg:" + new String(body, "utf-8"));
                try {
                    pause();
                } finally {
                    System.out.println("[2] done");
                    // Manual ack for this single delivery (multiple = false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Registered in manual-ack mode (second argument = false)
        channel.basicConsume(QUEUE_NAME, false, worker);
    }

    /** Sleeps for one second to imitate processing; an interruption is only printed. */
    private static void pause() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}