Loading...
墨滴

Roy4259

2022/01/03  阅读:37  主题:嫩青

Spring声明式事务源码探究

Spring 事务源码探究

1. 入口分析

开启事务可以通过注解 @EnableTransactionManagement 的方式开启,看到 @Import(TransactionManagementConfigurationSelector.class) 查看 TransactionManagementConfigurationSelector @Import 会在 ConfigurationClassPostProcessor#postProcessBeanDefinitionRegistry 的时候扫描解析配置类,调用 selectImports 方法获得全类名路径,然后根据反射为类注册进容器中;一般情况下,开启事务会使用动态代理的方式, 会注册 AutoProxyRegistrarProxyTransactionManagementConfiguration 两个类

  • 看到 AutoProxyRegistrar 类,实现了 ImportBeanDefinitionRegistrar ,会向容器中注册 bean

      1. 拿到所有注解类型(因为 mode、proxyTargetClass 等属性会直接影响到代理的方式,而拥有这些属性的注解至少有:@EnableTransactionManagement、@EnableAsync、@EnableCaching 等)
      1. 得到 modeproxyTargetClass 的属性,注册不同的 bean 跟进 AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); 最后发现注册了一个 internalAutoProxyCreator public static final String AUTO_PROXY_CREATOR_BEAN_NAME = "org.springframework.aop.config.internalAutoProxyCreator";

        由于 AOP 和事务注册的都是 名字为 org.springframework.aop.config.internalAutoProxyCreatorBeanPostProcessor ,但是只会保留一个, AOP 的会覆盖事务的,因为 AOP 优先级更大,见 findPriorityForClass 方法实现, list 的下标越小优先级越高

  • 看到 ProxyTransactionManagementConfiguration ,有 @Configuration 注解,是一个配置类,会向容器注入 bean ,有事务增强器 BeanFactoryTransactionAttributeSourceAdvisor、事务属性源 TransactionAttributeSource、事务拦截器 TransactionInterceptor

    • 重点看到 BeanFactoryTransactionAttributeSourceAdvisor ,可以看到设置了注解的事务源 AnnotationTransactionAttributeSource ,事务的拦截器 TransactionInterceptor, BeanFactoryTransactionAttributeSourceAdvisor 类中有一个重要的属性 TransactionAttributeSourcePointcut ,它是一个切面,决定了哪些类会被切入,从而生成代理对象

    容器内的每个Bean,都会经过 AbstractAutoProxyCreator#postProcessAfterInitialization 从而会调用wrapIfNecessary方法, 因此容器内所有的Bean的所有方法在容器启动时候都会执行此matches方法, 所以spring使用了缓存

2. 解析 advisor

事务通知的解析跟 AOP 中的步骤是差不多的,不同点就是声明式事务的通知是自己定义的ProxyTransactionManagementConfiguration,如下方流程图,看到 AnnotationAwareAspectJAutoProxyCreator#findCandidateAdvisors 方法 进入到 super.findCandidateAdvisors();, 最后来到 BeanFactoryAdvisorRetrievalHelper#findAdvisorBeans

    1. 上面入口提到的 ProxyTransactionManagementConfiguration 配置类注入容器的事务增强器 BeanFactoryTransactionAttributeSourceAdvisor 也会被取出来

    advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors (this.beanFactory, Advisor.class, true, false);

    1. 循环通知的名称集合
      1. 校验是不是合适的,( InfrastructureAdvisorAutoProxyCreator: bean 在容器中,并且 bean 定义的角色为 BeanDefinition.ROLE_INFRASTRUCTURE )
      1. 没有在创建中
      1. 把他加入 list 中返回

    advisors.add(this.beanFactory.getBean(name, Advisor.class));

3. 创建动态代理

