一、apache/rocketMQ-spring版本

这个版本,是基于官网,包装,集成Springboot等特点,可能存在着依赖复杂,维护周期的问题(无意冒犯该作者,自己使用的时候,遇到不确定的问题,依赖版本问题,遇到一些棘手的问题)

1.1、pom.xml最小依赖

 <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>
    </dependencies>

不过要注意,rocketmq-spring-boot-starter,依赖着

 <parent>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-parent</artifactId>
        <version>2.0.3</version>
        <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
    </parent>

    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <packaging>jar</packaging>

    <name>RocketMQ Spring Boot Starter</name>
    <description>SRocketMQ Spring Boot Starter</description>
    <url>https://github.com/apache/rocketmq-spring</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
    </dependencies>

还有,rocketmq-spring-boot,依赖着

<dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

1.2、API操作

生产者基于RocketMQTemplate

消费者

@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer")
public class StringConsumer implements RocketMQListener<String> {
   
    @Override
    public void onMessage(String message) {
   
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}

1.3、

1.4、

1.5、

二、apache/rocketMQ官网版本

2.1、pom.xml最小依赖

 <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.3.0</version>
        </dependency>
    </dependencies>

2.2、API

生产者

public class SyncProducer {
   
    public static void main(String[] args) throws Exception {
   

        // 定义 生产者的组,一般一个broker对应一个 group,也可以多个broker指定同一个Group
// DefaultMQProducer producer = new DefaultMQProducer("group-1");
        DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
        // NameServer 地址
        producer.setNamesrvAddr("192.168.1.79:9876");
        // 加载生产者实例
        producer.start();
        // 创建一个消息(Topic,Tag,Body[]),消息内容是bytes数组形式传输
// Message msg = new Message("BenchmarkTest","TagA",("消息 body").getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 这里测试到,如果这里指定了服务端不存在的 Topic,会有如下异常
        // No route info of this topic, TL

// producer.createTopic("accessKey","TL",2); // 可创建Topic,指定分片 queue num

        Message msg = new Message("BenchmarkTest","TagA",("消息 body").getBytes(RemotingHelper.DEFAULT_CHARSET));

        //调用broker,生产消息
        SendResult sendResult = producer.send(msg);
        System.out.println("生产消息反馈: "+ sendResult.getSendStatus());

        // 生产完毕,shutdown
        producer.shutdown();
    }
}

消费者

public class Consumer {
   

    public static void main(String[] args) throws InterruptedException, MQClientException {
   

        // 实例消费者 Group
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-1");
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncGroup");
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OnewayProducer_group");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("AsyncProducerGroup");

        // 指定NameServer地址
        consumer.setNamesrvAddr("192.168.1.79:9876");

        // 订阅Topic下,指定Tag,或者 所有的
// consumer.subscribe("BenchmarkTest", "*");
// consumer.subscribe("AsyncTopic", "*");
        consumer.subscribe("AsyncTopic", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
   

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
// String s = Arrays.toString(msgs);
                System.out.printf("%s 接收的消息: %n", msgs.get(0));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.");
    }
}