消息中间件的一种。可以在分布式系统的不同服务之间进行消息的发送和接收。
消息通信机制(消息模型)
- 点对点模式,每个消息只有1个消费者,它的目的地称为queue队列;如果消息发送不成功,此消息默认会保存到ActiveMQ服务端直到有消费者将其消费,所以此消息是不会丢失的。
- 发布/订阅模式,每个消息可以有多个消费者,而且订阅一个主题的消费者,只能消费自它订阅之后发布的消息。默认情况只通知一次,如果接受不到此消息就没有了,这种场景使用于对消息发送率要求不高的情况,如果要求消息必须送达不可以丢失的话,需要配置持久订阅。
丢消息怎么办
用持久化消息【可以使用对数据进行持久化JDBC,AMQ(日志文件),KahaDB和LevelDB】,或者非持久化消息及时处理不要堆积。
如何防止消息重复发送
增加消息状态表。相当于一个账本,用来记录消息的处理状态,每次处理消息之前,都去状态表中查询一次,如果已经有相同的消息存在,那么不处理,可以防止重复发送。
消息队列的应用场景
异步处理(并行处理任务),应用解耦(订单系统写入消息队列,库存系统订阅),流量削锋(控制秒杀的人数)和消息通讯(聊天室)四个场景
消息消费方式
同步:发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。
异步:两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。
ActiveMQ中,消息的产生和消费都是异步的
点对点模式的demo
参考:https://blog.csdn.net/HezhezhiyuLe/article/details/84257120
gradle构建项目,build.gradle里添加:
dependencies { testCompile group: 'junit', name: 'junit', version: '4.11' testCompile group: 'junit', name: 'junit', version: '4.12' // https://mvnrepository.com/artifact/org.apache.activemq/activemq-all compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.14.5' }创建生产者消费者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producer { public static void main(String [] args){ ActiveMQConnectionFactory connectionFactory; Connection conn = null; Session session=null; try{ //创建连接工厂 connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616"); //创建连接对象 conn=connectionFactory.createConnection(); conn.start(); //创建会话 //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED //第一个参数为false时,第二个参数的值可为Seesion.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,Session.DUPS_OK_ACKNOWLEDGE //AUTO_ACKNOWLEDGE 为自动确认,客户端发送和接收消息不需要做额外的工作 //CLIENT_ACKNOWLEDGE为客户端确认,客户端接收到消息后,必须调用javax.jms,Messagede1acknow方法 //DUPS_OK_ACKNOWLEDGE允许副本的确认模式,一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认 session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建连接目标 Destination destination = session.createQueue("queue1"); //创建生产者 MessageProducer producer = session.createProducer(destination); //持久化配置 //NON_PERSISTENT 当activemq关闭的时候,队列数据将会被清空 //PERSISTENT 当activemq关闭的时候,队列数据将会保存 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送消息 String text = "Hello world!"; TextMessage message = session.createTextMessage(text); producer.send(message); conn.close(); }catch ( JMSException e){ e.printStackTrace(); } }
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Consumer { public static void main(String [] args){ ActiveMQConnectionFactory connectionFactory = null; Connection conn = null; Session session = null; MessageConsumer consumer = null; try{ //创建连接工厂 connectionFactory=new ActiveMQConnectionFactory( "admin", "admin", "tcp://127.0.0.1:61616" ); //创建连接对象 conn = connectionFactory.createConnection( "admin", "admin"); conn.start(); //创建会话 session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建点对点接收的目标 Destination destination =session.createQueue("queue1"); //创建订阅的目标 //Destination destination = session.createTopic("topic1"); //创建消费对象 consumer = session.createConsumer(destination); //接收消息 Message message=consumer.receive(1000); if (message instanceof TextMessage){ System.out.println("收到文本信息"+((TextMessage) message).getText()); }else{ System.out.println(message); } conn.close(); }catch (JMSException e){ e.printStackTrace(); } } }
运行三次后:
发布订阅模式的demo
生产者:
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicProducer { public static void main(String[] args) { try { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.获取连接 Connection connection = null; connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("我发送了一条消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }消费者(构造了两个):
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class TopicConsumer { public static void main(String[] args) { try { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.获取连接 Connection connection = null; connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
开启两个消费者
再开启生产者