一、helloword

图片说明

生产者

package com.xwtec.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_9:39
 * 消息队列 生产者
 */
public class Producer {

    //定义交换机名称
    static final String EXCHANGE_PRODUCER_ONE = "EXCHANGE_PRODUCER_ONE";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        /**
         * 设置连接参数
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(EXCHANGE_PRODUCER_ONE,true,false,false,null);
            //需要传递的消息
            String catchMSG = "this message need catch out! number_" + i +" put ";
            /*
             * 向server发布一条消息
             * 参数1:exchange名字,若为空则使用默认的exchange
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:其他的属性
             * 参数4:消息体
             * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
             * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
             */
            channel.basicPublish("",EXCHANGE_PRODUCER_ONE,null,catchMSG.getBytes());

            System.out.println("已发送消息:" + catchMSG);
        //关闭流资源
        channel.close();
        connection.close();
    }
}

连接工具类

package com.xwtec.utils;


import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_10:25
 *
 * 抽取连接类
 */
public class ConnectionUtils {

    public static Connection getConnection() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory rabbitMQConnectionFactory = new ConnectionFactory();
        //主机地址
        rabbitMQConnectionFactory.setHost("localhost");
        //连接端口
        rabbitMQConnectionFactory.setPort(5672);
        //虚拟主机地址
        rabbitMQConnectionFactory.setVirtualHost("/xwtec");
        //设置用户名
        rabbitMQConnectionFactory.setUsername("liyongzhe");
        //设置密码
        rabbitMQConnectionFactory.setPassword("7999");

        //
        return rabbitMQConnectionFactory.newConnection();
    }
}

消费者

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_10:57
 */
public class ConsumerClone {

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null);
        //设置每次处理两条事情
        channel.basicQos(2);
        //监听事件
        channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,byte[] body) throws UnsupportedEncodingException {
                try {
                    //路由key
                    System.out.println("路由key为:" + envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:" + envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("ConsumerClone接收到的消息为:" + new String(body, "utf-8"));
                    Thread.sleep(100);
                    //消息确认 手动消息确认
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }
}

二、workquen模式

图片说明

生产者

package com.xwtec.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_9:39
 * 消息队列 生产者
 */
public class Producer {

    //定义交换机名称
    static final String EXCHANGE_PRODUCER_ONE = "EXCHANGE_PRODUCER_ONE";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        /**
         * 设置连接参数
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(EXCHANGE_PRODUCER_ONE,true,false,false,null);

        //工作队列模式下的workQueen 创建1000个需要发送的消息
        for (int i = 1; i < 30; i++) {

            //需要传递的消息
            String catchMSG = "this message need catch out! number_" + i +" put ";
            /*
             * 向server发布一条消息
             * 参数1:exchange名字,若为空则使用默认的exchange
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:其他的属性
             * 参数4:消息体
             * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
             * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
             */
            channel.basicPublish("",EXCHANGE_PRODUCER_ONE,null,catchMSG.getBytes());

            System.out.println("已发送消息:" + catchMSG);

        }
        //关闭流资源
        channel.close();
        connection.close();
    }
}

消费者1

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_10:31
 *
 * rabbitMQ消费者
 */
