RocketMQ原生API收发消息代码样例 pom文件 新建 maven 项目或 module,添加 rocketmq-client
依赖。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > cn.tedu</groupId > <artifactId > demo1</artifactId > <version > 1.0-SNAPSHOT</version > <dependencies > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.7.1</version > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-store</artifactId > <version > 4.7.1</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.8.0</version > <configuration > <source > 1.8</source > <target > 1.8</target > </configuration > </plugin > </plugins > </build > </project >
同步消息
同步消息发送要保证强一致性,发到master的消息向slave复制后,才会向生产者发送反馈信息。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package demo1;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import java.util.Scanner;public class Producer { public static void main (String[] args) throws Exception { DefaultMQProducer p = new DefaultMQProducer ("producer-demo1" ); p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876" ); p.start(); String topic = "Topic1" ; String tag = "TagA" ; while (true ) { System.out.print("输入消息,用逗号分隔多条消息: " ); String[] a = new Scanner (System.in).nextLine().split("," ); for (String s : a) { Message msg = new Message (topic, tag, s.getBytes()); SendResult r = p.send(msg); System.out.println(r); } } } }
消费者 消费者的要点:
1. push 和 pull
消费者有两种模式:push 和 pull。
push 模式由服务器主动向消费者发送消息;pull 模式由消费者主动向服务器请求消息。
在消费者处理能力有限时,为了减轻消费者的压力,可以采用pull模式。多数情况下都采用 pull 模式。
2. NameServer
消费者需要向 NameServer 询问 Topic 的路由信息。
3. Topic
从指定的Topic接收消息。Topic相当于是一级分类。
4. Tag
Topic 相当于是一级分类,Tag 相当于是2级分类。
多个 Tag 可以这样写: TagA || TagB || TagC
不指定 Tag,或者说接收所有的 Tag,可以写星号: *
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package demo1;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main (String[] args) throws Exception { DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo1" ); c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876" ); c.subscribe("Topic1" , "TagA" ); c.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext ctx) { for (MessageExt msg : list) { System.out.println(new String (msg.getBody()) + " - " + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
异步消息 master 收到消息后立即向生产者进行反馈。之后再以异步方式向 slave 复制消息。
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package demo2;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Scanner;public class Producer { public static void main (String[] args) throws MQClientException, RemotingException, InterruptedException { DefaultMQProducer p = new DefaultMQProducer ("producer-demo2" ); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); p.start(); p.setRetryTimesWhenSendAsyncFailed(0 ); String topic = "Topic2" ; String tag = "TagA" ; String key = "Key-demo2" ; while (true ) { System.out.print("输入消息,用逗号分隔多条消息: " ); String[] a = new Scanner (System.in).nextLine().split("," ); for (String s : a) { Message msg = new Message (topic, tag, key, s.getBytes()); p.send(msg, new SendCallback () { @Override public void onSuccess (SendResult sendResult) { System.out.println("nn消息发送成功 : " +sendResult); } @Override public void onException (Throwable throwable) { System.out.println("nn消息发送失败" ); } }); System.out.println("--------------------消息已送出-----------------------" ); } } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package demo2;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo2" ); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); c.subscribe("Topic2" , "TagA" ); c.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println(new String (msg.getBody()) + " - " + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package demo3;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Scanner;public class Producer { public static void main (String[] args) throws MQClientException, RemotingException, InterruptedException { DefaultMQProducer p = new DefaultMQProducer ("producer-demo3" ); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); p.start(); String topic = "Topic3" ; String tag = "TagA" ; while (true ) { System.out.print("输入消息,用逗号分隔多条消息: " ); String[] a = new Scanner (System.in).nextLine().split("," ); for (String s : a) { Message msg = new Message (topic, tag, s.getBytes()); p.sendOneway(msg); } System.out.println("--------------------消息已送出-----------------------" ); } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package demo3;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo2" ); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); c.subscribe("Topic3" , "TagA" ); c.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println(new String (msg.getBody()) + " - " + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
顺序消息
上图演示了 Rocketmq 顺序消息的基本原理:
同一组有序的消息序列,会被发送到同一个队列,按照 FIFO 的方式进行处理
一个队列只允许一个消费者线程接收消息,这样就保证消息按顺序被接收
下面以订单为例:
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中。消费时,从同一个队列接收同一个订单的消息。
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 package demo4;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;import java.util.Scanner;public class Producer { static String[] msgs = { "15103111039,创建" , "15103111065,创建" , "15103111039,付款" , "15103117235,创建" , "15103111065,付款" , "15103117235,付款" , "15103111065,完成" , "15103111039,推送" , "15103117235,完成" , "15103111039,完成" }; public static void main (String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer ("producer-demo4" ); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); p.start(); String topic = "Topic4" ; String tag = "TagA" ; for (String s : msgs) { System.out.println("按回车发送此消息: " +s); new Scanner (System.in).nextLine(); Message msg = new Message (topic, tag, s.getBytes()); String[] a = s.split("," ); long orderId = Long.parseLong(a[0 ]); SendResult r = p.send(msg, new MessageQueueSelector () { @Override public MessageQueue select (List<MessageQueue> queueList, Message message, Object o) { Long orderId = (Long) o; long index = orderId % queueList.size(); System.out.println("消息已发送到: " +queueList.get((int ) index)); return queueList.get((int ) index); } }, orderId); System.out.println(r+"nn" ); } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package demo4;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo4" ); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); c.subscribe("Topic4" , "*" ); c.registerMessageListener(new MessageListenerOrderly () { @Override public ConsumeOrderlyStatus consumeMessage (List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { String t = Thread.currentThread().getName(); for (MessageExt msg : list) { System.out.println(t+" - " + msg.getQueueId() + " - " +new String (msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
延时消息 消息发送到 Rocketmq 服务器后, 延迟一定时间再向消费者进行投递。
延时消息的使用场景:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
生产者发送消息时,对消息进行延时设置:
1 msg.setDelayTimeLevel(3 );
其中 3
代表级别而不是一个具体的时间值,级别和延时时长对应关系是在 MessageStoreConfig
类种进行定义的:
1 this .messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ;
对应关系表:
级别
延时时长
1
1s
2
5s
3
10s
4
30s
5
1m
6
2m
7
3m
8
4m
9
5m
10
6m
11
7m
12
8m
13
9m
14
10m
15
20m
16
30m
17
1h
18
2h
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package demo5;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Scanner;public class Producer { public static void main (String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer ("producer-demo5" ); p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876" ); p.start(); while (true ) { System.out.print("输入消息,用逗号分隔多条消息: " ); String[] a = new Scanner (System.in).nextLine().split("," ); for (String s : a) { Message msg = new Message ("Topic5" , s.getBytes()); msg.setDelayTimeLevel(3 ); p.send(msg); } } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package demo5;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo5" ); c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876" ); c.subscribe("Topic5" , "*" ); c.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext ctx) { System.out.println("------------------------------" ); for (MessageExt msg : list) { long t = System.currentTimeMillis() - msg.getBornTimestamp(); System.out.println(new String (msg.getBody()) + " - 延迟: " +t); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
批量消息 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package demo6;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.ArrayList;import java.util.Scanner;public class Producer { public static void main (String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer ("producer-demo6" ); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); p.start(); String topic = "Topic6" ; while (true ) { System.out.print("输入消息,用逗号分隔多条消息: " ); String[] a = new Scanner (System.in).nextLine().split("," ); ArrayList<Message> messages = new ArrayList <>(); for (String s : a) { messages.add(new Message (topic, s.getBytes())); } p.send(messages); System.out.println("批量消息已发送" ); } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package demo6;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main (String[] args) throws MQClientException { DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo6" ); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); c.subscribe("Topic6" , "*" ); c.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("收到: " +new String (msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
消息过滤 Tag 过滤 Tag 可以满足大多数消息过滤的需求。使用 Tag 过滤非常简单,例如:
1 consumer.subscribe("Topic1" , "TagA || TagB || TagC" );
对自定义属性过滤 生产者可以在消息中添加自定义的属性:
1 2 msg.putUserProperty("prop1" , "1" ); msg.putUserProperty("prop2" , "2" );
消费者接收数据时,可以根据属性来过滤消息:
1 consumer.subscribe("Topic7" , MessageSelector.bySql("prop1=1 or prop2=2" ));
可以看到,自定义属性的过滤语法是 Sql 语法,RocketMQ只定义了一些基本语法来支持这个特性,支持的 Sql 过滤语法如下:
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package demo7;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.Random;import java.util.Scanner;public class Producer { public static void main (String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer p = new DefaultMQProducer ("producer-demo7" ); p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); p.start(); String topic = "Topic7" ; while (true ) { System.out.print("输入消息,用逗号分隔多条消息: " ); String[] a = new Scanner (System.in).nextLine().split("," ); System.out.print("输入Tag: " ); String tag = new Scanner (System.in).nextLine(); for (String s : a) { Message msg = new Message (topic, tag, s.getBytes()); msg.putUserProperty("rnd" , "" +new Random ().nextInt(4 )); p.send(msg); } } } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package demo7;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.MessageSelector;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;import java.util.Scanner;public class Consumer { public static void main (String[] args) throws MQClientException { System.out.print("使用Tag过滤还是使用Sql过滤(tag/sql): " ); String ts = new Scanner (System.in).nextLine(); DefaultMQPushConsumer c = new DefaultMQPushConsumer ("consumer-demo7" ); c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876" ); if (ts.equalsIgnoreCase("tag" )) { System.out.println("使用Tag过滤: TagA || TagB || TagC" ); c.subscribe("Topic7" , "TagA || TagB || TagC" ); } else { System.out.println("使用Sql过滤: rnd=1 or rnd > 2" ); c.subscribe("Topic7" , MessageSelector.bySql("rnd=1 or rnd > 2" )); } c.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println(new String (msg.getBody()) + " - " + msg.getUserProperty("rnd" )); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); c.start(); System.out.println("开始消费数据" ); } }
事务消息 RocketMQ 提供了可靠性消息,也叫事务消息。下面分析一下其原理。