当前位置:首页 > 科技  > 软件

事务钩子函数,打造高效支付系统

来源: 责编: 时间:2024-05-09 09:23:03 223观看
导读今天,我继续安利一个独门绝技:Spring 事务的钩子函数。单纯的讲技术可能比较枯燥乏味。接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。一、案例背景拿支付系统相关的业务来举例。在支付系统中,我

今天,我继续安利一个独门绝技:Spring 事务的钩子函数。单纯的讲技术可能比较枯燥乏味。接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。yT228资讯网——每日最新资讯28at.com

一、案例背景

拿支付系统相关的业务来举例。在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的账做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求:要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里(这个库只有存档系统拥有写权限)。整个需求的流程如下所示:yT228资讯网——每日最新资讯28at.com

图片图片yT228资讯网——每日最新资讯28at.com

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,CTO建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。yT228资讯网——每日最新资讯28at.com

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:yT228资讯网——每日最新资讯28at.com

1、技术栈使用的springboot,因此,这里最好以starter的方式提供yT228资讯网——每日最新资讯28at.com

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。yT228资讯网——每日最新资讯28at.com

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。yT228资讯网——每日最新资讯28at.com

4、发送消息这个操作需要支持事务,尽量不影响主业务yT228资讯网——每日最新资讯28at.com

在上述的几件事情中,最需要注意的应该就是第4点:发送消息这个操作需要支持事务,尽量不影响主业务。这是什么意思呢?首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。其次,需要支持事务是指:假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。那么,我们的流水落地api应该要有这样的功能:yT228资讯网——每日最新资讯28at.com

图片图片yT228资讯网——每日最新资讯28at.com

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。如果不存在事务则直接异步发送消息给kafka。而且这样的判断逻辑得放在二方库内部才行。那现在摆在我们面前的问题就是:我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?yT228资讯网——每日最新资讯28at.com

三、TransactionSynchronizationManager显神威

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:yT228资讯网——每日最新资讯28at.com

