专栏名称: Java基基
一个苦练基本功的 Java 公众号,所以取名 Java 基基
目录
相关文章推荐
上海证券报  ·  深圳,连续大动作 ·  7 小时前  
中国证券报  ·  德勤首席合伙人付建超:发挥“审计+税务+咨询 ... ·  9 小时前  
上海证券报  ·  北京放大招 ·  3 天前  
上海证券报  ·  用AI大模型炮制假消息?股市“黑嘴”新套路 ·  3 天前  
51好读  ›  专栏  ›  Java基基

SpringBoot + 事务钩子函数,打造高效支付系统!

Java基基  · 公众号  ·  · 2024-05-27 11:55

正文

👉 这是一个或许对你有用 的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入 芋道快速开发平台 知识星球。 下面是星球提供的部分资料:

👉 这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本

来源:juejin.cn/post/
6984574787511123999


前言

经过前面对Spring AOP、事务的总结,我们已经对它们有了一个比较感性的认知了。今天,我继续安利一个独门绝技: Spring 事务的钩子函数。 单纯的讲技术可能比较枯燥乏味。

接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

一、案例背景

拿支付系统相关的业务来举例。在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的账做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。

因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求: 要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里 (这个库只有存档系统拥有写权限)。

整个需求的流程如下所示:

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,CTO建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:

1、技术栈使用的springboot,因此,这里最好以starter的方式提供

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。

4、发送消息这个操作需要支持事务,尽量不影响主业务

在上述的几件事情中,最需要注意的应该就是第4点: 发送消息这个操作需要支持事务,尽量不影响主业务。 这是什么意思呢?首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。其次,需要支持事务是指: 假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。 那么,我们的流水落地api应该要有这样的功能:

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。如果不存在事务则直接异步发送消息给kafka。而且这样的判断逻辑得放在二方库内部才行。那现在摆在我们面前的问题就是: 我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?

三、TransactionSynchronizationManager显神威

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendLog() {
    // 判断当前是否存在事务
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        // 无事务,异步发送消息给kafka
        
        executor.submit(() -> {
            // 发送消息给kafka
            try {
                // 发送消息给kafka
            } catch (Exception e) {
                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
            }
        });
        return;
    }

    // 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                // 事务提交后,再异步发送消息给kafka
                executor.submit(() -> {
                    try {
                     // 发送消息给kafka
                    } catch (Exception e) {
                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
                });
            }
        }

    });

}

代码比较简单,其主要是 TransactionSynchronizationManager 的使用。

3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive() 方法显神威

我们先看下这个方法的源码:

// TransactionSynchronizationManager.java类内部的部分代码

private static final ThreadLocal> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");

public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}

很明显, synchronizations 是一个线程变量(ThreadLocal)。那它是在什么时候set进去的呢?

这里的话,可以参考下这个方法: org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization ,其源码如下所示:

/**
  * Activate transaction synchronization for the current thread.
  * Called by a transaction manager on transaction begin.
  * @throws IllegalStateException if synchronization is already active
  */

public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要 isSynchronizationActive 返回true,则代表当前有事务。

因此,结合这两个方法我们是指能解决我们最开始提出的疑问: 要如何判断当前是否存在事务

3.2、如何在事务提交后触发自定义逻辑?TransactionSynchronizationManager.registerSynchronization()方法显神威

我们来看下这个方法的源代码:

/**
  * Register a new transaction synchronization for the current thread.
  * Typically called by resource management code.
  * 

Note that synchronizations can implement the
  * {@link org.springframework.core.Ordered} interface.
  * They will be executed in an order according to their order value (if any).
  * @param synchronization the synchronization object to register
  * @throws IllegalStateException if transaction synchronization is not active
  * @see org.springframework.core.Ordered
  */


public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException 
{

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
}

这里又使用到了 synchronizations 线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?

我们在上面构建流水落地api的伪代码中有向 synchronizations 内部添加了一个 TransactionSynchronizationAdapter ,内部并重写了 afterCompletion 方法,其代码如下所示:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

    @Override
    public void afterCompletion(int status) {
        if (status == TransactionSynchronization.STATUS_COMMITTED) {
            // 事务提交后,再异步发送消息给kafka
            executor.submit(() -> {
                    try {
                     // 发送消息给kafka
                    } catch






请到「今天看啥」查看全文


推荐文章
上海证券报  ·  深圳,连续大动作
7 小时前
上海证券报  ·  北京放大招
3 天前
新材料在线  ·  120页ppt看懂碳材料家族
7 年前
解螺旋  ·  数学界的四大巨头
7 年前
教女人变苗条  ·  八个调节内分泌的方法,让女人越活越嫩!
7 年前