我先把问题抛出来,大家就明白本文目的在于解决什么样的业务痛点了:
public void removeAuthorityModuleSeq (Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) { //1.查询出当前资源模块下所有资源,查询出来后进行删除 deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService); //2.查询出当前资源模块下所有子模块,递归查询,当删除完所有子模块下的资源后,再删除所有子模块,最终删除当前资源模块 deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService); //3.删除当前资源模块 removeById(authorityModuleId); }
如果我希望将步骤1和步骤2并行执行,然后确保步骤1和步骤2执行成功后,再执行步骤3,等到步骤3执行完毕后,再提交全部事务,这个需求该如何实现呢?
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro
视频教程:https://doc.iocoder.cn/video/
上面需求第一点是: 如何让任务异步并行执行,如何实现二元依赖呢?
说到异步执行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的异步执行任务能力并不足以解决我们当前的需求。
@Async注解原理简单来说,就是扫描IOC中的bean,给方法上标注有@Async注解的bean进行代理,代理的核心是添加一个
MethodInterceptor
即
AsyncExecutionInterceptor
,该方法拦截器负责将方法真正的执行包装为任务,放入线程池中执行。
下面我们先使用
CompletableFuture
来完成我们第一步需求:
public void removeAuthorityModuleSeq (Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) { CompletableFuture.runAsync(()->{ //两个并行执行的任务 CompletableFuture future1 = CompletableFuture.runAsync(() -> deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor); CompletableFuture future2 = CompletableFuture.runAsync(() -> deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor); //等待两个并行任务执行完后,再执行最后一个步骤 CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); },executor); }
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud
视频教程:https://doc.iocoder.cn/video/
我们已经完成了任务的异步执行化,那么又如何确保多线程环境下的事务一致性问题呢?
public void removeAuthorityModuleSeq (Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) { CompletableFuture.runAsync(()->{ //两个并行执行的任务 CompletableFuture future1 = CompletableFuture.runAsync(() -> deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor); CompletableFuture future2 = CompletableFuture.runAsync(() -> deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor); //等待两个并行任务执行完后,再执行最后一个步骤 CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); },executor); }
在Spring环境下说到事务控制,大家第一反应就想到使用
@Transactional
注解解决问题,但是这里显然行不通,为什么行不通呢?
我还是简单的对Spring事务实现原理进行一番概括:
事务管理大体分为三个流程: 事务创建 ,事务执行,事务结束
事务创建涉及到一些属性的配置,如:
由于涉及属性颇多,并且后期还有可能进行扩展,因此必须通过一个类来封装这些属性,在Spring中对应
TransactionDefinition
。
有了事务相关属性定义后,我们就可以利用
TransactionDefinition
来创建一个事务了,在Spring中局部事务由
PlatformTransactionManager
负责管理,创建事务也是由
PlatformTransactionManager
负责提供:
TransactionStatus getTransaction (@Nullable TransactionDefinition definition) throws TransactionException ;
如果我们希望追踪事务的状态,例如: 事务已完成,事务回滚等,那么就需要一个事务状态类贯穿当前事务的执行流程,在Spring中由
TransactionStatus
负责完成。
对于常见的数据源而言,通常需要记录的事务状态有如下几点:
当前事务是否需要回滚(通过标记来判断,因此我也可以在业务流程中手动设置标记为true,来让事务在没有发生异常的情况下进行回滚)
事务的执行过程就是具体业务代码的执行流程,这里就不多说了。
事务的结束分为两种情况: 需要进行事务回滚或者事务正常提交,如果是事务回滚,还需要判断
TransactionStatus
中的savePoint是否被设置了。
Spring中常见的事务实现方式有两种: 编程式和声明式。
编程式事务使用是本文重点,因此这里按下不表,我们先来复习一下声明式事务的使用
声明式事务就是使用我们常见的
@Transactional
注解完成的,声明式事务优点就在于让事务代码与业务代码解耦,通过Spring中提供的声明式事务使用,我们也可以发觉我们只需要编写业务代码即可,而事务的管理基本不需要我们操心,Spring就像使用了魔法一样,帮我们自动完成了。
之所以那么神奇,本质还是依靠Spring框架提供的Bean生命周期相关回调接口和AOP结合完成的,简述如下:
通过自动代理创建器依次尝试为每个放入容器中的bean尝试进行代理
尝试进行代理的过程对于事务管理来说,就是利用事务管理涉及到的增强器advisor,即
TransactionAttributeSourceAdvisor
判断当前增强器是否能够应用与当前bean上,怎么判断呢? —> advisor内部的pointCut喽 !
如果能够应用,那么好,为当前bean创建代理对象返回,并且往代理对象内部添加一个
TransactionInterceptor
拦截器。
此时我们再从容器中获取,拿到的就是代理对象了,当我们调用代理对象的方法时,首先要经过代理对象内部拦截器链的处理,处理完后,最终才会调用被代理对象的方法。(这里其实就是责任链模式的应用)
对于被事务增强器
TransactionAttributeSourceAdvisor
代理的bean而言,代理对象内部会存在一个
TransactionInterceptor
,该拦截器内部构造了一个事务执行的模板流程:
protected Object invokeWithinTransaction (Method method, @Nullable Class> targetClass, final InvocationCallback invocation) throws Throwable { //TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源 //可以看做是一个存放TransactionAttribute与method方法映射的池子 TransactionAttributeSource tas = getTransactionAttributeSource(); //获取当前事务方法对应的TransactionAttribute final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null ); //定位TransactionManager final TransactionManager tm = determineTransactionManager(txAttr); ..... //类型转换为局部事务管理器 PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { //TransactionManager根据TransactionAttribute创建事务后返回 //TransactionInfo封装了当前事务的信息--包括TransactionStatus TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { //继续执行过滤器链---过滤链最终会调用目标方法 //因此可以理解为这里是调用目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { //目标方法抛出异常则进行判断是否需要回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除当前事务信息 cleanupTransactionInfo(txInfo); } ... //正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先) commitTransactionAfterReturning(txInfo); return retVal; } ...
还记得本文一开始提出的业务需求吗?
不清楚,可以回看一下,在上文,我们已经解决了任务异步并行执行的难题,下面我们需要解决的就是如何确保Spring在多线程环境下也能保持事务一致性。
通过上文对Spring事务基础和声明式事务的原理回顾,相信大家也发现了,声明式事务并不能解决我们当前的问题,那么就只能求助于编程式事务了。
那么编程式事务是什么样子呢?
其实上面
TransactionInterceptor
给出的那套模板流程,就是编程式事务使用的模范案例,我们可以简化上面的模板流程,简单使用如下:
public class TransactionMain { public static void main (String[] args) throws ClassNotFoundException, SQLException { test(); } private static void test () { DataSource dataSource = getDS(); JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource); //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置 //包括隔离级别和传播行为等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务 TransactionStatus ts = jtm.getTransaction(transactionDef); //进行业务逻辑操作 try { update(dataSource); jtm.commit(ts); }catch (Exception e){ jtm.rollback(ts); System.out.println("发生异常,我已回滚" ); } } private static void update (DataSource dataSource) throws Exception { JdbcTemplate jt = new JdbcTemplate(); jt.setDataSource(dataSource); jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6" ); throw new Exception("我是来捣乱的" ); } }
我们明白了编程式事务的使用,相信大家也都知道问题如何解决了,下面我给出一份看似正确的解决方案:
package com.user.util;import lombok.RequiredArgsConstructor;import org.springframework.jdbc.datasource.DataSourceTransactionManager;import org.springframework.stereotype.Component;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.support.DefaultTransactionDefinition;import javax.sql.DataSource;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;/** * 多线程事务一致性管理 * 声明式事务管理无法完成,此时我们只能采用初期的编程式事务管理才行 * @author
大忽悠 * @create 2022/10/19 21:34 */ @Component @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多数据源的情况下,需要指定具体是哪一个数据源 */ private final DataSource dataSource; /** * 执行的是无返回值的任务 * @param tasks 异步执行的任务列表 * @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传 */ public void runAsyncButWaitUntilAllDown (List tasks, Executor executor) { if (executor==null ){ throw new IllegalArgumentException("线程池不能为空" ); } DataSourceTransactionManager transactionManager = getTransactionManager(); //是否发生了异常 AtomicBoolean ex=new AtomicBoolean(); List taskFutureList=new ArrayList<>(tasks.size()); List transactionStatusList=new ArrayList<>(tasks.size()); tasks.forEach(task->{ taskFutureList.add(CompletableFuture.runAsync( () -> { try { //1.开启新事务 transactionStatusList.add(openNewTransaction(transactionManager)); //2.异步任务执行 task.run(); }catch (Throwable throwable){ //打印异常 throwable.printStackTrace(); //其中某个异步任务执行出现了异常,进行标记 ex.set(Boolean.TRUE); //其他任务还没执行的不需要执行了 taskFutureList.forEach(completableFuture -> completableFuture.cancel(true )); } } , executor) ); }); try { //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获 CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } //发生了异常则进行回滚操作,否则提交 if (ex.get()){ System.out.println("发生异常,全部事务回滚" ); transactionStatusList.forEach(transactionManager::rollback); }else { System.out.println("全部事务正常提交" ); transactionStatusList.forEach(transactionManager::commit); } } private TransactionStatus openNewTransaction (DataSourceTransactionManager transactionManager) { //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置 //包括隔离级别和传播行为等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务 return transactionManager.getTransaction(transactionDef); } private DataSourceTransactionManager getTransactionManager () { return new DataSourceTransactionManager(dataSource); } }
大家思考上面的代码存在问题吗?
测试:
public void test () { List tasks=new ArrayList<>(); tasks.add(()->{ userMapper.deleteById(26 ); }); tasks.add(()->{ signMapper.deleteById(10 ); }); multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool()); }
任务正常都执行完毕,事务进行提交,但是会抛出异常,导致事务回滚:
抓关键字:
No value for key [HikariDataSource (HikariPool-1 )] bound to thread [main] 解释: 无法在当前线程绑定的threadLocal中寻找到HikariDataSource作为key,对应关联的资源对象ConnectionHolder
这里需要再次回顾一下Spring事务实现的小细节:
一次事务的完成通常都是默认在当前线程内完成的,又因为一次事务的执行过程中,涉及到对当前数据库连接Connection的操作,因此为了避免将Connection在事务执行过程中来回传递,我们可以将Connextion绑定到当前事务执行线程对应的ThreadLocalMap内部,顺便还可以将一些其他属性也放入其中进行保存,在Spring中,负责保存这些ThreadLocal属性的实现类由
TransactionSynchronizationManager
承担。
TransactionSynchronizationManager
类内部默认提供了下面六个ThreadLocal属性,分别保存当前线程对应的不同事务资源:
//保存当前事务关联的资源--默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系--当然这里Connection被包装为了ConnectionHolder private static final ThreadLocal> resources = new NamedThreadLocal<>("Transactional resources" ); //事务监听者--在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用)---默认为空集合 private static final ThreadLocal> synchronizations = new NamedThreadLocal<>("Transaction synchronizations" ); //见名知意: 存放当前事务名字 private static final ThreadLocal currentTransactionName = new NamedThreadLocal<>("Current transaction name" ); //见名知意: 存放当前事务是否是只读事务 private static final ThreadLocal currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status" ); //见名知意: 存放当前事务的隔离级别 private static final ThreadLocal currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level" ); //见名知意: 存放当前事务是否处于激活状态 private static final ThreadLocal actualTransactionActive = new NamedThreadLocal<>("Actual transaction active" );
那么上面抛出的异常的原因也就很清楚了,无法在main线程找到当前事务对应的资源,原因如下:
开启新事务时,事务相关资源都被绑定到了
thread-cache-pool-1
线程对应的threadLocalMap内部,而当执行事务提交代码时,commit内部需要从
TransactionSynchronizationManager
中获取当前事务的资源,显然我们无法从main线程对应的
threadLocalMap
中获取到对应的事务资源,这也就是异常抛出的原因。
这里给出一个我首先想到的简单粗暴的方法—
CopyTransactionResource
—将事务资源在两个线程间来回复制
这里给出解决后问题后的代码示例:
package com.user.util;import lombok.Builder;import lombok.RequiredArgsConstructor;import org.springframework.jdbc.datasource.DataSourceTransactionManager;import org.springframework.stereotype.Component;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.support.DefaultTransactionDefinition;import org.springframework.transaction.support.TransactionSynchronization;import org.springframework.transaction.support.TransactionSynchronizationManager;import javax.sql.DataSource;import java.util.*;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;/** * 多线程事务一致性管理 * 声明式事务管理无法完成,此时我们只能采用初期的编程式事务管理才行 * @author 大忽悠 * @create 2022/10/19 21:34 */ @Component @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多数据源的情况下,需要指定具体是哪一个数据源 */ private final DataSource dataSource; /** * 执行的是无返回值的任务 * @param tasks 异步执行的任务列表 * @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传