private final ExecutorService executor = Executors.newSingleThreadExecutor();public void sendLog() {    // 判断当前是否存在事务    if (!TransactionSynchronizationManager.isSynchronizationActive()) {        // 无事务,异步发送消息给kafka                executor.submit(() -> {            // 发送消息给kafka            try {                // 发送消息给kafka            } catch (Exception e) {                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常            }        });        return;    }    // 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {        @Override        public void afterCompletion(int status) {            if (status == TransactionSynchronization.STATUS_COMMITTED) {                // 事务提交后,再异步发送消息给kafka                executor.submit(() -> {                    try {                     // 发送消息给kafka                    } catch (Exception e) {                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常                    }                });            }        }    });}

代码比较简单,其主要是TransactionSynchronizationManager的使用。yT228资讯网——每日最新资讯28at.com

3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive() 方法显神威

我们先看下这个方法的源码:yT228资讯网——每日最新资讯28at.com

// TransactionSynchronizationManager.java类内部的部分代码private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =   new NamedThreadLocal<>("Transaction synchronizations");public static boolean isSynchronizationActive() {    return (synchronizations.get() != null);}

很明显,synchronizations是一个线程变量(ThreadLocal)。那它是在什么时候set进去的呢?这里的话,可以参考下这个方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源码如下所示:yT228资讯网——每日最新资讯28at.com

/**  * Activate transaction synchronization for the current thread.  * Called by a transaction manager on transaction begin.  * @throws IllegalStateException if synchronization is already active  */public static void initSynchronization() throws IllegalStateException {    if (isSynchronizationActive()) {        throw new IllegalStateException("Cannot activate transaction synchronization - already active");    }    logger.trace("Initializing transaction synchronization");    synchronizations.set(new LinkedHashSet<>());}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要isSynchronizationActive返回true,则代表当前有事务。因此,结合这两个方法我们是指能解决我们最开始提出的疑问:**要如何判断当前是否存在事务**yT228资讯网——每日最新资讯28at.com

3.2、如何在事务提交后触发自定义逻辑?

TransactionSynchronizationManager.registerSynchronization()方法显神威

我们来看下这个方法的源代码:yT228资讯网——每日最新资讯28at.com

/**  * Register a new transaction synchronization for the current thread.  * Typically called by resource management code.  * <p>Note that synchronizations can implement the  * {@link org.springframework.core.Ordered} interface.  * They will be executed in an order according to their order value (if any).  * @param synchronization the synchronization object to register  * @throws IllegalStateException if transaction synchronization is not active  * @see org.springframework.core.Ordered  */public static void registerSynchronization(TransactionSynchronization synchronization)    throws IllegalStateException {    Assert.notNull(synchronization, "TransactionSynchronization must not be null");    if (!isSynchronizationActive()) {        throw new IllegalStateException("Transaction synchronization is not active");    }    synchronizations.get().add(synchronization);}

这里又使用到了synchronizations线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?我们在上面构建流水落地api的伪代码中有向synchronizations内部添加了一个TransactionSynchronizationAdapter,内部并重写了afterCompletion方法,其代码如下所示:yT228资讯网——每日最新资讯28at.com

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {    @Override    public void afterCompletion(int status) {        if (status == TransactionSynchronization.STATUS_COMMITTED) {            // 事务提交后,再异步发送消息给kafka            executor.submit(() -> {                    try {                     // 发送消息给kafka                    } catch (Exception e) {                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常                    }            });        }    }});

我们结合registerSynchronization的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”。这是什么意思呢?是因为Spring在执行事务方法时,对于操作事务的每一个阶段都有一个回调操作,比如:trigger系列的回调yT228资讯网——每日最新资讯28at.com

图片图片yT228资讯网——每日最新资讯28at.com

invoke系列的回调yT228资讯网——每日最新资讯28at.com

图片图片yT228资讯网——每日最新资讯28at.com

而我们现在的需求就是在事务提交后触发自定义的函数,那就是在invokeAfterCommit和invokeAfterCompletion这两个方法来选了。首先,这两个方法都会拿到所有TransactionSynchronization的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter)。但是要注意一点:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。也就是说,如果我们重写了invokeAfterCompletion方法,我们除了能拿到集合外,还能拿到当前事务的状态。因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。yT228资讯网——每日最新资讯28at.com

四、总结

上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。yT228资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-87482-0.html事务钩子函数,打造高效支付系统

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: WPF UI更新技巧:掌握EventHandler的基础与Dispatcher的高级应用

下一篇: 哈希表哪家强?几大编程语言吵起来了!

标签:
  • 热门焦点
  • K60 Pro官方停产 第三方瞬间涨价

    虽然没有官方宣布,但Redmi的一些高管也已经透露了,Redmi K60 Pro已经停产且不会补货,这一切都是为了即将到来的K60 Ultra铺路,属于厂家的正常操作。但有意思的是该机在停产之后
  • 小米官宣:2023年上半年出货量中国第一!

    今日早间,小米电视官方微博带来消息,称2023年小米电视上半年出货量达到了中国第一,同时还表示小米电视的巨屏风暴即将开始。“公布一个好消息2023年#小米电视上半年出货量中国
  • 中兴AX5400Pro+上手体验:再升级 双2.5G网口+USB 3.0这次全都有

    2021年11月的时候,中兴先后发布了两款路由器产品,中兴AX5400和中兴AX5400 Pro,从产品命名上就不难看出这是隶属于同一系列的,但在外观设计上这两款产品可以说是完全没一点关系
  • 三言两语说透设计模式的艺术-简单工厂模式

    一、写在前面工厂模式是最常见的一种创建型设计模式,通常说的工厂模式指的是工厂方法模式,是使用频率最高的工厂模式。简单工厂模式又称为静态工厂方法模式,不属于GoF 23种设计
  • 如何正确使用:Has和:Nth-Last-Child

    我们可以用CSS检查,以了解一组元素的数量是否小于或等于一个数字。例如,一个拥有三个或更多子项的grid。你可能会想,为什么需要这样做呢?在某些情况下,一个组件或一个布局可能会
  • 三分钟白话RocketMQ系列—— 如何发送消息

    我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。那接下来,我们白话一下,RocketMQ是如何发送消息的,揭秘消息生产全过程。注意,如果白话中不小心提到相关代
  • 电视息屏休眠仍有网络上传 爱奇艺被质疑“薅消费者羊毛”

    记者丨宁晓敏 见习生丨汗青出品丨鳌头财经(theSankei) 前不久,爱奇艺发布了一份亮眼的一季报,不仅营收和会员营收创造历史最佳表现,其运营利润也连续6个月实现增长。自去年年初
  • ESG的面子与里子

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之三伏大幕拉起,各地高温预警不绝,但处于厄尔尼诺大&ldquo;烤&rdquo;之下的除了众生,还有各大企业发布的ESG报告。ESG是&ldquo;环境保
  • 华为将推出盘古数字人大模型 可帮助用户12小时完成数字人生成

    在今日举行的2023年华为云数字文娱AI创新峰会上,华为云全球Marketing与销售服务总裁石冀琳表示,华为云将在后续推出盘古数字人大模型,可帮助用户12小
Top