创建动态代理跟 AOP 中的步骤是差不多的,如下方流程图 不同点是在找 eligbleAdvisor 中不同,声明式事务不需要判断切点表达式,它只需要判断有没有 @Transational 这个注解,不需要判断切点表达式,所以它在入口中提到的配置类中引入了自己的通知跟切点,下面看看找出 eligbleAdvisor 的,在下方 if 分支中,会进入 if (advisor instanceof PointcutAdvisor) 分支中(BeanFactoryTransactionAttributeSourceAdvisor 继承了 AbstractBeanFactoryPointcutAdvisor) 查看 canApply 方法

    1. 初筛时事务不像 AOP,上面介绍的 TransactionAttributeSourcePointcutgetClassFilterTrueClassFilter,所以所有的类都能匹配

    if (!pc.getClassFilter().matches(targetClass))

    1. 事务的 methodMatcher 没有实现该接口,只有 AOP 的实现了该接口

    if (methodMatcher instanceof IntroductionAwareMethodMatcher)

    1. 事务直接调用 methodMatcher.matches 进行方法匹配

    methodMatcher.matches(method, targetClass) getTransactionAttributeSource 方法会返回上面入口注册的注解事务源 AnnotationTransactionAttributeSource, AnnotationTransactionAttributeSource 继承了 AbstractFallbackTransactionAttributeSource,调用 getTransactionAttribute 方法,通过 methodtargetClass 查找到 method 上的事务注解

      1. 先从缓存中取,因为这个过程相对比较耗资源,会使用缓存存储已经解析过的,后续调用时需要获取

      if (cached != null)

      1. 查找事务注解,该方法中具体执行匹配过程,实现类方法--->接口的方法--->实现类--->接口类

      computeTransactionAttribute(method, targetClass)

        1. 得到具体方法,如果 method 是接口方法将从 targetClass 得到实现类的方法,无论传的是接口还是实现类的方法,都会先解析实现类,所以接口传进来没用,因为 findTransactionAttribute 方法本身就会去接口中解析

        Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass)

        1. 根据具体方法解析

        TransactionAttribute txAttr = findTransactionAttribute(specificMethod);

        会进到 AnnotationTransactionAttributeSource#determineTransactionAttribute 方法中, 最后会来到 SpringTransactionAnnotationParser#parseTransactionAnnotation

          1. element 对象中获取 @Transactional 注解 然后把注解属性封装到了 AnnotationAttributes ,最后会去到 AnnotatedElementUtils#searchWithFindSemantics ,代码较长看流程图

        AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes( element, Transactional.class, false, false);

          1. 解析出真正的事务属性对象

        return parseTransactionAnnotation(attributes);

        1. 根据实现类解析

        txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());

      1. 把方法描述设置到事务属性上去,方便后续调用时判断

      ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);

      1. 加入到缓存

      this.attributeCache.put(cacheKey, txAttr);

4. 调用代理对象

声明式事务调用代理对象方法跟 AOP 也是大体相同, AOP 中调用代理对象是将通知 advisor 转换为一个个拦截器,然后加进拦截器链中,然后责任链模式的调用通知的增强方法;而声明式事务在 入口分析 中配置类会注册一个事务拦截器 TransactionInterceptor ,事务的实现就是通过这个拦截器实现的,看到它的 invoke 方法,最终会来到 TransactionAspectSupport#invokeWithinTransaction 方法中

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
   final InvocationCallback invocation)
 throws Throwable 
{

 //获取事务属源对象
 TransactionAttributeSource tas = getTransactionAttributeSource();
 //通过事务属性源对象获取到事务属性信息
 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
 //获取配置的事务管理器对象
 final PlatformTransactionManager tm = determineTransactionManager(txAttr);
 //从tx属性对象中获取出标注了@Transactionl的方法描述符
 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

 //处理声明式事务
 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
  //有没有必要创建事务
  TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

  Object retVal;
  try {
   //调用钩子函数进行回调目标方法
   retVal = invocation.proceedWithInvocation();
  }
  catch (Throwable ex) {
   //抛出异常进行回滚处理
   completeTransactionAfterThrowing(txInfo, ex);
   throw ex;
  }
  finally {
   //清空线程变量中transactionInfo的值
   cleanupTransactionInfo(txInfo);
  }
  //提交事务
  commitTransactionAfterReturning(txInfo);
  return retVal;
 }
 //编程式事务
 else {
  //省略...
 }
}

