RocketMQ 学习记录
一、apache/rocketMQ-spring版本
这个版本是在官方客户端的基础上进行封装,并集成了 Spring Boot 等特性;但可能存在依赖关系复杂、维护周期不确定等问题(无意冒犯作者,只是自己在使用时遇到过依赖版本冲突等一些棘手、难以定位的问题)。
1.1、pom.xml最小依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
</dependencies>
不过要注意,rocketmq-spring-boot-starter 自身的 pom.xml 如下,它还依赖着以下模块:
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-parent</artifactId>
<version>2.0.3</version>
<relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<packaging>jar</packaging>
<name>RocketMQ Spring Boot Starter</name>
<description>SRocketMQ Spring Boot Starter</description>
<url>https://github.com/apache/rocketmq-spring</url>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
</dependencies>
另外,其中的 rocketmq-spring-boot 又依赖着以下模块:
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
1.2、API操作
生产者:基于 RocketMQTemplate 发送消息
消费者
/**
 * Listener for {@code String} messages that echoes every received payload to standard output.
 *
 * <p>The annotation configures the topic with the placeholder {@code demo.rocketmq.topic}
 * and the consumer group {@code string_consumer}.
 */
@Service
@RocketMQMessageListener(consumerGroup = "string_consumer", topic = "${demo.rocketmq.topic}")
public class StringConsumer implements RocketMQListener<String> {

    /**
     * Writes the received message to standard output.
     *
     * @param message the message body handed to this listener
     */
    @Override
    public void onMessage(String message) {
        final String line = String.format("------- StringConsumer received: %s \n", message);
        System.out.print(line);
    }
}
1.3、
1.4、
1.5、
二、apache/rocketMQ官网版本
2.1、pom.xml最小依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.3.0</version>
</dependency>
</dependencies>
2.2、API
生产者
/**
 * Minimal synchronous-send example: starts a producer, sends one message and prints the
 * send status reported for it.
 */
public class SyncProducer {

    /**
     * Sends a single message to topic {@code BenchmarkTest} with tag {@code TagA}.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding the body, or sending fails
     */
    public static void main(String[] args) throws Exception {
        // Producer group name ("group-1" was an alternative used while experimenting).
        // Original note: usually one group per broker, though several may share one group.
        // NOTE(review): a producer group normally identifies producer instances, not
        // brokers - confirm against the RocketMQ documentation.
        DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
        // NameServer address.
        producer.setNamesrvAddr("192.168.1.79:9876");
        // Start the producer instance before sending anything.
        producer.start();
        try {
            // Message(topic, tag, body): the body is transferred as a byte array.
            // Observed while testing: naming a topic that does not exist on the server
            // fails with "No route info of this topic, TL".
            // producer.createTopic("accessKey", "TL", 2) can create the topic and set
            // its queue count.
            Message msg = new Message(
                    "BenchmarkTest",
                    "TagA",
                    ("消息 body").getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send synchronously and wait for the result.
            SendResult sendResult = producer.send(msg);
            System.out.println("生产消息反馈: " + sendResult.getSendStatus());
        } finally {
            // Always release the started producer, even when building or sending fails.
            producer.shutdown();
        }
    }
}
消费者
/**
 * Minimal push-consumer example: subscribes to a topic and prints every message that is
 * delivered to the registered listener.
 */
public class Consumer {

    /**
     * Configures and starts a push consumer subscribed to every tag of topic
     * {@code AsyncTopic}.
     *
     * @param args unused
     * @throws InterruptedException declared by the original example's signature
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Consumer group name. Other groups tried while experimenting:
        // "group-1", "SyncGroup", "OnewayProducer_group".
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("AsyncProducerGroup");
        // NameServer address.
        consumer.setNamesrvAddr("192.168.1.79:9876");
        // Subscribe to a topic; "*" selects every tag
        // ("BenchmarkTest" was another topic used while experimenting).
        consumer.subscribe("AsyncTopic", "*");
        // Callback invoked for messages fetched from the brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Print every message in the batch: the whole batch is acknowledged
                // below, so handling only the first entry would drop the others.
                for (MessageExt msg : msgs) {
                    System.out.printf("%s 接收的消息: %n", msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        // Terminate the line so later message output starts on its own line.
        System.out.printf("Consumer Started.%n");
    }
}