public class Consumer {

//    private static final String EXCHANGE_CONSUMER_ONE = "EXCHANGE_CONSUMER_ONE";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /*
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null);
        //设置一次只能处理一条消息
        channel.basicQos(1);
        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){

            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
             (收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //路由key
                    System.out.println("路由key为:" + envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:" + envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("Consumer接收到的消息为:" + new String(body, "utf-8"));
                    //设置
                    Thread.sleep(1000);
                    //消息确认
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //监听消息
        /*
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
         * 息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE, true, consumer);
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}

消费者2

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_10:31
 *
 * rabbitMQ消费者
 */
public class ConsumerTwo {

//    private static final String EXCHANGE_CONSUMER_ONE = "EXCHANGE_CONSUMER_ONE";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /*
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(Producer.EXCHANGE_PRODUCER_ONE,true,false,false,null);
        //设置一次只能处理一条消息
        channel.basicQos(1);
        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){

            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
             (收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //路由key
                    System.out.println("路由key为:" + envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:" + envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("ConsumerTwo接收到的消息为:" + new String(body, "utf-8"));
                    //设置
                    Thread.sleep(1000);
                    //消息确认
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //监听消息
        /*
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
         * 息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(Producer.EXCHANGE_PRODUCER_ONE, true, consumer);
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}

先启动两个消费者 再启动生产者 会出现平均竞争模式

三、发布订阅模式

生产者

package com.xwtec.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_15:06
 * 生产者发布订阅模式
 */
public class FanoutProducer {

    //定义交换机名称
    static final String FANOUT_EXCHANGE_ONE = "FANOUT_EXCHANGE_ONE";
    static final String FANOUT_EXCHANGE_TWO = "FANOUT_EXCHANGE_TWO";
    //定义队列名称
    static final String FANOUT_QUEEN_ONE = "FANOUT_QUEEN_ONE";
    static final String FANOUT_QUEEN_TWO = "FANOUT_QUEEN_TWO";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        /*
         * 声明交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT);
        /*
         * 声明队列
         * 设置连接参数
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(FANOUT_QUEEN_ONE,true,false,false,null);
        channel.queueDeclare(FANOUT_QUEEN_TWO,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(FANOUT_QUEEN_ONE,FANOUT_EXCHANGE_ONE,"");
        channel.queueBind(FANOUT_QUEEN_TWO,FANOUT_EXCHANGE_ONE,"");

        for (int i = 1; i <= 10; i++) {
            String fanoutMSG = "RabbitMQ bind exchange by this order "+i;
            /*
             * 向server发布一条消息
             * 参数1:exchange名字,若为空则使用默认的exchange
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:其他的属性
             * 参数4:消息体
             * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
             * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
             */
            channel.basicPublish(FANOUT_EXCHANGE_ONE,"",null,fanoutMSG.getBytes());
            System.out.println("已发送信息"+fanoutMSG);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_16:17
 * 广播模式下的消费者一
 */
public class FanoutConsumerOne {

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //获取频道
        Channel channel = connection.createChannel();

        /*
         * 声明队列
         * 设置连接参数
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(FanoutProducer.FANOUT_QUEEN_ONE,true,false,false,null);
        /*
         * 声明交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(FanoutProducer.FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT);
        //队列绑定交换机
        channel.queueBind(FanoutProducer.FANOUT_QUEEN_ONE,FanoutProducer.FANOUT_EXCHANGE_ONE,"");

        //创建消费者 并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
             * (收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,byte[] body) throws UnsupportedEncodingException {
                //路由key
                System.out.println("路由key__"+envelope.getRoutingKey());
                //交换机
                System.out.println("交换机__"+envelope.getExchange());
                //消息id
                System.out.println("消息ID__"+envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1--接收到的消息为:"+new String(body,"utf-8"));
            }

        };

        //消息监听
        /*
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(FanoutProducer.FANOUT_QUEEN_ONE,true,consumer);
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}

消费者2

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/20_16:50
 * 广播模式下消费者二
 */
public class FanoutConsumerTwo {

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //获取频道
        Channel channel = connection.createChannel();
        /*
         * 声明交换机
         * 参数1 交换机名称
         * 参数2 交换机类型
         */
        channel.exchangeDeclare(FanoutProducer.FANOUT_EXCHANGE_ONE, BuiltinExchangeType.FANOUT);
        /*
         * 声明队列
         * 队列
         * 是否持久化队列
         * 是否独占本次连接
         * 是否不在使用时自动删除队列
         * 队列的其他参数
         */
        channel.queueDeclare(FanoutProducer.FANOUT_QUEEN_TWO,true,false,false,null);
        //交换机队列绑定
        channel.queueBind(FanoutProducer.FANOUT_QUEEN_TWO,FanoutProducer.FANOUT_EXCHANGE_ONE,"");
        //创建消费者,设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){

            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope  包含deliveryTag、exchange、routingKey等信息
             * @param properties BasicProperties对象,即消息生产时设置的该对象特性 属性信息
             * @param body 消息体byte数组
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key__"+envelope.getRoutingKey());
                //交换机
                System.out.println("交换机__"+envelope.getExchange());
                //消息id
                System.out.println("消息ID__"+envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者2--接收到的消息为:"+new String(body,"utf-8"));
            }
        };

        //监听 监听肯定是队列的使用情况
        /*
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(FanoutProducer.FANOUT_QUEEN_TWO,true,consumer);
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}

先启动两个消费者 再启动生产者

四、采用路由key模式

图片说明
先启动两个消费者 再启动生产者

生产者

package com.xwtec.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/21_9:35
 * 定向,把消息交给符合指定routing key 的队列 也称作路由key模式
 */
public class DirectProducer {

    //交换机名称
    static final String DIRECTOR_EXCHANGE_ONE = "DIRECTOR_EXCHANGE_ONE";
    //队列名称——新增
    static final String DIRECTOR_QUEEN_INSERT = "DIRECTOR_QUEEN_INSERT";
    //队列名称——修改
    static final String DIRECTOR_QUEEN_UPDATE = "DIRECTOR_QUEEN_UPDATE";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机 交换机的名称 和 采取哪种交换机
        channel.exchangeDeclare(DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT);
        /*
         * 声明队列
         *
         * 队列名称
         * 是否持久化 可以在RabbitMQ崩溃后恢复消息
         * 是否独占本次连接 表示一个队列只能被一个消费者占有并消费
         * 是否在不使用的时候自动删除队列
         * 队列的其他参数
         */
        channel.queueDeclare(DIRECTOR_QUEEN_INSERT,true,false,false,null);
        //队列绑定交换机  因为队列与交换机的对应关系是一对一的 交换机与队列的对应关系式一对多的
        channel.queueBind(DIRECTOR_QUEEN_INSERT,DIRECTOR_EXCHANGE_ONE,"insert");
        channel.queueBind(DIRECTOR_QUEEN_UPDATE,DIRECTOR_EXCHANGE_ONE,"update");
        /**
         * 消息发送
         *  定义消息
         *  交换机名称
         *  路由key
         *  其他属性
         *  消息内容
         */
        String insertMsg = "小兔子要吃新的胡萝卜——采用routing key模式——insert";
        String updateMsg = "小兔子换个胡萝卜吃——采用routing key——update";

        channel.basicPublish(DIRECTOR_EXCHANGE_ONE,"insert",null,insertMsg.getBytes());
        System.out.println("已发送消息:"+insertMsg);

        channel.basicPublish(DIRECTOR_EXCHANGE_ONE,"update",null,updateMsg.getBytes());
        System.out.println("已发送消息:"+updateMsg);

        //关闭资源
        channel.close();
        connection.close();
    }

}

消费者一

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/21_10:23
 * 定向传递消息 采取路由key模式
 */
public class DirectConsumerInsert {

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //设置频道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(DirectProducer.DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(DirectProducer.DIRECTOR_QUEEN_INSERT,true,false,false,null);
        //交换机队列绑定
        channel.queueBind(DirectProducer.DIRECTOR_QUEEN_INSERT,DirectProducer.DIRECTOR_EXCHANGE_ONE,"insert");
        //消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key__"+envelope.getRoutingKey());
                //交换机
                System.out.println("交换机__"+envelope.getExchange());
                //消息id
                System.out.println("消息ID__"+envelope.getDeliveryTag());
                //收到的消息
                System.out.println("insert——定向routing key模式--接收到的消息为:"+new String(body,"utf-8"));
            }
        };
        //监听消息
        /*
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
         息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(DirectProducer.DIRECTOR_QUEEN_INSERT,true,consumer);
    }
}

消费者二

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/21_10:38
 * 定向routing key 模式
 */
public class DirectConsumerUpdate {

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //设置频道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(DirectProducer.DIRECTOR_EXCHANGE_ONE, BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(DirectProducer.DIRECTOR_QUEEN_UPDATE,true,false,false,null);
        //交换机队列绑定
        channel.queueBind(DirectProducer.DIRECTOR_QUEEN_UPDATE,DirectProducer.DIRECTOR_EXCHANGE_ONE,"update");
        //消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key__"+envelope.getRoutingKey());
                //交换机
                System.out.println("交换机__"+envelope.getExchange());
                //消息id
                System.out.println("消息ID__"+envelope.getDeliveryTag());
                //收到的消息
                System.out.println("update——定向routing key模式--接收到的消息为:"+new String(body,"utf-8"));
            }
        };
        //监听消息
        /*
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
         息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(DirectProducer.DIRECTOR_QUEEN_UPDATE,true,consumer);
    }
}

五、Toptic 通配符模式

图片说明
图片说明

生产者

package com.xwtec.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/21_13:37
 * topic通配符模式下的生产者  #:匹配一个或多个单词  *:匹配不多不少恰好一个词
 */
public class TopicProducer {
    //定义交换机
    static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";
    //定义队列
    static final String TOPIC_QUEEN_UPDATE = "TOPIC_QUEEN_UPDATE";
    static final String TOPIC_QUEEN_DELETE = "TOPIC_QUEEN_DELETE";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //设置频道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(TOPIC_QUEEN_UPDATE,true,false,false,null);
        channel.queueDeclare(TOPIC_QUEEN_DELETE,true,false,false,null);
        //此处不做队列绑定交换机 因为 不能设置路由key 一旦绑定 消息直接就从此通道发出。
        //发送消息
        String message = "小兔子 在TOPIC模式下 【拿】了一根胡萝卜!  routing key = topic.insert";
        channel.basicPublish(TOPIC_EXCHANGE,"topic.insert",null,message.getBytes());
        System.out.println("小兔子 在TOPIC模式下 【拿】了一根胡萝卜!  routing key = topic.insert");

        message = "小兔子 在TOPIC模式下 【换】了一根胡萝卜!  routing key = topic.update";
        channel.basicPublish(TOPIC_EXCHANGE,"topic.update",null,message.getBytes());
        System.out.println("小兔子 在TOPIC模式下 【换】了一根胡萝卜!  routing key = topic.update");

        message = "小兔子 在TOPIC模式下 【吃】了一根胡萝卜!  routing key = topic.delete";
        channel.basicPublish(TOPIC_EXCHANGE,"topic.delete",null,message.getBytes());
        System.out.println("小兔子 在TOPIC模式下 【吃】了一根胡萝卜!  routing key = topic.delete");

        channel.close();
        connection.close();

    }
}

消费者1

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/21_14:18
 */
public class TopicConsumerOne {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(TopicProducer.TOPIC_QUEEN_UPDATE,true,false,false,null);

        channel.queueBind(TopicProducer.TOPIC_QUEEN_UPDATE,TopicProducer.TOPIC_EXCHANGE,"topic.insert");
        channel.queueBind(TopicProducer.TOPIC_QUEEN_UPDATE,TopicProducer.TOPIC_EXCHANGE,"topic.update");

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("EXCHANGE:"+envelope.getExchange());
                System.out.println("ROUTING KEY:"+envelope.getRoutingKey());
                System.out.println("ID:"+envelope.getDeliveryTag());
                System.out.println("消费者一接收的消息"+new String(body,"utf-8"));
                System.out.println("********************************************************");
            }
        };
        channel.basicConsume(TopicProducer.TOPIC_QUEEN_UPDATE,true,consumer);
    }
}

消费者2

package com.xwtec.mq;

import com.rabbitmq.client.*;
import com.xwtec.utils.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Liyongzhe
 * @date 2021/4/21_14:18
 */
public class TopicConsumerTwo {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(TopicProducer.TOPIC_QUEEN_DELETE,true,false,false,null);

        channel.queueBind(TopicProducer.TOPIC_QUEEN_DELETE,TopicProducer.TOPIC_EXCHANGE,"topic.*");

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("EXCHANGE:"+envelope.getExchange());
                System.out.println("ROUTING KEY:"+envelope.getRoutingKey());
                System.out.println("ID:"+envelope.getDeliveryTag());
                System.out.println("消费者二接收的消息"+new String(body,"utf-8"));
                System.out.println("********************************************************");
            }
        };
        channel.basicConsume(TopicProducer.TOPIC_QUEEN_DELETE,true,consumer);
    }

}