微盛|团队技术博客

分布式数据一致性的保证

行秋(姚弯弯)

2023-04-21

一、背景

业务中往往存在需要串联多个领域的协同调用,本地事务无法保证整个链路的数据一致性。通常我们利用消息中间件来应对这样的场景。

二、业务场景模拟

举例:答题活动结束后,需将活动状态进行变更,然后进行奖品派发和群发答案。

三、常规做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 开启DB事务
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());

try {
// 关闭活动(数据库操作)
closeActivity();

// 直接派奖(调用积分/抽奖二方服务)
syncSendReward();

// 直接发送通知(调用超级群发服务)
syncSendNotification();

// 提交DB事务
transactionManager.commit(transaction);
} catch (Exception e) {
// 回滚DB事务
log.error("error, rollback.", e);
transactionManager.rollback(transaction);
throw new QuizActivityException(e);
}

1. 思考问题:

1.1. 从业务角度考虑服务的稳定性:

关闭活动是本次调用的核心业务操作。

【派奖】【发送通知】是关闭活动衍生出来的业务动作,要在关闭活动的时候去派奖,去发送通知。

而且这两步均依赖外部服务,跨服务调用无法保证 100%的成功。假如失败的话,不应该影响核心业务的稳定。

在核心业务成功的前提下,衍生动作也要确保执行成功,所以需要有对应的最终一致性方案来确保业务正确。

四、一致性解决方案 1:事务粒度细化 + 多阶段处理

将一个大事务拆成若干小事务,并且从流程设计上采取多阶段的思路允许外部服务调用失败,并有相应的补偿机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 开启DB事务
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());

try {
// 派奖状态为失败
sendRewardStatus = false;

// 发送通知状态为失败
sendNotificationStatus = false;

// 关闭活动(数据库操作)
closeActivity();

// 提交DB事务
transactionManager.commit(transaction);
} catch (Exception e) {
// 回滚DB事务
log.error("error, rollback.", e);
transactionManager.rollback(transaction);
throw new QuizActivityException(e);
}

//以下代码忽略try catch

// 直接派奖(调用积分/抽奖二方服务),派奖内部需做幂等
syncSendReward();
// 修改派奖状态为成功
updateRewardStatus(true);

// 直接发送通知(调用超级群发服务),通知内部需做幂等
syncSendNotification();
// 修改通知状态为成功
updateSendNotificationStatus(true);

可以看到上述流程修改为多阶段的模式,并新增定时任务补偿

  • 阶段一:结束活动本地事务,此时派奖与通知状态均为失败。
  • 阶段二:执行派奖||通知流程, 如成功则修改派奖||通知状态为成功,如失败则流程结束。通过定时任务来补偿

定时任务逻辑:扫描派奖||通知失败的活动, 进行补偿派奖||通知,调用外部服务成功后,修改状态为成功

tips:无论是定时任务还是业务主流程均可能出现调用外部服务成功,但本地修改状态失败的场景。

如:调用外部响应过慢触发 timeout || rpc 连接异常断开,本地认为调用失败不修改状态,实际已成功。
如:调用外部成功,本地事务提交失败 || 本地宕机 || 数据库负载过高无响应

所以调用外部服务要做幂等,最佳实践为外部服务天生支持幂等 (通过业务标识幂等,业务标识由业务方指定)。

如:活动 ID + 用户 ID + 业务数据 (积分数量 || 抽奖次数)

1. 思考问题:

1.1. 是否对业务代码有额外侵入,导致代码复杂度提升?

1.2. 如出现调用失败的场景,定时任务时效性是否不佳?

五、一致性解决方案 2:异步执行 + 最终一致性

基于业务角度来看,派奖跟通知两个动作可以异步化处理,这样核心链路就简化为本地事务。

异步化实践可以通过 MQ 来做。但是 MQ 的使用方式也有几种。

1. 反例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 开启DB事务
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());

try {
// 关闭活动(数据库操作)
closeActivity();

// 提交DB事务
transactionManager.commit(transaction);
} catch (Exception e) {
// 回滚DB事务
log.error("error, rollback.", e);
transactionManager.rollback(transaction);
throw new QuizActivityException(e);
}

// 发送派奖消息(MQ消费者执行派奖)
messageManager.sendMessage(rewardMessage);

// 发送通知消息(MQ消费者执行通知)
messageManager.sendMessage(notificationMessage);

1.1. 问题:

本地事务与发消息无法保证一致性。

本地事务提交后,发消息失败 (补偿同样失败)的话,事务无法回滚。

会出现丢消息的情况,派奖业务跟通知业务会出现异常。

2. 弱一致性处理

对模型间的数据一致性要求不太强的场景。通常使用轻量级的处理方式,如最终一致性。这样既能保证实现逻辑的简单,也能在极大程度上保证业务的稳定。

2.1. 实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 开启DB事务
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());

try {
// 关闭活动(数据库操作)
closeActivity();

// 发送通知消息(MQ消费者执行通知)
messageManager.sendMessage(notificationMessage);

// 发送派奖消息(MQ消费者执行派奖,此消息消费依赖事务结果,应放最后)
messageManager.sendMessage(rewardMessage);

// 提交DB事务
transactionManager.commit(transaction);
} catch (Exception e) {
// 回滚DB事务
log.error("error, rollback.", e);
transactionManager.rollback(transaction);
throw new QuizActivityException(e);
}

