一、helloword
生产者
package com.xwtec.mq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_9:39 * 消息队列 生产者 */ public class Producer { //定义交换机名称 static final String EXCHANGE_PRODUCER_ONE = "EXCHANGE_PRODUCER_ONE"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = ConnectionUtils.getConnection(); //创建频道 Channel channel = connection.createChannel(); /** * 设置连接参数 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(EXCHANGE_PRODUCER_ONE,true,false,false,null); //需要传递的消息 String catchMSG = "this message need catch out! number_" + i +" put "; /* * 向server发布一条消息 * 参数1:exchange名字,若为空则使用默认的exchange * 参数2:路由key,简单模式可以传递队列名称 * 参数3:其他的属性 * 参数4:消息体 * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型, * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃 */ channel.basicPublish("",EXCHANGE_PRODUCER_ONE,null,catchMSG.getBytes()); System.out.println("已发送消息:" + catchMSG); //关闭流资源 channel.close(); connection.close(); } }
连接工具类
package com.xwtec.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_10:25 * * 抽取连接类 */ public class ConnectionUtils { public static Connection getConnection() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory rabbitMQConnectionFactory = new ConnectionFactory(); //主机地址 rabbitMQConnectionFactory.setHost("localhost"); //连接端口 rabbitMQConnectionFactory.setPort(5672); //虚拟主机地址 rabbitMQConnectionFactory.setVirtualHost("/xwtec"); //设置用户名 rabbitMQConnectionFactory.setUsername("liyongzhe"); //设置密码 rabbitMQConnectionFactory.setPassword("7999"); // return rabbitMQConnectionFactory.newConnection(); } }
消费者
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_10:57 */ public class ConsumerClone { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null); //设置每次处理两条事情 channel.basicQos(2); //监听事件 channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws UnsupportedEncodingException { try { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("ConsumerClone接收到的消息为:" + new String(body, "utf-8")); Thread.sleep(100); //消息确认 手动消息确认 channel.basicAck(envelope.getDeliveryTag(),false); }catch (Exception e){ e.printStackTrace(); } } }); } }
二、workquen模式
生产者
package com.xwtec.mq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_9:39 * 消息队列 生产者 */ public class Producer { //定义交换机名称 static final String EXCHANGE_PRODUCER_ONE = "EXCHANGE_PRODUCER_ONE"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = ConnectionUtils.getConnection(); //创建频道 Channel channel = connection.createChannel(); /** * 设置连接参数 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(EXCHANGE_PRODUCER_ONE,true,false,false,null); //工作队列模式下的workQueen 创建1000个需要发送的消息 for (int i = 1; i < 30; i++) { //需要传递的消息 String catchMSG = "this message need catch out! number_" + i +" put "; /* * 向server发布一条消息 * 参数1:exchange名字,若为空则使用默认的exchange * 参数2:路由key,简单模式可以传递队列名称 * 参数3:其他的属性 * 参数4:消息体 * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型, * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃 */ channel.basicPublish("",EXCHANGE_PRODUCER_ONE,null,catchMSG.getBytes()); System.out.println("已发送消息:" + catchMSG); } //关闭流资源 channel.close(); connection.close(); } }
消费者1
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_10:31 * * rabbitMQ消费者 */ public class Consumer { // private static final String EXCHANGE_CONSUMER_ONE = "EXCHANGE_CONSUMER_ONE"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 /* * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null); //设置一次只能处理一条消息 channel.basicQos(1); //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("Consumer接收到的消息为:" + new String(body, "utf-8")); //设置 Thread.sleep(1000); //消息确认 channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //监听消息 /* * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消 * 息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE, true, consumer); //不关闭资源,应该一直监听消息 //channel.close(); //connection.close(); } }
消费者2
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_10:31 * * rabbitMQ消费者 */ public class ConsumerTwo { // private static final String EXCHANGE_CONSUMER_ONE = "EXCHANGE_CONSUMER_ONE"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 /* * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null); //设置一次只能处理一条消息 channel.basicQos(1); //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("ConsumerTwo接收到的消息为:" + new String(body, "utf-8")); //设置 Thread.sleep(1000); //消息确认 channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //监听消息 /* * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消 * 息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE, true, consumer); //不关闭资源,应该一直监听消息 //channel.close(); //connection.close(); } }
先启动两个消费者 再启动生产者 会出现平均竞争模式
三、发布订阅模式
生产者
package com.xwtec.mq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_15:06 * 生产者发布订阅模式 */ public class FanoutProducer { //定义交换机名称 static final String FANOUT_EXCHANGE_ONE = "FANOUT_EXCHANGE_ONE"; static final String FANOUT_EXCHANGE_TWO = "FANOUT_EXCHANGE_TWO"; //定义队列名称 static final String FANOUT_QUEEN_ONE = "FANOUT_QUEEN_ONE"; static final String FANOUT_QUEEN_TWO = "FANOUT_QUEEN_TWO"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); /* * 声明交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT); /* * 声明队列 * 设置连接参数 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(FANOUT_QUEEN_ONE,true,false,false,null); channel.queueDeclare(FANOUT_QUEEN_TWO,true,false,false,null); //队列绑定交换机 channel.queueBind(FANOUT_QUEEN_ONE,FANOUT_EXCHANGE_ONE,""); channel.queueBind(FANOUT_QUEEN_TWO,FANOUT_EXCHANGE_ONE,""); for (int i = 1; i <= 10; i++) { String fanoutMSG = "RabbitMQ bind exchange by this order "+i; /* * 向server发布一条消息 * 参数1:exchange名字,若为空则使用默认的exchange * 参数2:路由key,简单模式可以传递队列名称 * 参数3:其他的属性 * 参数4:消息体 * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型, * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃 */ channel.basicPublish(FANOUT_EXCHANGE_ONE,"",null,fanoutMSG.getBytes()); System.out.println("已发送信息"+fanoutMSG); } // 关闭资源 channel.close(); connection.close(); } }
消费者1
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_16:17 * 广播模式下的消费者一 */ public class FanoutConsumerOne { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //获取频道 Channel channel = connection.createChannel(); /* * 声明队列 * 设置连接参数 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(FanoutProducer.FANOUT_QUEEN_ONE,true,false,false,null); /* * 声明交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(FanoutProducer.FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT); //队列绑定交换机 channel.queueBind(FanoutProducer.FANOUT_QUEEN_ONE,FanoutProducer.FANOUT_EXCHANGE_ONE,""); //创建消费者 并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 * (收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws UnsupportedEncodingException { //路由key System.out.println("路由key__"+envelope.getRoutingKey()); //交换机 System.out.println("交换机__"+envelope.getExchange()); //消息id System.out.println("消息ID__"+envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者1--接收到的消息为:"+new String(body,"utf-8")); } }; //消息监听 /* * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(FanoutProducer.FANOUT_QUEEN_ONE,true,consumer); //不关闭资源,应该一直监听消息 //channel.close(); //connection.close(); } }
消费者2
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/20_16:50 * 广播模式下消费者二 */ public class FanoutConsumerTwo { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //获取频道 Channel channel = connection.createChannel(); /* * 声明交换机 * 参数1 交换机名称 * 参数2 交换机类型 */ channel.exchangeDeclare(FanoutProducer.FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT); /* * 声明队列 * 队列 * 是否持久化队列 * 是否独占本次连接 * 是否不在使用时自动删除队列 * 队列的其他参数 */ channel.queueDeclare(FanoutProducer.FANOUT_QUEEN_TWO,true,false,false,null); //交换机队列绑定 channel.queueBind(FanoutProducer.FANOUT_QUEEN_TWO,FanoutProducer.FANOUT_EXCHANGE_ONE,""); //创建消费者,设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 包含deliveryTag、exchange、routingKey等信息 * @param properties BasicProperties对象,即消息生产时设置的该对象特性 属性信息 * @param body 消息体byte数组 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key__"+envelope.getRoutingKey()); //交换机 System.out.println("交换机__"+envelope.getExchange()); //消息id System.out.println("消息ID__"+envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者2--接收到的消息为:"+new String(body,"utf-8")); } }; //监听 监听肯定是队列的使用情况 /* * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(FanoutProducer.FANOUT_QUEEN_TWO,true,consumer); //不关闭资源,应该一直监听消息 //channel.close(); //connection.close(); } }
先启动两个消费者 再启动生产者
四、采用路由key模式
先启动两个消费者 再启动生产者
生产者
package com.xwtec.mq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/21_9:35 * 定向,把消息交给符合指定routing key 的队列 也称作路由key模式 */ public class DirectProducer { //交换机名称 static final String DIRECTOR_EXCHANGE_ONE = "DIRECTOR_EXCHANGE_ONE"; //队列名称——新增 static final String DIRECTOR_QUEEN_INSERT = "DIRECTOR_QUEEN_INSERT"; //队列名称——修改 static final String DIRECTOR_QUEEN_UPDATE = "DIRECTOR_QUEEN_UPDATE"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 交换机的名称 和 采取哪种交换机 channel.exchangeDeclare(DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT); /* * 声明队列 * * 队列名称 * 是否持久化 可以在RabbitMQ崩溃后恢复消息 * 是否独占本次连接 表示一个队列只能被一个消费者占有并消费 * 是否在不使用的时候自动删除队列 * 队列的其他参数 */ channel.queueDeclare(DIRECTOR_QUEEN_INSERT,true,false,false,null); //队列绑定交换机 因为队列与交换机的对应关系是一对一的 交换机与队列的对应关系式一对多的 channel.queueBind(DIRECTOR_QUEEN_INSERT,DIRECTOR_EXCHANGE_ONE,"insert"); channel.queueBind(DIRECTOR_QUEEN_UPDATE,DIRECTOR_EXCHANGE_ONE,"update"); /** * 消息发送 * 定义消息 * 交换机名称 * 路由key * 其他属性 * 消息内容 */ String insertMsg = "小兔子要吃新的胡萝卜——采用routing key模式——insert"; String updateMsg = "小兔子换个胡萝卜吃——采用routing key——update"; channel.basicPublish(DIRECTOR_EXCHANGE_ONE,"insert",null,insertMsg.getBytes()); System.out.println("已发送消息:"+insertMsg); channel.basicPublish(DIRECTOR_EXCHANGE_ONE,"update",null,updateMsg.getBytes()); System.out.println("已发送消息:"+updateMsg); //关闭资源 channel.close(); connection.close(); } }
消费者一
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/21_10:23 * 定向传递消息 采取路由key模式 */ public class DirectConsumerInsert { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //设置频道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(DirectProducer.DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT); //声明队列 channel.queueDeclare(DirectProducer.DIRECTOR_QUEEN_INSERT,true,false,false,null); //交换机队列绑定 channel.queueBind(DirectProducer.DIRECTOR_QUEEN_INSERT,DirectProducer.DIRECTOR_EXCHANGE_ONE,"insert"); //消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key__"+envelope.getRoutingKey()); //交换机 System.out.println("交换机__"+envelope.getExchange()); //消息id System.out.println("消息ID__"+envelope.getDeliveryTag()); //收到的消息 System.out.println("insert——定向routing key模式--接收到的消息为:"+new String(body,"utf-8")); } }; //监听消息 /* * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消 息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(DirectProducer.DIRECTOR_QUEEN_INSERT,true,consumer); } }
消费者二
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/21_10:38 * 定向routing key 模式 */ public class DirectConsumerUpdate { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //设置频道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(DirectProducer.DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT); //声明队列 channel.queueDeclare(DirectProducer.DIRECTOR_QUEEN_UPDATE,true,false,false,null); //交换机队列绑定 channel.queueBind(DirectProducer.DIRECTOR_QUEEN_UPDATE,DirectProducer.DIRECTOR_EXCHANGE_ONE,"update"); //消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key__"+envelope.getRoutingKey()); //交换机 System.out.println("交换机__"+envelope.getExchange()); //消息id System.out.println("消息ID__"+envelope.getDeliveryTag()); //收到的消息 System.out.println("update——定向routing key模式--接收到的消息为:"+new String(body,"utf-8")); } }; //监听消息 /* * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消 息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(DirectProducer.DIRECTOR_QUEEN_UPDATE,true,consumer); } }
五、Toptic 通配符模式
生产者
package com.xwtec.mq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/21_13:37 * topic通配符模式下的生产者 #:匹配一个或多个单词 *:匹配不多不少恰好一个词 */ public class TopicProducer { //定义交换机 static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE"; //定义队列 static final String TOPIC_QUEEN_UPDATE = "TOPIC_QUEEN_UPDATE"; static final String TOPIC_QUEEN_DELETE = "TOPIC_QUEEN_DELETE"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //设置频道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); //声明队列 channel.queueDeclare(TOPIC_QUEEN_UPDATE,true,false,false,null); channel.queueDeclare(TOPIC_QUEEN_DELETE,true,false,false,null); //此处不做队列绑定交换机 因为 不能设置路由key 一旦绑定 消息直接就从此通道发出。 //发送消息 String message = "小兔子 在TOPIC模式下 【拿】了一根胡萝卜! routing key = topic.insert"; channel.basicPublish(TOPIC_EXCHANGE,"topic.insert",null,message.getBytes()); System.out.println("小兔子 在TOPIC模式下 【拿】了一根胡萝卜! routing key = topic.insert"); message = "小兔子 在TOPIC模式下 【换】了一根胡萝卜! routing key = topic.update"; channel.basicPublish(TOPIC_EXCHANGE,"topic.update",null,message.getBytes()); System.out.println("小兔子 在TOPIC模式下 【换】了一根胡萝卜! routing key = topic.update"); message = "小兔子 在TOPIC模式下 【吃】了一根胡萝卜! routing key = topic.delete"; channel.basicPublish(TOPIC_EXCHANGE,"topic.delete",null,message.getBytes()); System.out.println("小兔子 在TOPIC模式下 【吃】了一根胡萝卜! routing key = topic.delete"); channel.close(); connection.close(); } }
消费者1
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/21_14:18 */ public class TopicConsumerOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(TopicProducer.TOPIC_QUEEN_UPDATE,true,false,false,null); channel.queueBind(TopicProducer.TOPIC_QUEEN_UPDATE,TopicProducer.TOPIC_EXCHANGE,"topic.insert"); channel.queueBind(TopicProducer.TOPIC_QUEEN_UPDATE,TopicProducer.TOPIC_EXCHANGE,"topic.update"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("EXCHANGE:"+envelope.getExchange()); System.out.println("ROUTING KEY:"+envelope.getRoutingKey()); System.out.println("ID:"+envelope.getDeliveryTag()); System.out.println("消费者一接收的消息"+new String(body,"utf-8")); System.out.println("********************************************************"); } }; channel.basicConsume(TopicProducer.TOPIC_QUEEN_UPDATE,true,consumer); } }
消费者2
package com.xwtec.mq; import com.rabbitmq.client.*; import com.xwtec.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Liyongzhe * @date 2021/4/21_14:18 */ public class TopicConsumerTwo { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(TopicProducer.TOPIC_QUEEN_DELETE,true,false,false,null); channel.queueBind(TopicProducer.TOPIC_QUEEN_DELETE,TopicProducer.TOPIC_EXCHANGE,"topic.*"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("EXCHANGE:"+envelope.getExchange()); System.out.println("ROUTING KEY:"+envelope.getRoutingKey()); System.out.println("ID:"+envelope.getDeliveryTag()); System.out.println("消费者二接收的消息"+new String(body,"utf-8")); System.out.println("********************************************************"); } }; channel.basicConsume(TopicProducer.TOPIC_QUEEN_DELETE,true,consumer); } }