• 导入依赖包

    public class ActiveMQUtil {
    
      PooledConnectionFactory pooledConnectionFactory = null;
    
      public  void init(String brokerUrl){
          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
          pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
          //设置超时时间
          pooledConnectionFactory.setExpiryTimeout(2000);
          // 设置出现异常的时候,继续重试连接
          pooledConnectionFactory.setReconnectOnException(true);
          // 设置最大连接数
          pooledConnectionFactory.setMaxConnections(5);
      }
      // 获取连接
      public Connection getConnection(){
          Connection connection = null;
          try {
              connection = pooledConnectionFactory.createConnection();
          } catch (JMSException e) {
              e.printStackTrace();
          }
          return  connection;
      }
    }
  • 写工具类

    public class ActiveMQUtil {
    
      PooledConnectionFactory pooledConnectionFactory = null;
    
      public  void init(String brokerUrl){
          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
          pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
          //设置超时时间
          pooledConnectionFactory.setExpiryTimeout(2000);
          // 设置出现异常的时候,继续重试连接
          pooledConnectionFactory.setReconnectOnException(true);
          // 设置最大连接数
          pooledConnectionFactory.setMaxConnections(5);
      }
      // 获取连接
      public Connection getConnection(){
          Connection connection = null;
          try {
              connection = pooledConnectionFactory.createConnection();
          } catch (JMSException e) {
              e.printStackTrace();
          }
          return  connection;
      }
    }
  • 写配置类,直接注解注入

    @Configuration
    public class ActiveMQConfig {
    
      @Value("${spring.activemq.broker-url:disabled}")
      String brokerURL ;
    
      @Value("${activemq.listener.enable:disabled}")
      String listenerEnable;
    
      // 获取activeMQUtil
      @Bean
      public ActiveMQUtil getActiveMQUtil(){
          if ("disabled".equals(brokerURL)){
              return null;
          }
          ActiveMQUtil activeMQUtil = new ActiveMQUtil();
          activeMQUtil.init(brokerURL);
          return  activeMQUtil;
      }
    
      @Bean(name = "jmsQueueListener")
      public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
    
          if("disabled".equals(listenerEnable)){
              return null;
          }
          DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
          factory.setConnectionFactory(activeMQConnectionFactory);
          // 设置事务
          factory.setSessionTransacted(false);
          // 手动签收
          factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
          // 设置并发数
          factory.setConcurrency("5");
          // 重连间隔时间
          factory.setRecoveryInterval(5000L);
          return factory;
      }
      // 接收消息
      @Bean
      public ActiveMQConnectionFactory activeMQConnectionFactory (){
          ActiveMQConnectionFactory activeMQConnectionFactory =
                  new ActiveMQConnectionFactory(brokerURL);
          return activeMQConnectionFactory;
      }
  • 发消息的controller:实现发送一个success及orderid的message到PAYMENT_TO_ORDER队列里面,***取到message,拿到里面的success

    @GetMapping("sendPayment")
      @ResponseBody
      public String sendPayment(String orderId){
    
          paymentInfoService.sendPaymentToOrder(orderId,"success");
          return "success";
      }
  • 发消息的service

     @Override
      public void sendPaymentToOrder(String orderId,String result) {
          Connection connection = activeMQUtil.getConnection();//建立连接
    
          try {
              Session session = connection.createSession(true, Session.SESSION_TRANSACTED);//创建会话
    
              MessageProducer producer = session.createProducer(session.createQueue("PAYMENT_TO_ORDER"));//定义队列名称
              MapMessage mapMessage = new ActiveMQMapMessage();
              mapMessage.setString("orderId",orderId);//订单编号
              mapMessage.setString("result",result);//结果
              producer.send(mapMessage);//发送消息
              session.commit();//提交会话
              connection.close();//关闭连接
    
          } catch (JMSException e) {
              e.printStackTrace();
          }
      }
  • 收消息的***

    @JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener" )
      public  void  consumeWareDeduct(MapMessage mapMessage) throws JMSException {
          // 更新订单状态
          String orderId = mapMessage.getString("orderId");
          String status = mapMessage.getString("status");
          if("DEDUCTED".equals(status)){
              orderService.updateStatus(orderId,ProcessStatus.WAITING_DELEVER);
          }else{
              orderService.updateStatus(orderId,ProcessStatus.STOCK_EXCEPTION);
          }
      }