将发消息的动作放在本地事务内部。可一定程度上保证一致性。发消息失败的话,本地事务可回滚。

如消息消费者业务依赖事务结果,此消息的发送应放在最后。

2.2. 思考问题:

2.2.1. 异步操作要考虑执行时序问题,MQ 消费执行先于事务提交时如何处理?(使用@Transactional 注解同理)

由于 MQ 消息发送成功后,消息消费时机是不可控的,所以会出现消息消费先于事务提交的时序出现此场景时,消费者可采取以下方式处理:

消费者:收到消息后应进行验证,防止收到消息时可能存在的写入的数据还未提交成功的情况。在这种情况下应抛出异常利用 MQ 进行重试。
(业务上有可能存在正常的根据 id 查不到的情况,应该想办法在消费时进行判断事务中的数据是否已经提交)

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RMQLaneTransfer
@StreamListener(value = StreamBinding.XXXXX)
public void mqConsumer(@Payload Message message,
@Header(required = false, name = CommonConstant.RMQ_CUSTOM_MAP) String rocketmqCustomMap ,
@Header(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC) String topic,
@Header(required = false, name = RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS) String tag) {
// 解析消息内容
XxxDTO xxxDTO = convert(message);

// 业务校验(防止事务提交前收到消息的校验,检查事务提交状态&必要的业务数据)
boolean checkResult = checkCommited();

// 校验失败,抛出异常
if (!checkResult) {
throw new QuizActivityException(QuizActivityExceptionEnum.UN_SUBMIT_TX_ERROR);
}

// 消费消息处理正常业务
doSomething();
}
2.2.2. 如消费者异常重试,事务仍然没提交

由于公司内部采用的是 Spring Cloud Stream,默认只会重试两次, 重试间隔分别为 1S 跟 2S,时间较短,如遇 DB 负载较高场景 || 慢事务场景,可能会多次重试均失败。

如遇这种场景可以适当修改重试策略,根据调整以下参数即可

MQ Consumer Properties:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
**maxAttempts**
If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry.
Default: 3.

**backOffInitialInterval**
The backoff initial interval on retry.
Default: 1000.

**backOffMaxInterval**
The maximum backoff interval.
Default: 10000.

**backOffMultiplier**
The backoff multiplier.
Default: 2.0.

3. 强一致性处理:本地消息表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 开启DB事务
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());

try {
// 派奖消息状态为失败
writeRewardMessageStatus(rewardMessage, false);

// 发送通知消息状态为失败
writeNotificationMessageStatus(notificationMessage, false);

// 关闭活动(数据库操作)
closeActivity();

// 提交DB事务
transactionManager.commit(transaction);
} catch (Exception e) {
// 回滚DB事务
log.error("error, rollback.", e);
transactionManager.rollback(transaction);
throw new QuizActivityException(e);
}

//以下代码忽略try catch

// 发送通知消息(MQ消费者执行通知)
messageManager.sendMessage(rewardMessage);
// 修改派奖消息状态为成功
writeRewardMessageStatus(rewardMessage, true);

// 发送派奖消息(MQ消费者执行派奖,此消息消费依赖事务结果,应放最后)
messageManager.sendMessage(notificationMessage);
// 修改发送通知消息状态为成功
writeNotificationMessageStatus(notificationMessage, true);

一致性解决方案1 的变种实现。

在一致性解决方案 1 的基础上,将同步调用外部服务改为发 MQ 消息。

原因是:通常我们认为基础中间件的稳定性是优于二方服务的,性能也是优于二方服务。所以在主流程中直接发送 MQ 消息。

思路与优缺点均与一致性解决方案1 一致,不多赘述

4. 强一致性处理:事务消息

4.1. 适用场景:

对模型间的数据有强一致性要求的场景。上下游的数据需要保证同时成功或者同时失败。

举例:商城下单时需要创建订单,锁定库存,锁定优惠券,锁定积分。需要一并成功,或者一并失败。

4.2. 事务消息简介

RMQ 事务消息作为分布式事务的一种解决方案,应对复杂场景应该根据业务逻辑进行合理的应用。保证多个模型的数据一致性。
脚手架目前引入 RocketMQ 版本为 4.9.1-RELEASE。下面引用RocketMQ 4.X官方文档的说明。

image.png

4.3. 事务消息生命周期

image.png

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ 会对消息进行重试处理。具体信息,请参见消费重试
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制

4.4. 代码示例(较长,参考官网即可)

4.5. 思考问题:

4.5.1. 事务回查时,本地事务未执行完成如何处理?

消息回查时,如事务还未提交,应继续保持 Unknown 的状态,不要 Commit 或者 Rollback。以保证事务性。

一般出现此场景的原因:事务执行慢 (如 DB 热点单行排队,DB 负载高等)。

解决方法:

  • 延长第一次回查时间。缺点时会造成事务提交延迟较大

4.6. 使用建议:

事务检查间隔不宜过小,频繁的事务回查会对 broker 造成较大的压力。
尽量保证本地事务的快速提交,减少 Unknown 状态的事务,减少回查次数,减小 broker 的压力。

RocketMQ 相关参数手册: https://rocketmq.apache.org/zh/docs/introduction/03limits/

Tags: 后端

作者: 行秋(姚弯弯)