一、背景
业务中往往存在需要串联多个领域的协同调用,本地事务无法保证整个链路的数据一致性。通常我们利用消息中间件来应对这样的场景。
二、业务场景模拟 举例:答题活动结束后,需将活动状态进行变更,然后进行奖品派发和群发答案。
三、常规做法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());try { closeActivity(); syncSendReward(); syncSendNotification(); transactionManager.commit(transaction); } catch (Exception e) { log.error("error, rollback." , e); transactionManager.rollback(transaction); throw new QuizActivityException (e); }
1. 思考问题: 1.1. 从业务角度考虑服务的稳定性: 关闭活动是本次调用的核心业务操作。
【派奖】【发送通知】是关闭活动衍生出来的业务动作,要在关闭活动的时候去派奖,去发送通知。
而且这两步均依赖外部服务,跨服务调用无法保证 100%的成功。假如失败的话,不应该影响核心业务的稳定。
在核心业务成功的前提下,衍生动作也要确保执行成功,所以需要有对应的最终一致性方案来确保业务正确。
四、一致性解决方案 1:事务粒度细化 + 多阶段处理 将一个大事务拆成若干小事务,并且从流程设计上采取多阶段的思路允许外部服务调用失败,并有相应的补偿机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());try { sendRewardStatus = false ; sendNotificationStatus = false ; closeActivity(); transactionManager.commit(transaction); } catch (Exception e) { log.error("error, rollback." , e); transactionManager.rollback(transaction); throw new QuizActivityException (e); } syncSendReward(); updateRewardStatus(true ); syncSendNotification(); updateSendNotificationStatus(true );
可以看到上述流程修改为多阶段的模式,并新增定时任务补偿
阶段一:结束活动本地事务,此时派奖与通知状态均为失败。
阶段二:执行派奖||通知流程, 如成功则修改派奖||通知状态为成功,如失败则流程结束。通过定时任务来补偿
定时任务逻辑:扫描派奖||通知失败的活动, 进行补偿派奖||通知,调用外部服务成功后,修改状态为成功
tips:无论是定时任务还是业务主流程均可能出现调用外部服务成功,但本地修改状态失败的场景。
如:调用外部响应过慢触发 timeout || rpc 连接异常断开,本地认为调用失败不修改状态,实际已成功。 如:调用外部成功,本地事务提交失败 || 本地宕机 || 数据库负载过高无响应
所以调用外部服务要做幂等 ,最佳实践为外部服务天生支持幂等 (通过业务标识幂等,业务标识由业务方指定)。
如:活动 ID + 用户 ID + 业务数据 (积分数量 || 抽奖次数)
1. 思考问题: 1.1. 是否对业务代码有额外侵入,导致代码复杂度提升? 1.2. 如出现调用失败的场景,定时任务时效性是否不佳? 五、一致性解决方案 2:异步执行 + 最终一致性 基于业务角度来看,派奖跟通知两个动作可以异步化处理,这样核心链路就简化为本地事务。
异步化实践可以通过 MQ 来做。但是 MQ 的使用方式也有几种。
1. 反例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());try { closeActivity(); transactionManager.commit(transaction); } catch (Exception e) { log.error("error, rollback." , e); transactionManager.rollback(transaction); throw new QuizActivityException (e); } messageManager.sendMessage(rewardMessage); messageManager.sendMessage(notificationMessage);
1.1. 问题: 本地事务与发消息无法保证一致性。
本地事务提交后,发消息失败 (补偿同样失败)的话,事务无法回滚。
会出现丢消息的情况,派奖业务跟通知业务会出现异常。
2. 弱一致性处理 对模型间的数据一致性要求不太强的场景。通常使用轻量级的处理方式,如最终一致性。这样既能保证实现逻辑的简单,也能在极大程度上保证业务的稳定。
2.1. 实现方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());try { closeActivity(); messageManager.sendMessage(notificationMessage); messageManager.sendMessage(rewardMessage); transactionManager.commit(transaction); } catch (Exception e) { log.error("error, rollback." , e); transactionManager.rollback(transaction); throw new QuizActivityException (e); }
将发消息的动作放在本地事务内部。可一定程度上保证一致性。发消息失败的话,本地事务可回滚。
如消息消费者业务依赖事务结果,此消息的发送应放在最后。
2.2. 思考问题: 2.2.1. 异步操作要考虑执行时序问题,MQ 消费执行先于事务提交时如何处理?(使用@Transactional 注解同理) 由于 MQ 消息发送成功后,消息消费时机是不可控的,所以会出现消息消费先于事务提交的时序出现此场景时,消费者可采取以下方式处理:
消费者:收到消息后应进行验证,防止收到消息时可能存在的写入的数据还未提交成功的情况。在这种情况下应抛出异常利用 MQ 进行重试。 (业务上有可能存在正常的根据 id 查不到的情况,应该想办法在消费时进行判断事务中的数据是否已经提交)
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @RMQLaneTransfer @StreamListener(value = StreamBinding.XXXXX) public void mqConsumer (@Payload Message message, @Header(required = false, name = CommonConstant.RMQ_CUSTOM_MAP) String rocketmqCustomMap , @Header(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC) String topic, @Header(required = false, name = RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS) String tag) { XxxDTO xxxDTO = convert(message); boolean checkResult = checkCommited(); if (!checkResult) { throw new QuizActivityException (QuizActivityExceptionEnum.UN_SUBMIT_TX_ERROR); } doSomething(); }
2.2.2. 如消费者异常重试,事务仍然没提交 由于公司内部采用的是 Spring Cloud Stream,默认只会重试两次, 重试间隔分别为 1S 跟 2S,时间较短,如遇 DB 负载较高场景 || 慢事务场景,可能会多次重试均失败。
如遇这种场景可以适当修改重试策略,根据调整以下参数即可
MQ Consumer Properties:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 **maxAttempts** If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry. Default: 3. **backOffInitialInterval** The backoff initial interval on retry. Default: 1000. **backOffMaxInterval** The maximum backoff interval. Default: 10000. **backOffMultiplier** The backoff multiplier. Default: 2.0.
3. 强一致性处理:本地消息表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());try { writeRewardMessageStatus(rewardMessage, false ); writeNotificationMessageStatus(notificationMessage, false ); closeActivity(); transactionManager.commit(transaction); } catch (Exception e) { log.error("error, rollback." , e); transactionManager.rollback(transaction); throw new QuizActivityException (e); } messageManager.sendMessage(rewardMessage); writeRewardMessageStatus(rewardMessage, true ); messageManager.sendMessage(notificationMessage); writeNotificationMessageStatus(notificationMessage, true );
一致性解决方案1 的变种实现。
在一致性解决方案 1 的基础上,将同步调用外部服务改为发 MQ 消息。
原因是:通常我们认为基础中间件的稳定性是优于二方服务的,性能也是优于二方服务。所以在主流程中直接发送 MQ 消息。
思路与优缺点均与一致性解决方案1 一致,不多赘述
4. 强一致性处理:事务消息 4.1. 适用场景: 对模型间的数据有强一致性要求的场景。上下游的数据需要保证同时成功或者同时失败。
举例:商城下单时需要创建订单,锁定库存,锁定优惠券,锁定积分。需要一并成功,或者一并失败。
4.2. 事务消息简介 RMQ 事务消息作为分布式事务的一种解决方案,应对复杂场景应该根据业务逻辑进行合理的应用。保证多个模型的数据一致性。 脚手架目前引入 RocketMQ 版本为 4.9.1-RELEASE。下面引用RocketMQ 4.X官方文档 的说明。
4.3. 事务消息生命周期
初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ 会对消息进行重试处理。具体信息,请参见消费重试 。
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制 。
4.4. 代码示例(较长,参考官网即可) 4.5. 思考问题: 4.5.1. 事务回查时,本地事务未执行完成如何处理? 消息回查时,如事务还未提交,应继续保持 Unknown 的状态,不要 Commit 或者 Rollback。以保证事务性。
一般出现此场景的原因:事务执行慢 (如 DB 热点单行排队,DB 负载高等)。
解决方法:
4.6. 使用建议: 事务检查间隔不宜过小,频繁的事务回查会对 broker 造成较大的压力。 尽量保证本地事务的快速提交,减少 Unknown 状态的事务,减少回查次数,减小 broker 的压力。
RocketMQ 相关参数手册: https://rocketmq.apache.org/zh/docs/introduction/03limits/