与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息。

[java]  view plain copy
  1. import javax.jms.Connection;  
  2. import javax.jms.JMSException;  
  3. import javax.jms.Message;  
  4. import javax.jms.MessageConsumer;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.MessageProducer;  
  7. import javax.jms.Session;  
  8. import javax.jms.TextMessage;  
  9. import javax.jms.Topic;  
  10. import org.apache.activemq.ActiveMQConnectionFactory;  
  11. import org.apache.activemq.command.ActiveMQTopic;  
  12.   
  13. public class TopicTest {  
  14.     public static void main(String[] args) throws Exception {  
  15.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  
  16.      
  17.         Connection connection = factory.createConnection();  
  18.         connection.start();  
  19.          
  20.         //创建一个Topic  
  21.         Topic topic= new ActiveMQTopic("testTopic");  
  22.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  23.           
  24.         //注册消费者1  
  25.         MessageConsumer comsumer1 = session.createConsumer(topic);  
  26.         comsumer1.setMessageListener(new MessageListener(){  
  27.             public void onMessage(Message m) {  
  28.                 try {  
  29.                     System.out.println("Consumer1 get " + ((TextMessage)m).getText());  
  30.                 } catch (JMSException e) {  
  31.                     e.printStackTrace();  
  32.                 }  
  33.             }  
  34.         });  
  35.          
  36.         //注册消费者2  
  37.         MessageConsumer comsumer2 = session.createConsumer(topic);  
  38.         comsumer2.setMessageListener(new MessageListener(){  
  39.             public void onMessage(Message m) {  
  40.                 try {  
  41.                     System.out.println("Consumer2 get " + ((TextMessage)m).getText());  
  42.                 } catch (JMSException e) {  
  43.                     e.printStackTrace();  
  44.                 }  
  45.             }  
  46.              
  47.         });  
  48.          
  49.         //创建一个生产者,然后发送多个消息。  
  50.         MessageProducer producer = session.createProducer(topic);  
  51.         for(int i=0; i<10; i++){  
  52.             producer.send(session.createTextMessage("Message:" + i));  
  53.         }  
  54.     }  
  55. }  

运行后得到下面的输出结果:

[java]  view plain copy
  1. Consumer1 get Message:0  
  2. Consumer2 get Message:0  
  3. Consumer1 get Message:1  
  4. Consumer2 get Message:1  
  5. Consumer1 get Message:2  
  6. Consumer2 get Message:2  
  7. Consumer1 get Message:3  
  8. Consumer2 get Message:3  
  9. Consumer1 get Message:4  
  10. Consumer2 get Message:4  
  11. Consumer1 get Message:5  
  12. Consumer2 get Message:5  
  13. Consumer1 get Message:6  
  14. Consumer2 get Message:6  
  15. Consumer1 get Message:7  
  16. Consumer2 get Message:7  
  17. Consumer1 get Message:8  
  18. Consumer2 get Message:8  
  19. Consumer1 get Message:9  
  20. Consumer2 get Message:9  

说明每一个消息都会被所有的消费者消费。