(给
ImportNew 加星标,提高Java技能)
1、引言
Spring 的 spring-tx 模块提供了对事务管理支持,使用 S pring 事务可以让我们从复杂的事务处理中得到解脱,无需要去处理获得连接、关闭连接、事务提交和回滚等这些操作。 S pring 事务有编程式事务和声明式事务两种实现方式。编程式事务是通过编写代码来管理事务的提交、回滚、以及事务的边界。这意味着开发者需要在代码中显式地调用事务的开始、提交和回滚。声明式事务是通过配置来管理事务,您可以使用注解或 XML 配置来定义事务的边界和属性,而无需显式编写事务管理的代码。下面我们逐步分析 S pring 源代码,理解 S pring 事务的实现原理。
2、编程式事务
2.1 使用示例
private PlatformTransactionManager transactionManager;
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(def);
try {
transactionManager.commit(status);
} catch (Exception ex) {
transactionManager.rollback(status);
}
在使用编程式事务处理的过程中,利用 DefaultTransactionDefinition 对象来持有事务处理属性。同时,在创建事务的过程中得到一个 TransactionStatus 对象,然后通过直接调用 transactionManager 对象 的 commit() 和 rollback()方法 来完成事务处理。 2.2 PlatformTransactionManager 核心接口
PlatformTransactionManager 是 Spring 事务管理的核心接口,通过 PlatformTransactionManager 接口设计了一系列与事务处理息息相关的接口方法,如 getTransaction()、commit()、rollback() 这些和事务处理相关的统一接口。对于这些接口的实现,很大一部分是由 AbstractTransactionManager 抽象类来完成的。
AbstractPlatformManager 封装了 Spring 事务处理中通用的处理部分,比如事务的创建、提交、回滚,事务状态和信息的处理,与线程的绑定等,有了这些通用处理的支持,对于具体的事务管理器而言,它们只需要处理和具体数据源相关的组件设置就可以了,比如在 DataSourceTransactionManager 中,就只需要配置好和 DataSource 事务处理相关的接口以及相关的设置。
2.3 事务的创建
PlatformTransactionManager 的 getTransaction() 方法,封装了底层事务的创建,并生成一个 TransactionStatus 对象。
AbstractPlatformTransactionManager提供了创建事务的模板,这个模板会被具体的事务处理器所使用。从下面的代码中可以看到: AbstractPlatformTransactionManager 会根据事务属性配置和当前进程绑定的事务信息,对事务是否需要创建,怎样创建 进行一些通用的处理,然后把事务创建的底层工作交给具体的事务处理器完成,如: DataSourceTransactionManager、HibernateTransactionManager。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults() );
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(def , transaction , debugEnabled );
}
if (def .getTimeout () < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout" , def .getTimeout () );
}
if (def .getPropagationBehavior () == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'" );
}
else if (def .getPropagationBehavior () == TransactionDefinition.PROPAGATION_REQUIRED ||
def .getPropagationBehavior () == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def .getPropagationBehavior () == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def .getName () + "]: " + def );
}
try {
return startTransaction(def , transaction , false , debugEnabled , suspendedResources );
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true , newSynchronization, debugEnabled, null);
}
}
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = this.getTransactionSynchronization() != SYNCHRONIZATION_NEVER;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true , newSynchronization, debugEnabled, suspendedResources);
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
}
事务创建的结果是生成一个 TransactionStatus 对象,通过这个对象来保存事务处理需要的基本信息,TransactionStatus 的创建过程如下:
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
以上是创建一个全新事务的过程,还有另一种情况是:在创建当前事务时,线程中已经有事务存在了。这种情况会涉及事务传播行为的处理。spring中七种事务传播行为如下:
如果检测到已存在事务,handleExistingTransaction() 方法将根据不同的事务传播行为类型执行相应逻辑。
PROPAGATION_NEVER
即当前方法需要在非事务的环境下执行,如果有事务存在,那么抛出异常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'" );
}
PROPAGATION_NOT_SUPPORTED
与前者的区别在于,如果有事务存在,那么将事务挂起,而不是抛出异常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null , false , newSynchronization, debugEnabled, suspendedResources);
}
PROPAGATION_REQUIRES_NEW
新建事务,如果当前存在事务,把当前事务挂起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
PROPAGATION_NESTED
开始一个 "嵌套的" 事务, 它是已经存在事务的一个真正的子事务. 嵌套事务开始执行时, 它将取得一个 savepoint. 如果这个嵌套事务失败, 我们将回滚到此 savepoint. 嵌套事务是外部事务的一部分, 只有外部事务结束后它才会被提交。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, false , false , true , debugEnabled, null );
this .transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
status.createAndHoldSavepoint();
}
catch (RuntimeException | Error ex) {
this .transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
this .transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null ));
return status;
}
else {
return startTransaction(definition, transaction, true , debugEnabled, null );
}
}
2.4 事务挂起 事务挂起在 AbstractTransactionMa nager.suspend() 中处理,该方法内部将调用具体事务管理器的 doSuspend() 方法。 以 DataSourceTransactionManager 为例,将 ConnectionHolder 设为 null,因为一个 ConnectionHolder 对象就代表了一个数据库连接,将 ConnectionHolder 设为 null 就意味着我们下次要使用连接时,将重新从连接池获取。 protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null );
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
unbindResource()方法最终会调用TransactionSynchronizationManager.doUnbindResource()方法,该方法将移除当前线程与事务对象的绑定。
private static Object doUnbindResource (Object actualKey ) {
Map map = resources.get ();
if (map == null ) {
return null ;
}
Object value = map.remove (actualKey);
if (map.isEmpty()) {
resources.remove ();
}
if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
value = null ;
}
return value ;
}
而被挂起的事务的各种状态最终会保存在 TransactionStatus 对象中。 2.5 事务提交 & 回滚 主要是对 JDBC 的封装、源码逻辑较清晰,不展开细说。 3、声明式事务 其底层建立在 AOP 的基础之上,对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。通过声明式事物,无需在业务逻辑代码中掺杂事务管理的代码,只需在配置文件中做相关的事务规则声明(或通过等价的基于标注的方式),便可以将事务规则应用到业务逻辑中。 3.1 使用示例 <bean id ="dataSource" class ="com.zaxxer.hikari.HikariDataSource" >
<property name ="driverClassName" value ="${jdbc.driverClassName}" />
<property name ="url" value ="${jdbc.url}" />
<property name ="username" value ="${jdbc.username}" />
<property name ="password" value ="${jdbc.password}" />
bean >
<bean id ="transactionManager" class ="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
<property name ="dataSource" ref ="dataSource" />
bean >
<tx:annotation-driven transaction-manager ="transactionManager" />
代码:
@Transactional
public void addOrder () {
}
3.2 自定义标签解析 先从配置文件开始入手,找到处理 annotation-driven 标签的类 TxNamespaceHandler。TxNamespaceHandler 实现了 NamespaceHandler 接口,定义了如何解析和处理自定义 XML 标签。 @Override
public void init () {
registerBeanDefinitionParser("advice" , new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven" , new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager" , new JtaTransactionManagerBeanDefinitionParser());
}
AnnotationDrivenBeanDefinitionParser 里的 parse() 方法,对 XML 标签 annotation-driven 进行解析。
@Override
public BeanDefinition parse (Element element, ParserContext parserContext ) {
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode" );
if ("aspectj" .equals (mode)) {
registerTransactionAspect(element, parserContext);
if (ClassUtils.isPresent("jakarta.transaction.Transactional" , getClass().getClassLoader())) {
registerJtaTransactionAspect(element, parserContext);
}
}
else {
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null ;
}
以默认mode配置为例,执行configureAutoProxyCreator()方法,将在Spring容器中注册了3个bean:
BeanFactoryTransactionAttributeSourceAdvisor、TransactionInterceptor、AnnotationTransactionAttributeSource。同时会将TransactionInterceptor的BeanName传入到 Advisor 中,然后将 AnnotationTransactionAttributeSource 这个 Bean 注入到 Advisor 中。之后动态代理的时候会使用这个 Advisor 去寻找每个 Bean 是否需要动态代理。
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
advisorDef.setSource(eleSource);
advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
advisorDef.getPropertyValues().add ("transactionAttributeSource" , new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add ("adviceBeanName" , interceptorName);
if (element.hasAttribute("order" )) {
advisorDef.getPropertyValues().add ("order" , element.getAttribute("order" ));
}
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);
3.3 Advisor 回顾 AOP 用法,Advisor 可用于定义一个切面,它包含切点(Pointcut)和通知(Advice),用于在特定的连接点上执行特定的操作。Spring 事务实现了一个Advisor: BeanFactoryTransactionAttributeSourceAdvisor。 public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut();
public void setTransactionAttributeSource (TransactionAttributeSource transactionAttributeSource) {
this .pointcut.setTransactionAttributeSource(transactionAttributeSource);
}
public void setClassFilter (ClassFilter classFilter) {
this .pointcut.setClassFilter(classFilter);
}
@Override
public Pointcut getPointcut () {
return this .pointcut;
}
}
BeanFactoryTransactionAttributeSourceAdvisor 其实是一个 PointcutAdvisor,是否匹配到切入点取决于 Pointcut。Pointcut 的核心在于其 ClassFilter 和 MethodMatcher。
ClassFilter:
TransactionAttributeSourcePointcut 内部私有类 TransactionAttributeSourceClassFilter,实现了 Spring 框架中的 ClassFilter 接口。在 matches 方法中,它首先检查传入的类 clazz 否为 TransactionalProxy、TransactionManager 或 PersistenceExceptionTranslator 的子类,如果不是,则获取当前的 TransactionAttributeSource 并检查其是否允许该类作为候选类。
private class TransactionAttributeSourceClassFilter implements ClassFilter {
@Override
public boolean matches(Class> clazz) {
if (TransactionalProxy.class .isAssignableFrom(clazz) ||
TransactionManager.class .isAssignableFrom(clazz) ||
PersistenceExceptionTranslator.class .isAssignableFrom(clazz)) {
return false ;
}
return (transactionAttributeSource == null || transactionAttributeSource.isCandidateClass(clazz));
}
}
MethodMatcher:
TransactionAttributeSourcePointcut.matches:
@Override
public boolean matches(Method method, Class> targetClass) {
return (this .transactionAttributeSource == null ||
this .transactionAttributeSource.getTransactionAttribute(method, targetClass) != null );
}
getTransactionAttribute()方法最终会调用至 AbstractFallbackTransactionAttributeSource.computeTransactionAttribute() 方法,该方法将先去方法上查找是否有相应的事务注解(比如 @Transactional),如果没有,那么再去类上查找。