其实可以理解为是个环绕通知

//创建事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

try{
  //调用钩子函数进行回调目标方法
 retVal = invocation.proceedWithInvocation();
catch {
  //抛出异常进行回滚处理
  completeTransactionAfterThrowing(txInfo, ex);
  throw ex;
}

//提交事务
commitTransactionAfterReturning(txInfo);

创建事务前都是先获取基本信息,此处跳过,重点看到 createTransactionIfNecessary 方法

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
   @Nullable TransactionAttribute txAttr, final String joinpointIdentification)
 
{

 // 把方法描述符作为一个事务名称
 if (txAttr != null && txAttr.getName() == null) {
  txAttr = new DelegatingTransactionAttribute(txAttr) {
   @Override
   public String getName() {
    return joinpointIdentification;
   }
  };
 }

 TransactionStatus status = null;
 if (txAttr != null) {
  if (tm != null) {
   //获取一个事务状态
   status = tm.getTransaction(txAttr);
  }
  else {
   if (logger.isDebugEnabled()) {
    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
      "] because no transaction manager has been configured");
   }
  }
 }
 //把事物状态和事物属性等信息封装成一个TransactionInfo对象
 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

看到 tm.getTransaction(txAttr) 方法,此处的 tm 是入口分析中提到的配置类的父类 AbstractTransactionManagementConfiguration 中注册的事务管理器,来到 AbstractPlatformTransactionManager#getTransaction 方法

//AbstractTransactionManagementConfiguration
@Nullable
protected PlatformTransactionManager txManager;
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
 //尝试获取一个事务对象
 Object transaction = doGetTransaction();

 // Cache debug flag to avoid repeated checks.
 boolean debugEnabled = logger.isDebugEnabled();

 /**
  * 判断从上一个方法传递进来的事务属性是不是为空
  */

 if (definition == null) {
  //为空的话,执行非事务方法
  definition = new DefaultTransactionDefinition();
 }

 /**
  * 判断是不是已经存在了事务对象
  */

 if (isExistingTransaction(transaction)) {
  //处理存在的事务
  return handleExistingTransaction(definition, transaction, debugEnabled);
 }

 //检查事务设置的超时时间
 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
  throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
 }

 /**
  * 若当前的事务属性式 PROPAGATION_MANDATORY 表示必须运行在事务中,若当前没有事务就抛出异常
  * 由于isExistingTransaction(transaction)跳过了这里,说明当前是不存在事务的,那么就会抛出异常
  */

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  throw new IllegalTransactionStateException(
    "No existing transaction found for transaction marked with propagation 'mandatory'");
 }
 /**
  * PROPAGATION_REQUIRED 当前存在事务就加入到当前的事务,没有就新开一个
  * PROPAGATION_REQUIRES_NEW:新开一个事务,若当前存在事务就挂起当前事务
  * PROPAGATION_NESTED: PROPAGATION_NESTED
     表示如果当前正有一个事务在运行中,则该方法应该运行在 一个嵌套的事务中,
     被嵌套的事务可以独立于封装事务进行提交或者回滚(保存点),
     如果封装事务不存在,行为就像 PROPAGATION_REQUIRES NEW
  */

 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
   definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
   definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  /**
   * 挂起当前事务,在这里为啥传入null?
   * 因为逻辑走到这里了,经过了上面的isExistingTransaction(transaction) 判断当前是不存在事务的
   * 所有再这里是挂起当前事务传递一个null进去
   */

  SuspendedResourcesHolder suspendedResources = suspend(null);
  if (debugEnabled) {
   logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
  }
  try {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
   //新创建一个事务状态
   DefaultTransactionStatus status = newTransactionStatus(
     definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
   //开启一个新的事物
   doBegin(transaction, definition);
   //把当前的事务信息绑定到线程变量去
   prepareSynchronization(status, definition);
   return status;
  }
  catch (RuntimeException | Error ex) {
   resume(null, suspendedResources);
   throw ex;
  }
 }
 else { //创建一个空的事务
  // Create "empty" transaction: no actual transaction, but potentially synchronization.
  if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
   logger.warn("Custom isolation level specified but no actual transaction initiated; " +
     "isolation level will effectively be ignored: " + definition);
  }
  boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  return prepareTransactionStatus(definition, nulltrue, newSynchronization, debugEnabled, null);
 }
}

