RocketMQ SpringCloud 实现分布式事务

事务消息的原理

事务

事务
下面来看 RocketMQ 的事务消息是如何来发送“可靠消息”的,只需要以下三步:

  1. 发送半消息(半消息不会发送给消费者)
  2. 执行本地事务
  3. 提交消息

事务

事务

事务
完成事务消息发送后,消费者就可以以正常的方式来消费数据。

RocketMQ 的自动重发机制在绝大多数情况下,都可以保证消息被正确消费。

假如消息最终消费失败了,还可以由人工处理进行托底。

事务
上面分析的是正常情况下的执行流程。下面再来看两种错误情况:

  1. 事务执行失败时回滚消息
  2. 服务器无法得知消息状态时,需要主动回查消息状态

回滚:

事务
消息回查:

事务

事务

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package demo8;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class Producer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer p = new TransactionMQProducer("producer-demo8");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

p.setExecutorService(Executors.newFixedThreadPool(5));

p.setTransactionListener(new TransactionListener() {

ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>();

/*
在这里执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
if (Math.random()<0.333) {
System.out.println("本地事务执行成功, 按回车提交事务消息");
new Scanner(System.in).nextLine();

localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
return LocalTransactionState.COMMIT_MESSAGE;
} else if (Math.random()<0.666) {
System.out.println("本地事务执行失败, 按回车回滚事务消息");
new Scanner(System.in).nextLine();

localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
System.out.println("本地事务执行情况未知, 按回车继续");
new Scanner(System.in).nextLine();

localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
return LocalTransactionState.UNKNOW;
}
}

/*
回查方法
检测频率默认1分钟,可通过在broker.conf文件中设置transactionCheckInterval的值来改变默认值,单位为毫秒。
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("服务器正在回查消息状态");

LocalTransactionState s = localTx.get(messageExt.getTransactionId());
if (s == null || s == LocalTransactionState.UNKNOW) {
s = LocalTransactionState.ROLLBACK_MESSAGE;
}
return s;
}
});

p.start();

String topic = "Topic8";

while (true) {
System.out.print("输入消息,用逗号分隔多条消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");

for (String s : a) {
Message msg = new Message(topic, s.getBytes());
System.out.println("---------发送半消息-----------");
TransactionSendResult r = p.sendMessageInTransaction(msg, null);
System.out.println("事务消息发送结果: "+ r.getLocalTransactionState().name());
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package demo8;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*

如果返回 RECONSUME_LATER, 服务器会等待一会再重试发送消息

消息属性默认设置 DELAY=6, 等待时间为 2 分钟,

org/apache/rocketmq/store/config/MessageStoreConfig.java
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";


*/
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8");
c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");

c.subscribe("Topic8", "*");

c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()) + " - " + msg);
}
if (Math.random()<0.5) {
System.out.println("消息处理完成");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
System.out.println("消息处理失败, 要求服务器稍后重试发送消息");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});

c.start();
System.out.println("开始消费数据");
}
}

准备订单项目案例

新建 rocketmq-dtx 工程

新建 Empty Project:
a
工程命名为 rocketmq-dtx,存放到任意文件夹下:

a

导入订单项目,无事务版本

下载项目代码

  1. 访问 git 仓库 https://gitee.com/benwang6/seata-samples
  2. 访问项目标签
    a
  3. 下载无事务版
    a

解压到 rocketmq-dtx 目录

压缩文件中的 7 个项目目录解压缩到 rocketmq-dtx 目录:

a

导入项目

在 idea 中按两下 shift 键,搜索 add maven projects,打开 maven 工具:
a

然后选择 rocketmq-dtx 工程目录下的 7 个项目的 pom.xml 导入:

a

order 添加事务状态表

Rocketmq收到事务消息后,会等待生产者提交或回滚该消息。如果无法得到生产者的提交或回滚指令,则会主动向生产者询问消息状态,称为回查。

在 order 项目中,为了让Rocketmq可以回查到事务的状态,需要记录事务的状态,所以我们添加一个事务的状态表来记录事务状态。

