消息中间件的一种。可以在分布式系统的不同服务之间进行消息的发送接收。 

消息通信机制(消息模型)

  1. 点对点模式,每个消息只有1个消费者,它的目的地称为queue队列;如果消息发送不成功,此消息默认会保存到ActiveMQ服务端直到有消费者将其消费,所以此消息是不会丢失的。
  2. 发布/订阅模式,每个消息可以有多个消费者,而且订阅一个主题的消费者,只能消费自它订阅之后发布的消息。默认情况只通知一次,如果接受不到此消息就没有了,这种场景使用于对消息发送率要求不高的情况,如果要求消息必须送达不可以丢失的话,需要配置持久订阅。

丢消息怎么办

用持久化消息【可以使用对数据进行持久化JDBC,AMQ(日志文件),KahaDB和LevelDB】,或者非持久化消息及时处理不要堆积。

如何防止消息重复发送

增加消息状态表。相当于一个账本,用来记录消息的处理状态,每次处理消息之前,都去状态表中查询一次,如果已经有相同的消息存在,那么不处理,可以防止重复发送。

消息队列的应用场景

异步处理(并行处理任务),应用解耦(订单系统写入消息队列,库存系统订阅),流量削锋(控制秒杀的人数)和消息通讯(聊天室)四个场景

消息消费方式
同步:发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。

异步:两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。
ActiveMQ中,消息的产生和消费都是异步的



ActiveMQ的快速应用:https://www.jianshu.com/p/ecdc6eab554c


启动am

cd 到am目录
bin\activemq start











点对点模式的demo

参考:https://blog.csdn.net/HezhezhiyuLe/article/details/84257120 

gradle构建项目,build.gradle里添加:
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/org.apache.activemq/activemq-all
    compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.14.5'
}
创建生产者消费者
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {

    public static void main(String [] args){
        ActiveMQConnectionFactory connectionFactory;
        Connection conn = null;
        Session session=null;

        try{
            //创建连接工厂
            connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");
            //创建连接对象
            conn=connectionFactory.createConnection();
            conn.start();
            //创建会话
            //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
            //第一个参数为false时,第二个参数的值可为Seesion.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,Session.DUPS_OK_ACKNOWLEDGE
            //AUTO_ACKNOWLEDGE  为自动确认,客户端发送和接收消息不需要做额外的工作
            //CLIENT_ACKNOWLEDGE为客户端确认,客户端接收到消息后,必须调用javax.jms,Messagede1acknow方法
            //DUPS_OK_ACKNOWLEDGE允许副本的确认模式,一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认
            session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //创建连接目标
            Destination destination = session.createQueue("queue1");
            //创建生产者
            MessageProducer producer = session.createProducer(destination);
            //持久化配置
            //NON_PERSISTENT 当activemq关闭的时候,队列数据将会被清空
            //PERSISTENT 当activemq关闭的时候,队列数据将会保存
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //发送消息
            String text = "Hello world!";
            TextMessage message = session.createTextMessage(text);
            producer.send(message);
            conn.close();
        }catch ( JMSException e){
            e.printStackTrace();
        }
    }

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;


public class Consumer {
    public static void main(String [] args){

        ActiveMQConnectionFactory connectionFactory = null;
        Connection conn = null;
        Session session = null;
        MessageConsumer consumer = null;

        try{
            //创建连接工厂
            connectionFactory=new ActiveMQConnectionFactory(
                    "admin",
                    "admin",
                    "tcp://127.0.0.1:61616"
            );
            //创建连接对象
            conn = connectionFactory.createConnection(
                    "admin",
                    "admin");
            conn.start();
            //创建会话
            session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //创建点对点接收的目标
            Destination destination =session.createQueue("queue1");

            //创建订阅的目标
            //Destination destination = session.createTopic("topic1");

            //创建消费对象
            consumer = session.createConsumer(destination);
            //接收消息
            Message message=consumer.receive(1000);
            if (message instanceof TextMessage){
                System.out.println("收到文本信息"+((TextMessage) message).getText());
            }else{
                System.out.println(message);
            }
            conn.close();
        }catch (JMSException e){
            e.printStackTrace();
        }


    }
}



运行三次后:



发布订阅模式的demo

生产者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class TopicProducer {
 public static void main(String[] args)
 {
     try {
         ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
         //2.获取连接
         Connection connection = null;
         connection = connectionFactory.createConnection();
         //3.启动连接
         connection.start();
         //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         //5.创建主题对象
         Topic topic = session.createTopic("test-topic");
         //6.创建消息生产者
         MessageProducer producer = session.createProducer(topic);
         //7.创建消息
         TextMessage textMessage = session.createTextMessage("我发送了一条消息");
         //8.发送消息
         producer.send(textMessage);
         //9.关闭资源
         producer.close();
         session.close();
         connection.close();
     } catch (JMSException e) {
         e.printStackTrace();
     }
 }
}
消费者(构造了两个):
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class TopicConsumer {
    public static void main(String[] args)
    {
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.获取连接
            Connection connection = null;
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            //Queue queue = session.createQueue("test-queue");
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);

            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

开启两个消费者


再开启生产者