获取事务对象 doGetTransaction

//DataSourceTransactionManager
protected Object doGetTransaction() {
  //创建一个数据源事务对象
  DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  //是否允许当前事务设置保持点
  txObject.setSavepointAllowed(isNestedTransactionAllowed());
  /**
   * TransactionSynchronizationManager 事务同步管理器对象(该类中都是局部线程变量)
   * 用来保存当前事务的信息,我们第一次从这里去线程变量中获取 事务连接持有器对象 通过数据源为key去获取
   * 由于第一次进来开始事务 我们的事务同步管理器中没有被存放.所以此时获取出来的conHolder为null
   */

  ConnectionHolder conHolder =
      (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  txObject.setConnectionHolder(conHolder, false);
  //返回事务对象
  return txObject;
}

开启新事务 doBegin

//DataSourceTransactionManager
protected void doBegin(Object transaction, TransactionDefinition definition) {
 //强制转化事物对象
 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 Connection con = null;

 try {
  //判断事务对象没有数据库连接持有器
  if (!txObject.hasConnectionHolder() ||
    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
   //通过数据源获取一个数据库连接对象
   Connection newCon = obtainDataSource().getConnection();
   if (logger.isDebugEnabled()) {
    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
   }
   //把我们的数据库连接包装成一个ConnectionHolder对象 然后设置到我们的txObject对象中去
   txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  }

  //标记当前的连接是一个同步事务...?
  txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  con = txObject.getConnectionHolder().getConnection();

  // 设置isReadOnly、隔离级别
  Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  txObject.setPreviousIsolationLevel(previousIsolationLevel);

  //setAutoCommit 默认为true,即每条SQL语句在各自的一个事务中执行。
  if (con.getAutoCommit()) {
   txObject.setMustRestoreAutoCommit(true);
   if (logger.isDebugEnabled()) {
    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
   }
   con.setAutoCommit(false); // 开启事务
  }

  //判断事务为只读事务
  prepareTransactionalConnection(con, definition);
  //设置事务激活
  txObject.getConnectionHolder().setTransactionActive(true);

  //设置事务超时时间
  int timeout = determineTimeout(definition);
  if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
   txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  }

  // 绑定我们的数据源和连接到我们的同步管理器上   把数据源作为key,数据库连接作为value 设置到线程变量中
  if (txObject.isNewConnectionHolder()) {
   TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  }
 }

 catch (Throwable ex) {
  if (txObject.isNewConnectionHolder()) {
   //释放数据库连接
   DataSourceUtils.releaseConnection(con, obtainDataSource());
   txObject.setConnectionHolder(nullfalse);
  }
  throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
 }
}

大致如下流程图

事务的回滚,通过事务状态 DefaultTransactionStatus ,最后会调用到数据库连接的 rollback 方法

5. 总结

声明式事务是特殊的 AOP ,它注册自己的通知、拦截器、事务管理器,不需要像 AOP 一样解析切面做缓存,它将 @Transactional 的属性解析出来保存到事务属性 TransactionAttribute 中(会缓存,不需要重复解析),通过配置类中注册的事务属性源 TransactionAttributeSource 获取;

创建代理的时候, 不同于 AOP 可能会有五个通知,而声明式事务的 _eligbleAdvisor _ 只有配置类中注册的通知;

调用代理方法时,可理解为环绕通知.事务是根据事务状态 DefaultTransactionStatus 为基础的,它可以保存挂起事务的属性,将数据源和数据库连接保存到事务同步管理器 TransactionSynchronizationManager 中(如果挂起就是将事务同步管理器中的属性保存到新的 SuspendedResourcesHolder 中,并把原来事务的 connectionHolder 置空,如果开启新事务会在 doBegin 中重新设置 connectionHolder).

Roy4259

2022/01/03  阅读:37  主题:嫩青

作者介绍

Roy4259