修改 db-init 项目中的 order.sql 文件,创建 tx_table 表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
drop database if exists `seata_order`;

CREATE DATABASE `seata_order` charset utf8;

use `seata_order`;

CREATE TABLE `order` (
`id` bigint(11) NOT NULL,
`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`count` int(11) DEFAULT NULL COMMENT '数量',
`money` decimal(11,0) DEFAULT NULL COMMENT '金额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' AFTER `money` ;

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

CREATE TABLE IF NOT EXISTS segment
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '自增主键',
VERSION BIGINT DEFAULT 0 NOT NULL COMMENT '版本号',
business_type VARCHAR(63) DEFAULT '' NOT NULL COMMENT '业务类型,唯一',
max_id BIGINT DEFAULT 0 NOT NULL COMMENT '当前最大id',
step INT DEFAULT 0 NULL COMMENT '步长',
increment INT DEFAULT 1 NOT NULL COMMENT '每次id增量',
remainder INT DEFAULT 0 NOT NULL COMMENT '余数',
created_at BIGINT UNSIGNED NOT NULL COMMENT '创建时间',
updated_at BIGINT UNSIGNED NOT NULL COMMENT '更新时间',
CONSTRAINT uniq_business_type UNIQUE (business_type)
) CHARSET = utf8mb4
ENGINE INNODB COMMENT '号段表';

INSERT INTO segment
(VERSION, business_type, max_id, step, increment, remainder, created_at, updated_at)
VALUES (1, 'order_business', 1000, 1000, 1, 0, NOW(), NOW());

CREATE TABLE tx_table(
`xid` char(32) PRIMARY KEY COMMENT '事务id',
`status` int COMMENT '0-提交,1-回滚,2-未知',
`created_at` BIGINT UNSIGNED NOT NULL COMMENT '创建时间'
);

运行 db-init 项目,会创建这个表:

a

order 发送事务消息,并执行本地事务

Rocketmq 中添加 Topic

使用 order-topic 来收发消息,在 Rocketmq 服务器上创建这个 Topic:

a

order-parent 中添加 rocketmq 起步依赖

修改 pom.xml 添加以下内容:

properties 中设置 rocketmq 起步依赖的版本

1
<rocketmq-spring-boot-starter.version>2.1.0</rocketmq-spring-boot-starter.version>

添加 rocketmq 起步依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>

order 项目中添加 rocketmq 连接信息配置:

修改 order 项目的 application.yml,添加 NameServer 地址,指定生产者组名:

1
2
3
4
rocketmq:
name-server: 192.168.64.151:9876;192.168.64.152:9876
producer:
group: order-group

添加 TxMapper 访问事务状态表

事务状态保存到 tx_table 表,在 TxMapper 接口和 TxMapper.xml 中添加事务状态数据的读写方法。

本地事务执行后要保存事务信息(事务id、事务状态)到数据库,以便之后进行事务回查,首先创建封装事务信息的类 TxInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package cn.tedu.order.tx;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxInfo {
private String xid;
private Long created;
private Integer status;
}

TxMapper 接口:

1
2
3
4
5
6
7
8
package cn.tedu.order.mapper;

import cn.tedu.order.tx.TxInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface TxMapper extends BaseMapper<TxInfo> {
Boolean exists(String xid);
}

TxMapper.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.TxMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.order.tx.TxInfo" >
<id column="xid" property="xid" jdbcType="CHAR" />
<result column="created_at" property="created" jdbcType="BIGINT" />
<result column="status" property="status" jdbcType="INTEGER"/>
</resultMap>

<insert id="insert">
INSERT INTO `tx_table`(`xid`,`created_at`,`status`) VALUES(#{xid},#{created},#{status});
</insert>
<select id="exists" resultType="boolean">
SELECT COUNT(1) FROM tx_table WHERE xid=#{xid};
</select>
<select id="selectById" resultMap="BaseResultMap">
SELECT `xid`,`created_at`,`status` FROM tx_table WHERE xid=#{xid};
</select>
</mapper>

Json处理工具

发送事务消息时,我们把事务对象序列化成 Json 字符串再发送。这里先添加一个工具 JsonUtil 用来处理 Json:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package cn.tedu.order.util;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Writer;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JsonUtil {
private static ObjectMapper mapper;
private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;
private static boolean IS_ENABLE_INDENT_OUTPUT = false;
private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";
static {
try {
initMapper();
configPropertyInclusion();
configIndentOutput();
configCommon();
} catch (Exception e) {
log.error("jackson config error", e);
}
}

private static void initMapper() {
mapper = new ObjectMapper();
}

private static void configCommon() {
config(mapper);
}

private static void configPropertyInclusion() {
mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);
}

private static void configIndentOutput() {
mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);
}

private static void config(ObjectMapper objectMapper) {
objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);
objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
objectMapper.registerModule(new ParameterNamesModule());
objectMapper.registerModule(new Jdk8Module());
objectMapper.registerModule(new JavaTimeModule());
}
public static void setSerializationInclusion(JsonInclude.Include inclusion) {
DEFAULT_PROPERTY_INCLUSION = inclusion;
configPropertyInclusion();
}

public static void setIndentOutput(boolean isEnable) {
IS_ENABLE_INDENT_OUTPUT = isEnable;
configIndentOutput();
}

public static <V> V from(URL url, Class<V> c) {
try {
return mapper.readValue(url, c);
} catch (IOException e) {
log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);
return null;
}
}

public static <V> V from(InputStream inputStream, Class<V> c) {
try {
return mapper.readValue(inputStream, c);
} catch (IOException e) {
log.error("jackson from error, type: {}", c, e);
return null;
}
}

public static <V> V from(File file, Class<V> c) {
try {
return mapper.readValue(file, c);
} catch (IOException e) {
log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);
return null;
}
}

public static <V> V from(Object jsonObj, Class<V> c) {
try {
return mapper.readValue(jsonObj.toString(), c);
} catch (IOException e) {
log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);
return null;
}
}

public static <V> V from(String json, Class<V> c) {
try {
return mapper.readValue(json, c);
} catch (IOException e) {
log.error("jackson from error, json: {}, type: {}", json, c, e);
return null;
}
}

public static <V> V from(URL url, TypeReference<V> type) {
try {
return mapper.readValue(url, type);
} catch (IOException e) {
log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);
return null;
}
}

public static <V> V from(InputStream inputStream, TypeReference<V> type) {
try {
return mapper.readValue(inputStream, type);
} catch (IOException e) {
log.error("jackson from error, type: {}", type, e);
return null;
}
}

public static <V> V from(File file, TypeReference<V> type) {
try {
return mapper.readValue(file, type);
} catch (IOException e) {
log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);
return null;
}
}

public static <V> V from(Object jsonObj, TypeReference<V> type) {
try {
return mapper.readValue(jsonObj.toString(), type);
} catch (IOException e) {
log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);
return null;
}
}

public static <V> V from(String json, TypeReference<V> type) {
try {
return mapper.readValue(json, type);
} catch (IOException e) {
log.error("jackson from error, json: {}, type: {}", json, type, e);
return null;
}
}

public static <V> String to(List<V> list) {
try {
return mapper.writeValueAsString(list);
} catch (JsonProcessingException e) {
log.error("jackson to error, obj: {}", list, e);
return null;
}
}

public static <V> String to(V v) {
try {
return mapper.writeValueAsString(v);
} catch (JsonProcessingException e) {
log.error("jackson to error, obj: {}", v, e);
return null;
}
}

public static <V> void toFile(String path, List<V> list) {
try (Writer writer = new FileWriter(new File(path), true)) {
mapper.writer().writeValues(writer).writeAll(list);
writer.flush();
} catch (Exception e) {
log.error("jackson to file error, path: {}, list: {}", path, list, e);
}
}

public static <V> void toFile(String path, V v) {
try (Writer writer = new FileWriter(new File(path), true)) {
mapper.writer().writeValues(writer).write(v);
writer.flush();
} catch (Exception e) {
log.error("jackson to file error, path: {}, obj: {}", path, v, e);
}
}

public static String getString(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).asText();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get string error, json: {}, key: {}", json, key, e);
return null;
}
}

public static Integer getInt(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).intValue();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get int error, json: {}, key: {}", json, key, e);
return null;
}
}

public static Long getLong(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).longValue();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get long error, json: {}, key: {}", json, key, e);
return null;
}
}

public static Double getDouble(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).doubleValue();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get double error, json: {}, key: {}", json, key, e);
return null;
}
}

public static BigInteger getBigInteger(String json, String key) {
if (StringUtils.isEmpty(json)) {
return new BigInteger(String.valueOf(0.00));
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).bigIntegerValue();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);
return null;
}
}

public static BigDecimal getBigDecimal(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).decimalValue();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);
return null;
}
}

public static boolean getBoolean(String json, String key) {
if (StringUtils.isEmpty(json)) {
return false;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).booleanValue();
} else {
return false;
}
} catch (IOException e) {
log.error("jackson get boolean error, json: {}, key: {}", json, key, e);
return false;
}
}

public static byte[] getByte(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
JsonNode node = mapper.readTree(json);
if (null != node) {
return node.get(key).binaryValue();
} else {
return null;
}
} catch (IOException e) {
log.error("jackson get byte error, json: {}, key: {}", json, key, e);
return null;
}
}

public static <T> ArrayList<T> getList(String json, String key) {
if (StringUtils.isEmpty(json)) {
return null;
}
String string = getString(json, key);
return from(string, new TypeReference<ArrayList<T>>() {});
}

public static <T> String add(String json, String key, T value) {
try {
JsonNode node = mapper.readTree(json);
add(node, key, value);
return node.toString();
} catch (IOException e) {
log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);
return json;
}
}

private static <T> void add(JsonNode jsonNode, String key, T value) {
if (value instanceof String) {
((ObjectNode) jsonNode).put(key, (String) value);
} else if (value instanceof Short) {
((ObjectNode) jsonNode).put(key, (Short) value);
} else if (value instanceof Integer) {
((ObjectNode) jsonNode).put(key, (Integer) value);
} else if (value instanceof Long) {
((ObjectNode) jsonNode).put(key, (Long) value);
} else if (value instanceof Float) {
((ObjectNode) jsonNode).put(key, (Float) value);
} else if (value instanceof Double) {
((ObjectNode) jsonNode).put(key, (Double) value);
} else if (value instanceof BigDecimal) {
((ObjectNode) jsonNode).put(key, (BigDecimal) value);
} else if (value instanceof BigInteger) {
((ObjectNode) jsonNode).put(key, (BigInteger) value);
} else if (value instanceof Boolean) {
((ObjectNode) jsonNode).put(key, (Boolean) value);
} else if (value instanceof byte[]) {
((ObjectNode) jsonNode).put(key, (byte[]) value);
} else {
((ObjectNode) jsonNode).put(key, to(value));
}
}

public static String remove(String json, String key) {
try {
JsonNode node = mapper.readTree(json);
((ObjectNode) node).remove(key);
return node.toString();
} catch (IOException e) {
log.error("jackson remove error, json: {}, key: {}", json, key, e);
return json;
}
}

public static <T> String update(String json, String key, T value) {
try {
JsonNode node = mapper.readTree(json);
((ObjectNode) node).remove(key);
add(node, key, value);
return node.toString();
} catch (IOException e) {
log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);
return json;
}
}

public static String format(String json) {
try {
JsonNode node = mapper.readTree(json);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);
} catch (IOException e) {
log.error("jackson format json error, json: {}", json, e);
return json;
}
}

public static boolean isJson(String json) {
try {
mapper.readTree(json);
return true;
} catch (Exception e) {
log.error("jackson check json error, json: {}", json, e);
return false;
}
}

private static InputStream getResourceStream(String name) {
return JsonUtil.class.getClassLoader().getResourceAsStream(name);
}

private static InputStreamReader getResourceReader(InputStream inputStream) {
if (null == inputStream) {
return null;
}
return new InputStreamReader(inputStream, StandardCharsets.UTF_8);
}
}

废弃 OrderServiceImpl

之前的业务实现类 OrderServiceImpl 废弃,后面用一个新的类 TxOrderService 来代替。

OrderServiceImpl.java 直接删除即可。

TxOrderService 发送事务消息

TxAccountMessage 封装发送给账户服务的数据:用户id和扣减金额。另外还封装了事务id。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package cn.tedu.order.tx;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxAccountMessage {
Long userId;
BigDecimal money;
String xid;
}

在业务方法 create() 中不直接保存订单,而是发送事务消息。

消息发出后,会触发 TxListener 执行本地事务,它执行时会回调这里的 doCreate() 方法完成订单的保存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package cn.tedu.order.tx;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.service.OrderService;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

@Slf4j
@Primary
@Service
public class TxOrderService implements OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private TxMapper txMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;

/*
创建订单的业务方法
这里修改为:只向 Rocketmq 发送事务消息。
*/
@Override
public void create(Order order) {
// 产生事务ID
String xid = UUID.randomUUID().toString().replace("-", "");

//对事务相关数据进行封装,并转成 json 字符串
TxAccountMessage sMsg = new TxAccountMessage(order.getUserId(), order.getMoney(), xid);
String json = JsonUtil.to(sMsg);

//json字符串封装到 Spring Message 对象
Message<String> msg = MessageBuilder.withPayload(json).build();

//发送事务消息
rocketMQTemplate.sendMessageInTransaction("order-topic:account", msg, order);
log.info("事务消息已发送");
}

//本地事务,执行订单保存
//这个方法在事务监听器中调用
@Transactional
public void doCreate(Order order, String xid) {
log.info("执行本地事务,保存订单");

// 从全局唯一id发号器获得id
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);

orderMapper.create(order);

log.info("订单已保存! 事务日志已保存");
}
}

TxListener 事务监听器

发送事务消息后会触发事务监听器执行。

事务监听器有两个方法:

  • executeLocalTransaction(): 执行本地事务
  • checkLocalTransaction(): 负责响应Rocketmq服务器的事务回查操作

TxListener 监听器类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package cn.tedu.order.tx;

import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQTransactionListener
public class TxListener implements RocketMQLocalTransactionListener {
@Autowired
private TxOrderService orderService;
@Autowired
private TxMapper txMapper;

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("事务监听 - 开始执行本地事务");

// 监听器中得到的 message payload 是 byte[]
String json = new String((byte[]) message.getPayload());
String xid = JsonUtil.getString(json, "xid");

log.info("事务监听 - "+json);
log.info("事务监听 - xid: "+xid);

RocketMQLocalTransactionState state;
int status = 0;
Order order = (Order) o;

try {
orderService.doCreate(order, xid);

log.info("本地事务执行成功,提交消息");
state = RocketMQLocalTransactionState.COMMIT;
status = 0;
} catch (Exception e) {
e.printStackTrace();
log.info("本地事务执行失败,回滚消息");
state = RocketMQLocalTransactionState.ROLLBACK;
status = 1;
}

TxInfo txInfo = new TxInfo(xid, System.currentTimeMillis(), status);
txMapper.insert(txInfo);

return state;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("事务监听 - 回查事务状态");

// 监听器中得到的 message payload 是 byte[]
String json = new String((byte[]) message.getPayload());
String xid = JsonUtil.getString(json, "xid");

TxInfo txInfo = txMapper.selectById(xid);
if (txInfo == null) {
log.info("事务监听 - 回查事务状态 - 事务不存在:"+xid);
return RocketMQLocalTransactionState.UNKNOWN;
}

log.info("事务监听 - 回查事务状态 - "+ txInfo.getStatus());

switch (txInfo.getStatus()) {
case 0: return RocketMQLocalTransactionState.COMMIT;
case 1: return RocketMQLocalTransactionState.ROLLBACK;
default: return RocketMQLocalTransactionState.UNKNOWN;
}
}
}

启动订单项目进行测试

按顺序启动项目:

  1. Eureka
  2. Easy Id Generator
  3. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察控制台日志:

a
订单表:
a

事务表:
a

访问 Rocketmq,查看事务消息:
a

account 接收事务消息,并执行本地事务

application.yml 添加 Rocketmq 连接配置

1
2
rocketmq:
name-server: 192.168.64.151:9876;192.168.64.152:9876

JsonUtil 工具类

同上面的代码。

TxConsumer 接收事务消息,调用账户业务方法

接收的消息转换成 TxAccountMessage 对象,这里先创建这个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package cn.tedu.account.tx;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxAccountMessage {
Long userId;
BigDecimal money;
String xid;
}

TxConsumer 实现消息监听,收到消息后完成扣减金额业务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package cn.tedu.account.tx;

import cn.tedu.account.service.AccountService;
import cn.tedu.account.util.JsonUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "account-consumer-group", topic = "order-topic", selectorExpression = "account")
public class TxConsumer implements RocketMQListener<String> {
@Autowired
private AccountService accountService;

@Override
public void onMessage(String msg) {
TxAccountMessage txAccountMessage = JsonUtil.from(msg, new TypeReference<TxAccountMessage>() {});
log.info("收到消息: "+txAccountMessage);

accountService.decrease(txAccountMessage.getUserId(), txAccountMessage.getMoney());
}
}

AccountServiceImpl 添加事务注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;

@Transactional
@Override
public void decrease(Long userId, BigDecimal money) {
accountMapper.decrease(userId,money);
}
}

启动 account 项目进行测试

按顺序启动项目:

  1. Eureka
  2. Easy Id Generator
  3. Account
  4. Order

account 项目启动时,会立即从 Rocketmq 收到消息,执行账户扣减业务:
a

order 本地事务失败测试

修改 TxOrderService 添加模拟异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package cn.tedu.order.tx;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.service.OrderService;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

@Slf4j
@Primary
@Service
public class TxOrderService implements OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private TxMapper txMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;

/*
创建订单的业务方法
这里修改为:只向 Rocketmq 发送事务消息。
*/
@Override
public void create(Order order) {
// 产生事务ID
String xid = UUID.randomUUID().toString().replace("-", "");

//对事务相关数据进行封装,并转成 json 字符串
TxAccountMessage sMsg = new TxAccountMessage(order.getUserId(), order.getMoney(), xid);
String json = JsonUtil.to(sMsg);

//json字符串封装到 Spring Message 对象
Message<String> msg = MessageBuilder.withPayload(json).build();

//发送事务消息
log.info("开始发送事务消息");
rocketMQTemplate.sendMessageInTransaction("order-topic:account", msg, order);
log.info("事务消息已发送");
}

//本地事务,执行订单保存
//这个方法在事务监听器中调用
@Transactional
public void doCreate(Order order, String xid) {
log.info("执行本地事务,保存订单");

// 从全局唯一id发号器获得id
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);

orderMapper.create(order);

if (Math.random() < 0.5) {
throw new RuntimeException("模拟异常");
}

log.info("订单已保存! 事务日志已保存");
}
}

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

本地事务失败后,会通知 Rocketmq 回滚事务:
a

测试完后,将模拟异常代码注释掉

account 本地事务失败测试

修改 AccountServiceImpl,添加随机模拟异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;

@Transactional
@Override
public void decrease(Long userId, BigDecimal money) {
accountMapper.decrease(userId,money);

if (Math.random() < 0.5) {
throw new RuntimeException("模拟异常");
}
}
}

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

account 账户服务接收消息后,如果处理失败,Rocketmq会进行重试,直到处理成功为止。