RabbitMQ-使用Java操作简单队列 simple queues
1.获取连接工具类
为了方便每次获取连接,封装一个简单的工具类
package com.ithzk.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Small helper that hides the boilerplate needed to open a RabbitMQ connection.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class RabbitMQConnectionUtils {

    /** Address of the RabbitMQ server. */
    private static final String HOST = "10.200.52.118";

    /** AMQP listener port. Note: 15672 is the management UI (HTTP) port, not the AMQP port. */
    private static final int PORT = 5672;

    /** Virtual host to connect to. */
    private static final String VIRTUAL_HOST = "/vhost_hzk";

    /** Login user name. */
    private static final String USERNAME = "hzk";

    /** Login password. */
    private static final String PASSWORD = "hzk";

    /**
     * Builds a {@link ConnectionFactory} with the settings above and opens a new connection.
     *
     * <p>If the attempt fails with a connection timeout, try disabling the firewall or
     * opening the port on the server.
     *
     * @return a newly opened connection
     * @throws IOException propagated from {@link ConnectionFactory#newConnection()}
     * @throws TimeoutException propagated from {@link ConnectionFactory#newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        // Where to connect.
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VIRTUAL_HOST);

        // Who to connect as.
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        return connection;
    }
}
2.消息生产者/发送者
package com.ithzk.rabbitmq.simple;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Message producer: publishes a single text message to the simple queue.
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Send {

    private static final String QUEUE_NAME = "test_simple_queue";

    /** Charset used to encode the message body; the consumer decodes with "utf-8". */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Opens a connection, declares the queue, publishes one message and closes everything.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Obtain a connection.
        Connection connection = RabbitMQConnectionUtils.getConnection();
        try {
            // Obtain a channel from the connection.
            Channel channel = connection.createChannel();

            // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "simgple queue";
            // Publish through the default exchange (""), using the queue name as routing key.
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(UTF_8));
            System.out.println("send msg:" + msg);

            channel.close();
        } finally {
            // Always release the connection (this also closes its channels), even when
            // declaring or publishing failed. Skip it if the connection is already closed
            // so that the original exception is not replaced by a close failure.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
3.消息消费者/接收者
下面给出新(DefaultConsumer)、老(QueueingConsumer)两种接收消息的方式
package com.ithzk.rabbitmq.simple;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Message consumer for the simple queue, shown in two flavours: the callback-based
 * {@link DefaultConsumer} ({@link #main}) and the older blocking {@code QueueingConsumer}
 * ({@link #oldRecv}).
 *
 * @author hzk
 * @date 2018/3/7
 */
public class Recv {

    private static final String QUEUE_NAME = "test_simple_queue";

    /** Charset used to decode message bodies in both receive styles. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Registers a {@link DefaultConsumer} on the queue and prints every message delivered to it.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Obtain a connection.
        Connection connection = RabbitMQConnectionUtils.getConnection();
        // Obtain a channel from the connection.
        Channel channel = connection.createChannel();
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Define the consumer.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * Called for each message that arrives.
             *
             * @param consumerTag tag of the consumer the message was delivered to
             * @param envelope delivery metadata
             * @param properties message properties
             * @param body raw message payload
             * @throws IOException declared by the overridden method
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, UTF_8);
                System.out.println("recv msg:" + msg);
            }
        };

        // Start listening on the queue with autoAck=true.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

    /**
     * Old-style receive: blocks on {@code QueueingConsumer.nextDelivery()} in an endless loop
     * and prints each message. This method never returns normally.
     *
     * <p>NOTE(review): {@code QueueingConsumer} is deprecated in later amqp-client releases and
     * is not available in 5.x; prefer the {@link DefaultConsumer} approach used in {@link #main}.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    @Test
    public void oldRecv() throws IOException, TimeoutException, InterruptedException {
        // Obtain a connection.
        Connection connection = RabbitMQConnectionUtils.getConnection();
        // Obtain a channel from the connection.
        Channel channel = connection.createChannel();
        // Declare the queue with the same arguments as main() and the producer, so that
        // consuming does not fail when this method runs before the queue has been created.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Define the queue consumer.
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // Start listening on the queue with autoAck=true.
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            // Decode explicitly as UTF-8, matching handleDelivery, instead of the platform default.
            String msg = new String(delivery.getBody(), UTF_8);
            System.out.println("old Recv msg:" + msg);
        }
    }
}
ps:如果连接超时(connection timeout),请检查防火墙配置,可临时关闭防火墙,或者放开对应端口的访问:
service iptables stop    # 临时关闭防火墙,直到手动启动或者重启电脑
iptables -I INPUT -p tcp --dport xxx -j ACCEPT    # 开放端口访问,xxx 替换为实际端口号(如 AMQP 端口 5672)