消息中间件的一种。可以在分布式系统的不同服务之间进行消息的发送和接收。
消息通信机制(消息模型)
- 点对点模式,每个消息只有1个消费者,它的目的地称为queue队列;如果消息发送不成功,此消息默认会保存到ActiveMQ服务端直到有消费者将其消费,所以此消息是不会丢失的。
- 发布/订阅模式,每个消息可以有多个消费者,而且订阅一个主题的消费者,只能消费自它订阅之后发布的消息。默认情况只通知一次,如果接受不到此消息就没有了,这种场景使用于对消息发送率要求不高的情况,如果要求消息必须送达不可以丢失的话,需要配置持久订阅。
丢消息怎么办
用持久化消息【可以使用对数据进行持久化JDBC,AMQ(日志文件),KahaDB和LevelDB】,或者非持久化消息及时处理不要堆积。
如何防止消息重复发送
增加消息状态表。相当于一个账本,用来记录消息的处理状态,每次处理消息之前,都去状态表中查询一次,如果已经有相同的消息存在,那么不处理,可以防止重复发送。
消息队列的应用场景
异步处理(并行处理任务),应用解耦(订单系统写入消息队列,库存系统订阅),流量削锋(控制秒杀的人数)和消息通讯(聊天室)四个场景
消息消费方式
同步:发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。
异步:两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。
ActiveMQ中,消息的产生和消费都是异步的
点对点模式的demo
参考:https://blog.csdn.net/HezhezhiyuLe/article/details/84257120
gradle构建项目,build.gradle里添加:
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'junit', name: 'junit', version: '4.12'
// https://mvnrepository.com/artifact/org.apache.activemq/activemq-all
compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.14.5'
} 创建生产者消费者 import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
public static void main(String [] args){
ActiveMQConnectionFactory connectionFactory;
Connection conn = null;
Session session=null;
try{
//创建连接工厂
connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");
//创建连接对象
conn=connectionFactory.createConnection();
conn.start();
//创建会话
//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
//第一个参数为false时,第二个参数的值可为Seesion.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,Session.DUPS_OK_ACKNOWLEDGE
//AUTO_ACKNOWLEDGE 为自动确认,客户端发送和接收消息不需要做额外的工作
//CLIENT_ACKNOWLEDGE为客户端确认,客户端接收到消息后,必须调用javax.jms,Messagede1acknow方法
//DUPS_OK_ACKNOWLEDGE允许副本的确认模式,一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认
session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建连接目标
Destination destination = session.createQueue("queue1");
//创建生产者
MessageProducer producer = session.createProducer(destination);
//持久化配置
//NON_PERSISTENT 当activemq关闭的时候,队列数据将会被清空
//PERSISTENT 当activemq关闭的时候,队列数据将会保存
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送消息
String text = "Hello world!";
TextMessage message = session.createTextMessage(text);
producer.send(message);
conn.close();
}catch ( JMSException e){
e.printStackTrace();
}
} import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static void main(String [] args){
ActiveMQConnectionFactory connectionFactory = null;
Connection conn = null;
Session session = null;
MessageConsumer consumer = null;
try{
//创建连接工厂
connectionFactory=new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://127.0.0.1:61616"
);
//创建连接对象
conn = connectionFactory.createConnection(
"admin",
"admin");
conn.start();
//创建会话
session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建点对点接收的目标
Destination destination =session.createQueue("queue1");
//创建订阅的目标
//Destination destination = session.createTopic("topic1");
//创建消费对象
consumer = session.createConsumer(destination);
//接收消息
Message message=consumer.receive(1000);
if (message instanceof TextMessage){
System.out.println("收到文本信息"+((TextMessage) message).getText());
}else{
System.out.println(message);
}
conn.close();
}catch (JMSException e){
e.printStackTrace();
}
}
} 运行三次后:
发布订阅模式的demo
生产者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicProducer {
public static void main(String[] args)
{
try {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.获取连接
Connection connection = null;
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("我发送了一条消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
} 消费者(构造了两个): import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class TopicConsumer {
public static void main(String[] args)
{
try {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.获取连接
Connection connection = null;
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
//Queue queue = session.createQueue("test-queue");
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
} 开启两个消费者
再开启生产者

京公网安备 11010502036488号