导入依赖包
/**
 * Helper that owns a pooled ActiveMQ connection factory and hands out
 * connections created by it.
 *
 * <p>{@link #init(String)} must be called before {@link #getConnection()}.
 */
public class ActiveMQUtil {
    PooledConnectionFactory pooledConnectionFactory = null;

    /**
     * Builds the pooled connection factory for the given broker.
     *
     * @param brokerUrl URL handed to the {@code ActiveMQConnectionFactory} constructor
     */
    public void init(String brokerUrl) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        // Expiry timeout for pooled connections (presumably milliseconds -- confirm against the pool docs).
        pooledConnectionFactory.setExpiryTimeout(2000);
        // Keep retrying the connection when an exception occurs.
        pooledConnectionFactory.setReconnectOnException(true);
        // Maximum number of connections.
        pooledConnectionFactory.setMaxConnections(5);
    }

    /**
     * Obtains a connection from the pooled factory.
     *
     * @return a connection created by the pooled factory
     * @throws IllegalStateException if {@link #init(String)} has not been called yet,
     *         or if the factory fails to create a connection (the {@code JMSException}
     *         is preserved as the cause)
     */
    public Connection getConnection() {
        if (pooledConnectionFactory == null) {
            throw new IllegalStateException("ActiveMQUtil.init(brokerUrl) must be called before getConnection()");
        }
        try {
            return pooledConnectionFactory.createConnection();
        } catch (JMSException e) {
            // Fail fast with the cause attached instead of printing the stack trace and
            // returning null, which only deferred the failure to an NPE in the caller.
            throw new IllegalStateException("Failed to create a connection from the ActiveMQ pool", e);
        }
    }
}
写工具类
/**
 * Helper that owns a pooled ActiveMQ connection factory and hands out
 * connections created by it.
 *
 * <p>{@link #init(String)} must be called before {@link #getConnection()}.
 */
public class ActiveMQUtil {
    PooledConnectionFactory pooledConnectionFactory = null;

    /**
     * Builds the pooled connection factory for the given broker.
     *
     * @param brokerUrl URL handed to the {@code ActiveMQConnectionFactory} constructor
     */
    public void init(String brokerUrl) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        // Expiry timeout for pooled connections (presumably milliseconds -- confirm against the pool docs).
        pooledConnectionFactory.setExpiryTimeout(2000);
        // Keep retrying the connection when an exception occurs.
        pooledConnectionFactory.setReconnectOnException(true);
        // Maximum number of connections.
        pooledConnectionFactory.setMaxConnections(5);
    }

    /**
     * Obtains a connection from the pooled factory.
     *
     * @return a connection created by the pooled factory
     * @throws IllegalStateException if {@link #init(String)} has not been called yet,
     *         or if the factory fails to create a connection (the {@code JMSException}
     *         is preserved as the cause)
     */
    public Connection getConnection() {
        if (pooledConnectionFactory == null) {
            throw new IllegalStateException("ActiveMQUtil.init(brokerUrl) must be called before getConnection()");
        }
        try {
            return pooledConnectionFactory.createConnection();
        } catch (JMSException e) {
            // Fail fast with the cause attached instead of printing the stack trace and
            // returning null, which only deferred the failure to an NPE in the caller.
            throw new IllegalStateException("Failed to create a connection from the ActiveMQ pool", e);
        }
    }
}
写配置类,直接注解注入
@Configuration
public class ActiveMQConfig {
// Broker URL injected from the spring.activemq.broker-url property;
// the placeholder falls back to the literal "disabled" when the property is not set.
@Value("${spring.activemq.broker-url:disabled}")
String brokerURL ;
// Listener switch injected from the activemq.listener.enable property;
// the placeholder falls back to the literal "disabled" when the property is not set.
@Value("${activemq.listener.enable:disabled}")
String listenerEnable;
/**
 * Provides the {@code ActiveMQUtil} bean.
 *
 * @return an {@code ActiveMQUtil} initialised with {@code brokerURL}, or {@code null}
 *         when {@code brokerURL} equals the literal "disabled"
 */
@Bean
public ActiveMQUtil getActiveMQUtil() {
    final boolean brokerConfigured = !"disabled".equals(brokerURL);
    if (brokerConfigured) {
        ActiveMQUtil util = new ActiveMQUtil();
        util.init(brokerURL);
        return util;
    }
    // No broker URL configured: no utility instance is created.
    return null;
}
/**
 * Builds the listener container factory registered under the bean name
 * {@code jmsQueueListener}.
 *
 * @param activeMQConnectionFactory connection factory the listener containers will use
 * @return the configured factory, or {@code null} when {@code listenerEnable}
 *         equals the literal "disabled"
 */
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
    final boolean listenersDisabled = "disabled".equals(listenerEnable);
    if (listenersDisabled) {
        return null;
    }

    DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
    containerFactory.setConnectionFactory(activeMQConnectionFactory);
    // Sessions are not transacted.
    containerFactory.setSessionTransacted(false);
    // Acknowledge mode is AUTO_ACKNOWLEDGE.
    // NOTE(review): if manual acknowledgement was intended, this constant needs changing -- confirm.
    containerFactory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    // Concurrency setting.
    containerFactory.setConcurrency("5");
    // Interval between reconnect attempts.
    containerFactory.setRecoveryInterval(5000L);
    return containerFactory;
}
/**
 * Connection factory bean for receiving messages, created from {@code brokerURL}.
 *
 * @return a new {@code ActiveMQConnectionFactory} for {@code brokerURL}
 */
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
    return new ActiveMQConnectionFactory(brokerURL);
}
发消息的controller:实现发送一个包含success及orderId的message到PAYMENT_TO_ORDER队列里面,监听器取到message,拿到里面的success
/**
 * Test endpoint: asks the payment service to send a "success" payment result
 * for the given order, then replies with the same literal.
 *
 * @param orderId id of the order the payment result refers to
 * @return the literal "success"
 */
@GetMapping("sendPayment")
@ResponseBody
public String sendPayment(String orderId) {
    final String outcome = "success";
    paymentInfoService.sendPaymentToOrder(orderId, outcome);
    return outcome;
}
发消息的service
/**
 * Sends a map message carrying {@code orderId} and {@code result} to the
 * {@code PAYMENT_TO_ORDER} queue inside a transacted session.
 *
 * <p>A {@code JMSException} raised while sending is printed and not rethrown, as before.
 * The session and connection are now always closed, so a failed send no longer
 * leaves the connection open.
 *
 * @param orderId order number placed in the message under the key "orderId"
 * @param result  payment result placed in the message under the key "result"
 * @throws IllegalStateException if no connection could be obtained
 */
@Override
public void sendPaymentToOrder(String orderId, String result) {
    Connection connection = activeMQUtil.getConnection(); // obtain a connection
    if (connection == null) {
        // Explicit failure instead of a NullPointerException on the first use below.
        throw new IllegalStateException("No ActiveMQ connection available; cannot send payment result for order " + orderId);
    }
    Session session = null;
    try {
        session = connection.createSession(true, Session.SESSION_TRANSACTED); // transacted session
        MessageProducer producer = session.createProducer(session.createQueue("PAYMENT_TO_ORDER")); // target queue
        MapMessage mapMessage = new ActiveMQMapMessage();
        mapMessage.setString("orderId", orderId); // order number
        mapMessage.setString("result", result);   // payment result
        producer.send(mapMessage);
        session.commit(); // nothing is delivered until the transaction commits
    } catch (JMSException e) {
        // NOTE(review): no logger is visible in this snippet; switch to the project's logger if one exists.
        e.printStackTrace();
    } finally {
        // Cleanup runs on both paths. Per the JMS contract, closing a transacted session
        // rolls back uncommitted work, so a failed send does not leave a half-done transaction.
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
收消息的监听器
/**
 * Listener for the {@code SKU_DEDUCT_QUEUE} destination: updates the order status
 * according to the "status" entry of the received message.
 *
 * @param mapMessage message expected to carry the "orderId" and "status" entries
 * @throws JMSException if reading an entry from the message fails
 */
@JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener" )
public void consumeWareDeduct(MapMessage mapMessage) throws JMSException {
    final String orderId = mapMessage.getString("orderId");
    final String deductStatus = mapMessage.getString("status");

    // Anything other than "DEDUCTED" (including a missing status) is a stock exception.
    if (!"DEDUCTED".equals(deductStatus)) {
        orderService.updateStatus(orderId, ProcessStatus.STOCK_EXCEPTION);
        return;
    }
    orderService.updateStatus(orderId, ProcessStatus.WAITING_DELEVER);
}