传统中间件

当前中间件CORBA、DCOM、RMI等RPC中间件技术已广泛应用于各个领域,但是面对规模和复杂度越来越高的分布式系统,这些技术也显示出其局限性。
  • 同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行
  • 客户和服务对象生命周期都必须正常运行,如果由于服务对象崩溃故障导致客户的请求不可到达,客户会接受到异常
  • 点对点通信:客户的一次调用只发送给某个单独的目标对象

消息中间件

面向消息的中间件(Message Oriented Middleware,MOM)较好解决了以上问题,发哦那个这将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。
这种模式下,发送和接收是异步的,发送者无需等待,二者的生命周期未必相同,发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行;一对多通信,对于一个消息可以有多个接收者。
Java消息服务(JMS)定义了Java中访问消息中间件的接口,JMS只是接口,实现接口的消息中间件为Provider。
Apache的ActiveMQ,阿里的RocketMQ,IBM的MQSeries,微软的MSMQ ,BEA的MessageQ RabbieMQ

中间件提出的目的:




JMS术语

Provider  生产者
Consumer 消费者
PTP 点对点的消息模型
Pub/Sub 发布/订阅的消息模型
Queue 队列目标
Topic    主题目标
ConnectionFactory 连接工厂,创建连接对象实例
Connection 连接对象实例
Destination 消息目的地
Session 会话,一个发送或接收的线程。
Message:

安装使用:

到下面的官网地址下载,包括linux和Windows的不同版本。

解压使用,ActiveMQ目录如下:


进入对应的bin目录下:找到activemq可执行文件,通过cmd,执行activemq start  启动
activemq stop关闭


linux使用:
解压到指定目录:sudo tar zxvf activema-x.x.x-bin.tar.gz
进入到bin目录,执行:   ./activemq start
关闭 :   ./activemq stop

后台管理界面:http://localhost:8161/admin/
用户名和密码均为 admin

ActiveMQ的HelloWorld

准备环境:

  • Java JDK1.7 以上
  • Maven 3.0 以上
  • 开发工具 IDEA
maven依赖:
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId> 
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration> 
</plugin> </plugins> 
</build> 
<properties> 
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
<spring.version>4.3.10.RELEASE</spring.version> 
</properties> 
<dependencies> 
<dependency> 
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
<dependency> 
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId> 
<version>${spring.version}</version>
</dependency> 
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version> 
</dependency> 
<dependency> 
<groupId>junit</groupId> 
<artifactId>junit</artifactId> 
<version>4.12</version> 
</dependency> 
<dependency> 
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId> 
<version>4.2.5.RELEASE</version>
</dependency>
</dependencies>
Sender/Receiver:

  1. 建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,默认端口为:tcp://localhost:61616
  2. 通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的。
  3. 通过Connection对象创建Session会话,用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般设置自动签收
  4. 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消费来源的对象。在PTP模式下为Queue队列,在Pub/Sub模式为Topic。程序中可以使用多个Queue和Topic。
  5. 我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)
  6. 我们可以使用MessageProdece的setDeliveryMode方法为其设置持久化特性和非持久化特性
  7. 使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,同理客户端使用receive方法进行接收数据,最后不要忘记关闭Connection连接
对于Receiver
从第5步开始更改为消费者创建。


ActiveMQ安全机制:
管理web界面:http://127.0.0.1:8161/admin
安装目录下的conf/jetty-realm.properties修改密码
/activemq.xml  123行之后添加插件配置,符合认证的用户才能发送和获取消息。

API:
  • Connection用过之后要关闭close()
  • Session是一个发送或接收消息的线程,可以创建生产者、消费者、消息。createSession(boolean,int)中第一个参数是事务标识,第二个参数是签收模式。开启事务后,必须使用session.commit()方法表示提交事务。
  • 签收有三种模式:自动签收;调用acknowledge方法签收;不确保传送消息的签收,可能引起消息的重复,但降低了Session的开销。
  • MessageProducer:是一个由Session创建的对象,用来向Destination发送消息。send()重载,参数确定消息优先级,消息过期时间,两种消息传送模式(默认持久化和非持久化)
  • MessageConsumer:是一个由Session创建的对象,用来从Destination接收消息。createConsumer()重载方法参数noLocal默认false,当设置为true,标志只能接收和自己相同的连接(Connection)所发布的消息。此标志只适用于主题,不适用于队列。createDurableSubscriber()重载参数。receive()方法重载,消息的同步接收是指客户端主动去接收消息,客户端可以采用该方法接收向下一个消息。消息的异步接收是指当消息到达时,ActiveMQ主动通知客户端,可以通过注册一个实现MessageListener接口对象到MessageConsumer,只有一个必须实现的方法onMessage,只接收一个参数,即Message,在为每个发送到Destination的消息实现onMessage时调用该方法。
  • Message:(消息头,属性,消息体)  BlobMessage(createBlobMessage(File file) 参数还可以是InputStream RUL )  BytesMessage(createBytesMessage)   MapMessage  ObjectMessage  TextMessage  一般在接收端通过instanceof方法区别数据类型。

Spring和ActiveMQ 整合:

利用消息中间件,异步处理任务的机制,比如异步消费数据、异步发送邮件、异步做查询操作等。

Provider配置文件:

provider模块



Test类



Consumer模块配置文件



ActiveMQ整合Spring

环境
  • jdk1.7以上
  • Maven 3.0以上
  • Spring 4.3.1以上
  • ActiveMQ 5.15.9
项目结构:
子模块聚合的项目,其结构如下:


maven相关依赖:
<properties> 
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
<spring.version>4.3.10.RELEASE</spring.version>
</properties> 
<dependencyManagement> 
<dependencies> 
<dependency> 
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version> </dependency>
<dependency> 
<groupId>org.springframework</groupId> 
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency> 
<dependency> 
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version>
</dependency> 
<dependency> 
<groupId>junit</groupId> 
<artifactId>junit</artifactId> 
<version>4.12</version> 
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId> 
<version>4.2.5.RELEASE</version>
</dependency> 
</dependencies> 
</dependencyManagement>
生产者和消费者同前面的代码一样。