一、helloword
生产者
package com.xwtec.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_9:39
* 消息队列 生产者
*/
public class Producer {
//定义交换机名称
static final String EXCHANGE_PRODUCER_ONE = "EXCHANGE_PRODUCER_ONE";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
/**
* 设置连接参数
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(EXCHANGE_PRODUCER_ONE,true,false,false,null);
//需要传递的消息
String catchMSG = "this message need catch out! number_" + i +" put ";
/*
* 向server发布一条消息
* 参数1:exchange名字,若为空则使用默认的exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:其他的属性
* 参数4:消息体
* RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
* 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
*/
channel.basicPublish("",EXCHANGE_PRODUCER_ONE,null,catchMSG.getBytes());
System.out.println("已发送消息:" + catchMSG);
//关闭流资源
channel.close();
connection.close();
}
}
连接工具类
package com.xwtec.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_10:25
*
* 抽取连接类
*/
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory rabbitMQConnectionFactory = new ConnectionFactory();
//主机地址
rabbitMQConnectionFactory.setHost("localhost");
//连接端口
rabbitMQConnectionFactory.setPort(5672);
//虚拟主机地址
rabbitMQConnectionFactory.setVirtualHost("/xwtec");
//设置用户名
rabbitMQConnectionFactory.setUsername("liyongzhe");
//设置密码
rabbitMQConnectionFactory.setPassword("7999");
//
return rabbitMQConnectionFactory.newConnection();
}
}
消费者
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_10:57
*/
public class ConsumerClone {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null);
//设置每次处理两条事情
channel.basicQos(2);
//监听事件
channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws UnsupportedEncodingException {
try {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("ConsumerClone接收到的消息为:" + new String(body, "utf-8"));
Thread.sleep(100);
//消息确认 手动消息确认
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
});
}
}二、workquen模式
生产者
package com.xwtec.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_9:39
* 消息队列 生产者
*/
public class Producer {
//定义交换机名称
static final String EXCHANGE_PRODUCER_ONE = "EXCHANGE_PRODUCER_ONE";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
/**
* 设置连接参数
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(EXCHANGE_PRODUCER_ONE,true,false,false,null);
//工作队列模式下的workQueen 创建1000个需要发送的消息
for (int i = 1; i < 30; i++) {
//需要传递的消息
String catchMSG = "this message need catch out! number_" + i +" put ";
/*
* 向server发布一条消息
* 参数1:exchange名字,若为空则使用默认的exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:其他的属性
* 参数4:消息体
* RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
* 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
*/
channel.basicPublish("",EXCHANGE_PRODUCER_ONE,null,catchMSG.getBytes());
System.out.println("已发送消息:" + catchMSG);
}
//关闭流资源
channel.close();
connection.close();
}
}
消费者1
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_10:31
*
* rabbitMQ消费者
*/
public class Consumer {
// private static final String EXCHANGE_CONSUMER_ONE = "EXCHANGE_CONSUMER_ONE";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null);
//设置一次只能处理一条消息
channel.basicQos(1);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("Consumer接收到的消息为:" + new String(body, "utf-8"));
//设置
Thread.sleep(1000);
//消息确认
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
* 息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE, true, consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
消费者2
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_10:31
*
* rabbitMQ消费者
*/
public class ConsumerTwo {
// private static final String EXCHANGE_CONSUMER_ONE = "EXCHANGE_CONSUMER_ONE";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null);
//设置一次只能处理一条消息
channel.basicQos(1);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("ConsumerTwo接收到的消息为:" + new String(body, "utf-8"));
//设置
Thread.sleep(1000);
//消息确认
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
* 息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE, true, consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
先启动两个消费者 再启动生产者 会出现平均竞争模式
三、发布订阅模式
生产者
package com.xwtec.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_15:06
* 生产者发布订阅模式
*/
public class FanoutProducer {
//定义交换机名称
static final String FANOUT_EXCHANGE_ONE = "FANOUT_EXCHANGE_ONE";
static final String FANOUT_EXCHANGE_TWO = "FANOUT_EXCHANGE_TWO";
//定义队列名称
static final String FANOUT_QUEEN_ONE = "FANOUT_QUEEN_ONE";
static final String FANOUT_QUEEN_TWO = "FANOUT_QUEEN_TWO";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
/*
* 声明交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT);
/*
* 声明队列
* 设置连接参数
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(FANOUT_QUEEN_ONE,true,false,false,null);
channel.queueDeclare(FANOUT_QUEEN_TWO,true,false,false,null);
//队列绑定交换机
channel.queueBind(FANOUT_QUEEN_ONE,FANOUT_EXCHANGE_ONE,"");
channel.queueBind(FANOUT_QUEEN_TWO,FANOUT_EXCHANGE_ONE,"");
for (int i = 1; i <= 10; i++) {
String fanoutMSG = "RabbitMQ bind exchange by this order "+i;
/*
* 向server发布一条消息
* 参数1:exchange名字,若为空则使用默认的exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:其他的属性
* 参数4:消息体
* RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
* 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
*/
channel.basicPublish(FANOUT_EXCHANGE_ONE,"",null,fanoutMSG.getBytes());
System.out.println("已发送信息"+fanoutMSG);
}
// 关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_16:17
* 广播模式下的消费者一
*/
public class FanoutConsumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取频道
Channel channel = connection.createChannel();
/*
* 声明队列
* 设置连接参数
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(FanoutProducer.FANOUT_QUEEN_ONE,true,false,false,null);
/*
* 声明交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FanoutProducer.FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT);
//队列绑定交换机
channel.queueBind(FanoutProducer.FANOUT_QUEEN_ONE,FanoutProducer.FANOUT_EXCHANGE_ONE,"");
//创建消费者 并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
* (收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws UnsupportedEncodingException {
//路由key
System.out.println("路由key__"+envelope.getRoutingKey());
//交换机
System.out.println("交换机__"+envelope.getExchange());
//消息id
System.out.println("消息ID__"+envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1--接收到的消息为:"+new String(body,"utf-8"));
}
};
//消息监听
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(FanoutProducer.FANOUT_QUEEN_ONE,true,consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
消费者2
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/20_16:50
* 广播模式下消费者二
*/
public class FanoutConsumerTwo {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//获取频道
Channel channel = connection.createChannel();
/*
* 声明交换机
* 参数1 交换机名称
* 参数2 交换机类型
*/
channel.exchangeDeclare(FanoutProducer.FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT);
/*
* 声明队列
* 队列
* 是否持久化队列
* 是否独占本次连接
* 是否不在使用时自动删除队列
* 队列的其他参数
*/
channel.queueDeclare(FanoutProducer.FANOUT_QUEEN_TWO,true,false,false,null);
//交换机队列绑定
channel.queueBind(FanoutProducer.FANOUT_QUEEN_TWO,FanoutProducer.FANOUT_EXCHANGE_ONE,"");
//创建消费者,设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
* @param envelope 包含deliveryTag、exchange、routingKey等信息
* @param properties BasicProperties对象,即消息生产时设置的该对象特性 属性信息
* @param body 消息体byte数组
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key__"+envelope.getRoutingKey());
//交换机
System.out.println("交换机__"+envelope.getExchange());
//消息id
System.out.println("消息ID__"+envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2--接收到的消息为:"+new String(body,"utf-8"));
}
};
//监听 监听肯定是队列的使用情况
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(FanoutProducer.FANOUT_QUEEN_TWO,true,consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
先启动两个消费者 再启动生产者
四、采用路由key模式
先启动两个消费者 再启动生产者
生产者
package com.xwtec.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/21_9:35
* 定向,把消息交给符合指定routing key 的队列 也称作路由key模式
*/
public class DirectProducer {
//交换机名称
static final String DIRECTOR_EXCHANGE_ONE = "DIRECTOR_EXCHANGE_ONE";
//队列名称——新增
static final String DIRECTOR_QUEEN_INSERT = "DIRECTOR_QUEEN_INSERT";
//队列名称——修改
static final String DIRECTOR_QUEEN_UPDATE = "DIRECTOR_QUEEN_UPDATE";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机 交换机的名称 和 采取哪种交换机
channel.exchangeDeclare(DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT);
/*
* 声明队列
*
* 队列名称
* 是否持久化 可以在RabbitMQ崩溃后恢复消息
* 是否独占本次连接 表示一个队列只能被一个消费者占有并消费
* 是否在不使用的时候自动删除队列
* 队列的其他参数
*/
channel.queueDeclare(DIRECTOR_QUEEN_INSERT,true,false,false,null);
//队列绑定交换机 因为队列与交换机的对应关系是一对一的 交换机与队列的对应关系式一对多的
channel.queueBind(DIRECTOR_QUEEN_INSERT,DIRECTOR_EXCHANGE_ONE,"insert");
channel.queueBind(DIRECTOR_QUEEN_UPDATE,DIRECTOR_EXCHANGE_ONE,"update");
/**
* 消息发送
* 定义消息
* 交换机名称
* 路由key
* 其他属性
* 消息内容
*/
String insertMsg = "小兔子要吃新的胡萝卜——采用routing key模式——insert";
String updateMsg = "小兔子换个胡萝卜吃——采用routing key——update";
channel.basicPublish(DIRECTOR_EXCHANGE_ONE,"insert",null,insertMsg.getBytes());
System.out.println("已发送消息:"+insertMsg);
channel.basicPublish(DIRECTOR_EXCHANGE_ONE,"update",null,updateMsg.getBytes());
System.out.println("已发送消息:"+updateMsg);
//关闭资源
channel.close();
connection.close();
}
}
消费者一
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/21_10:23
* 定向传递消息 采取路由key模式
*/
public class DirectConsumerInsert {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//设置频道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(DirectProducer.DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(DirectProducer.DIRECTOR_QUEEN_INSERT,true,false,false,null);
//交换机队列绑定
channel.queueBind(DirectProducer.DIRECTOR_QUEEN_INSERT,DirectProducer.DIRECTOR_EXCHANGE_ONE,"insert");
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key__"+envelope.getRoutingKey());
//交换机
System.out.println("交换机__"+envelope.getExchange());
//消息id
System.out.println("消息ID__"+envelope.getDeliveryTag());
//收到的消息
System.out.println("insert——定向routing key模式--接收到的消息为:"+new String(body,"utf-8"));
}
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(DirectProducer.DIRECTOR_QUEEN_INSERT,true,consumer);
}
}
消费者二
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/21_10:38
* 定向routing key 模式
*/
public class DirectConsumerUpdate {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//设置频道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(DirectProducer.DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(DirectProducer.DIRECTOR_QUEEN_UPDATE,true,false,false,null);
//交换机队列绑定
channel.queueBind(DirectProducer.DIRECTOR_QUEEN_UPDATE,DirectProducer.DIRECTOR_EXCHANGE_ONE,"update");
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key__"+envelope.getRoutingKey());
//交换机
System.out.println("交换机__"+envelope.getExchange());
//消息id
System.out.println("消息ID__"+envelope.getDeliveryTag());
//收到的消息
System.out.println("update——定向routing key模式--接收到的消息为:"+new String(body,"utf-8"));
}
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(DirectProducer.DIRECTOR_QUEEN_UPDATE,true,consumer);
}
}
五、Toptic 通配符模式
生产者
package com.xwtec.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/21_13:37
* topic通配符模式下的生产者 #:匹配一个或多个单词 *:匹配不多不少恰好一个词
*/
public class TopicProducer {
//定义交换机
static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";
//定义队列
static final String TOPIC_QUEEN_UPDATE = "TOPIC_QUEEN_UPDATE";
static final String TOPIC_QUEEN_DELETE = "TOPIC_QUEEN_DELETE";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//设置频道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(TOPIC_QUEEN_UPDATE,true,false,false,null);
channel.queueDeclare(TOPIC_QUEEN_DELETE,true,false,false,null);
//此处不做队列绑定交换机 因为 不能设置路由key 一旦绑定 消息直接就从此通道发出。
//发送消息
String message = "小兔子 在TOPIC模式下 【拿】了一根胡萝卜! routing key = topic.insert";
channel.basicPublish(TOPIC_EXCHANGE,"topic.insert",null,message.getBytes());
System.out.println("小兔子 在TOPIC模式下 【拿】了一根胡萝卜! routing key = topic.insert");
message = "小兔子 在TOPIC模式下 【换】了一根胡萝卜! routing key = topic.update";
channel.basicPublish(TOPIC_EXCHANGE,"topic.update",null,message.getBytes());
System.out.println("小兔子 在TOPIC模式下 【换】了一根胡萝卜! routing key = topic.update");
message = "小兔子 在TOPIC模式下 【吃】了一根胡萝卜! routing key = topic.delete";
channel.basicPublish(TOPIC_EXCHANGE,"topic.delete",null,message.getBytes());
System.out.println("小兔子 在TOPIC模式下 【吃】了一根胡萝卜! routing key = topic.delete");
channel.close();
connection.close();
}
}
消费者1
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/21_14:18
*/
public class TopicConsumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TopicProducer.TOPIC_QUEEN_UPDATE,true,false,false,null);
channel.queueBind(TopicProducer.TOPIC_QUEEN_UPDATE,TopicProducer.TOPIC_EXCHANGE,"topic.insert");
channel.queueBind(TopicProducer.TOPIC_QUEEN_UPDATE,TopicProducer.TOPIC_EXCHANGE,"topic.update");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("EXCHANGE:"+envelope.getExchange());
System.out.println("ROUTING KEY:"+envelope.getRoutingKey());
System.out.println("ID:"+envelope.getDeliveryTag());
System.out.println("消费者一接收的消息"+new String(body,"utf-8"));
System.out.println("********************************************************");
}
};
channel.basicConsume(TopicProducer.TOPIC_QUEEN_UPDATE,true,consumer);
}
}
消费者2
package com.xwtec.mq;
import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liyongzhe
* @date 2021/4/21_14:18
*/
public class TopicConsumerTwo {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TopicProducer.TOPIC_QUEEN_DELETE,true,false,false,null);
channel.queueBind(TopicProducer.TOPIC_QUEEN_DELETE,TopicProducer.TOPIC_EXCHANGE,"topic.*");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("EXCHANGE:"+envelope.getExchange());
System.out.println("ROUTING KEY:"+envelope.getRoutingKey());
System.out.println("ID:"+envelope.getDeliveryTag());
System.out.println("消费者二接收的消息"+new String(body,"utf-8"));
System.out.println("********************************************************");
}
};
channel.basicConsume(TopicProducer.TOPIC_QUEEN_DELETE,true,consumer);
}
}
京公网安备 11010502036488号