之前已经讲了直连模型,现在开始说这个扇形模型的使用,也就是fanout模型的使用,这个是扇出,也就是广播


这个是有交换机的。

自定义交换机,并且将消息发送给交换机

我们rabbitmq默认是有一些交换机,但是现在我们要自己创建一个交换机,用代码如何进行创建交换机。

public class Provider {
   

    public static void main(String[] args) throws IOException {
   
        Connection connection = RabbitMqUtils.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
// 将通道声明指定的交换机 参数1:交换机的名称 参数2:交换机的类型 fanout 广播类型
        channel.exchangeDeclare("logs","fanout");
// 发送消息
        channel.basicPublish("logs","",null,"message".getBytes());
// 释放资源
        RabbitMqUtils.closeConnectionAndChannel(channel,connection);

    }
}

以上代码和队列就没有什么关系了,直接是将消息发送到我们自定义的交换机里面的。

运行完代码之后,我们就可以看到界面上面生成了一个我们自定义的交换机

创建临时队列,并且将队列和交换机进行绑定,之后从队列里面拿出数据

也就是这个广播的模型,只要和这个广播模型的交换机相连的队列,那么只要在这个交换机里面发送了消息,所有绑定的队列都会得到消息

public class Customer1 {
   

    public static void main(String[] args) throws IOException {
   
        Connection connection = RabbitMqUtils.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
// 将通道声明指定的交换机 参数1:交换机的名称 参数2:交换机的类型 fanout 广播类型
        channel.exchangeDeclare("logs","fanout");
// 创建临时队列
        String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
        channel.queueBind(queue,"logs","");
// 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}