• Spring-事务


    @EnableTransactionManagement工作原理

    开启Spring事务本质上就是增加了一个Advisor,在使用@EnableTransactionManagement注解来开启Spring事务时,该注解代理的功能就是向Spring容器中添加了两个Bean

    1. @Import(TransactionManagementConfigurationSelector.class)
    2. public @interface EnableTransactionManagement { ... }
    3. /**
    4. * Selects which implementation of {@link AbstractTransactionManagementConfiguration}
    5. * should be used based on the value of {@link EnableTransactionManagement#mode} on the
    6. * importing {@code @Configuration} class.
    7. *
    8. * @author Chris Beams
    9. * @author Juergen Hoeller
    10. * @since 3.1
    11. * @see EnableTransactionManagement
    12. * @see ProxyTransactionManagementConfiguration
    13. * @see TransactionManagementConfigUtils#TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME
    14. * @see TransactionManagementConfigUtils#JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME
    15. */
    16. public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector {
    17. /**
    18. * Returns {@link ProxyTransactionManagementConfiguration} or
    19. * {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
    20. * and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
    21. * respectively.
    22. */
    23. @Override
    24. protected String[] selectImports(AdviceMode adviceMode) {
    25. switch (adviceMode) {
    26. case PROXY:
    27. // 默认是PROXY
    28. return new String[] {AutoProxyRegistrar.class.getName(),
    29. ProxyTransactionManagementConfiguration.class.getName()};
    30. case ASPECTJ:
    31. // 表示不用动态代理技术,用ASPECTJ技术,一般用不到
    32. return new String[] {determineTransactionAspectClass()};
    33. default:
    34. return null;
    35. }
    36. }
    37. private String determineTransactionAspectClass() {
    38. return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
    39. TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
    40. TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
    41. }
    42. }

    1、AutoProxyRegistrar

    向Spring容器中注册了一个InfrastructureAdvisorAutoProxyCreator的Bean

    InfrastructureAdvisorAutoProxyCreator 继承了AbstractAdvisorAutoProxyCreator

    主要作用:开启自动代理

    在初始化后寻找Advisor类型的Bean,并检查当前Bean是否有匹配的Advisor,是否需要动态代理创建代理对象

    1. /**
    2. * Registers an auto proxy creator against the current {@link BeanDefinitionRegistry}
    3. * as appropriate based on an {@code @Enable*} annotation having {@code mode} and
    4. * {@code proxyTargetClass} attributes set to the correct values.
    5. *
    6. * @author Chris Beams
    7. * @since 3.1
    8. * @see org.springframework.cache.annotation.EnableCaching
    9. * @see org.springframework.transaction.annotation.EnableTransactionManagement
    10. */
    11. public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
    12. private final Log logger = LogFactory.getLog(getClass());
    13. /**
    14. * Register, escalate, and configure the standard auto proxy creator (APC) against the
    15. * given registry. Works by finding the nearest annotation declared on the importing
    16. * {@code @Configuration} class that has both {@code mode} and {@code proxyTargetClass}
    17. * attributes. If {@code mode} is set to {@code PROXY}, the APC is registered; if
    18. * {@code proxyTargetClass} is set to {@code true}, then the APC is forced to use
    19. * subclass (CGLIB) proxying.
    20. *

      Several {@code @Enable*} annotations expose both {@code mode} and

    21. * {@code proxyTargetClass} attributes. It is important to note that most of these
    22. * capabilities end up sharing a {@linkplain AopConfigUtils#AUTO_PROXY_CREATOR_BEAN_NAME
    23. * single APC}. For this reason, this implementation doesn't "care" exactly which
    24. * annotation it finds -- as long as it exposes the right {@code mode} and
    25. * {@code proxyTargetClass} attributes, the APC can be registered and configured all
    26. * the same.
    27. */
    28. @Override
    29. public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    30. boolean candidateFound = false;
    31. Set annTypes = importingClassMetadata.getAnnotationTypes();
    32. for (String annType : annTypes) {
    33. AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
    34. if (candidate == null) {
    35. continue;
    36. }
    37. Object mode = candidate.get("mode");
    38. Object proxyTargetClass = candidate.get("proxyTargetClass");
    39. if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
    40. Boolean.class == proxyTargetClass.getClass()) {
    41. candidateFound = true;
    42. if (mode == AdviceMode.PROXY) {
    43. // 注册InfrastructureAdvisorAutoProxyCreator,才可以Bean进行AOP
    44. AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
    45. if ((Boolean) proxyTargetClass) {
    46. // 设置InfrastructureAdvisorAutoProxyCreator的proxyTargetClass为true
    47. AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
    48. return;
    49. }
    50. }
    51. }
    52. }
    53. if (!candidateFound && logger.isInfoEnabled()) {
    54. String name = getClass().getSimpleName();
    55. logger.info(String.format("%s was imported but no annotations were found " +
    56. "having both 'mode' and 'proxyTargetClass' attributes of type " +
    57. "AdviceMode and boolean respectively. This means that auto proxy " +
    58. "creator registration and configuration may not have occurred as " +
    59. "intended, and components may not be proxied as expected. Check to " +
    60. "ensure that %s has been @Import'ed on the same class where these " +
    61. "annotations are declared; otherwise remove the import of %s " +
    62. "altogether.", name, name, name));
    63. }
    64. }
    65. }
    66. // AopConfigUtils#registerAutoProxyCreatorIfNecessary
    67. public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry) {
    68. return registerAutoProxyCreatorIfNecessary(registry, null);
    69. }
    70. // AopConfigUtils#registerAutoProxyCreatorIfNecessary
    71. public static BeanDefinition registerAutoProxyCreatorIfNecessary(
    72. BeanDefinitionRegistry registry, @Nullable Object source) {
    73. return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
    74. }
    75. // AopConfigUtils#forceAutoProxyCreatorToUseClassProxying
    76. public static void forceAutoProxyCreatorToUseClassProxying(BeanDefinitionRegistry registry) {
    77. // AUTO_PROXY_CREATOR_BEAN_NAME = "org.springframework.aop.config.internalAutoProxyCreator"
    78. if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
    79. BeanDefinition definition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
    80. definition.getPropertyValues().add("proxyTargetClass", Boolean.TRUE);
    81. }
    82. }

    2、ProxyTransactionManagementConfiguration

    是一个配置类,内部定义了三个Bean:

    • BeanFactoryTransactionAttributeSourceAdvisor:一个Advisor

    • AnnotationTransactionAttributeSource:相当于BeanFactoryTransactionAttributeSourceAdvisor中的Pointcut,判断某个类或者方法上面是否存在@Transactional注解,注意这里并没有解析

    • TransactionInterceptor:相当于BeanFactoryTransactionAttributeSourceAdvisor中的 Advice,当某个类中存在@Transactional注解,就会产生一个代理对象作为Bean,在执行代理对象的某个方法时会进入到TransactionInterceptor的invoke()方法中

    invoke()方法里面就会创建数据库连接、关闭自动提交、事务提交和回滚等

    1. /**
    2. * {@code @Configuration} class that registers the Spring infrastructure beans
    3. * necessary to enable proxy-based annotation-driven transaction management.
    4. *
    5. * @author Chris Beams
    6. * @author Sebastien Deleuze
    7. * @since 3.1
    8. * @see EnableTransactionManagement
    9. * @see TransactionManagementConfigurationSelector
    10. */
    11. @Configuration(proxyBeanMethods = false)
    12. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    13. public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
    14. @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    15. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    16. public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
    17. TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
    18. BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    19. advisor.setTransactionAttributeSource(transactionAttributeSource);
    20. advisor.setAdvice(transactionInterceptor);
    21. if (this.enableTx != null) {
    22. advisor.setOrder(this.enableTx.getNumber("order"));
    23. }
    24. return advisor;
    25. }
    26. @Bean
    27. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    28. public TransactionAttributeSource transactionAttributeSource() {
    29. // AnnotationTransactionAttributeSource中定义了一个Pointcut
    30. // 并且AnnotationTransactionAttributeSource可以用来解析@Transactional注解,并得到一个RuleBasedTransactionAttribute对象
    31. return new AnnotationTransactionAttributeSource();
    32. }
    33. @Bean
    34. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    35. public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
    36. TransactionInterceptor interceptor = new TransactionInterceptor();
    37. interceptor.setTransactionAttributeSource(transactionAttributeSource);
    38. if (this.txManager != null) {
    39. interceptor.setTransactionManager(this.txManager);
    40. }
    41. return interceptor;
    42. }
    43. }

    Spring事务基本执行原理

    InfrastructureAdvisorAutoProxyCreator也是一个BeanPostProcessor(最终实现SmartInstantiationAwareBeanPostProcessor),在Bean的生命周期初始化后步骤时,会去判断是否需要创建代理对象

    判断依据:当前Bean和BeanFactoryTransactionAttributeSourceAdvisor匹配,简单点就是当前Bean的类或者方法上面有没有@Transactional注解,有就匹配

    在代理对象执行某个方法时,会再次判断当前执行方法和BeanFactoryTransactionAttributeSourceAdvisor是否匹配,如果匹配则执行该Advisor中的TransactionInterceptor的invoke()方法,执行流程:

    (再次判断是考虑到一个类中可能有多个方法,有的方法有@Transactional注解,有的没有)

    1、利用所配置的PlatformTransactionManager事务管理器新建一个数据库连接,注意TransactionManager有限制的必须是PlatformTransactionManager,然后会生成一个joinpointIdentification作为事务的名字

    2、修改数据库连接的autocommit为false

    3、执行MethodInvocation.proceed()方法,简单理解就是执行业务方法,其中就会执行sql

    4、如果没有抛异常,则提交,执行完finally中的方法后再进行提交

    5、如果抛了异常,则回滚,执行完finally中的方法后再抛出异常

    Spring事务传播机制

    开发过程中,经常会出现一个方法调用另外一个方法,那么就涉及到了多种场景。比如多个方法需要在同一个事务中执行;每个方法要在单独的事务中执行;有的方法需要事务,有的不需要事务;还有很多更复杂情况。Spring为了支持各种场景,也就有了Spring事务传播机制

    场景分析,假设a()方法在事务执行中,调用b()方法需要开启一个新事务执行:

    1、首先,代理对象执行a()方法前,先利用事务管理器新建一个数据库连接a

    2、将数据库连接a的autocommit改为false

    3、把数据库连接a设置到ThreadLocal中

    4、执行a()方法中的sql

    5、执行a()方法过程中,调用了b()方法(注意用代理对象调用b()方法)

    a)代理对象执行b()方法前,判断出来了当前线程中已经存在一个数据库连接a了,表示当前线程其实已经拥有一个Spring事务了,则进行挂起

    b)挂起就是把ThreadLocal中的数据库连接a从ThreadLocal中移除,并放入一个挂起资源对象中(就是事务同步管理器:TransactionSynchronizationManager)

    c)挂起完成后,再次利用事务管理器新建一个数据库连接b

    d)将数据库连接b的autocommit改为false

    e)把数据库连接b设置到ThreadLocal中

    f)执行b()方法中的sql

    g)b()方法正常执行完,则从ThreadLocal中拿到数据库连接b进行提交

    h)提交之后会恢复所挂起的数据库连接a,这里的恢复,其实只是把在挂起资源对象中所保存的数据库连接a再次设置到ThreadLocal中

    6、a()方法正常执行完,则从ThreadLocal中拿到数据库连接a进行提交

    过程核心:在执行某个方法时,判断当前是否已经存在一个事务,就是判断当前线程的ThreadLocal中是否存在一个数据库连接对象,如果存在则表示已经存在一个事务了。

    代理对象执行方法时,源码invoke入口:

    1. // TransactionInterceptor#invoke
    2. public Object invoke(MethodInvocation invocation) throws Throwable {
    3. // Work out the target class: may be {@code null}.
    4. // The TransactionAttributeSource should be passed the target class
    5. // as well as the method, which may be from an interface.
    6. Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    7. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    8. return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
    9. @Override
    10. @Nullable
    11. public Object proceedWithInvocation() throws Throwable {
    12. // 执行后续的Interceptor,以及被代理的方法
    13. return invocation.proceed(); // test() sql
    14. }
    15. @Override
    16. public Object getTarget() {
    17. return invocation.getThis();
    18. }
    19. @Override
    20. public Object[] getArguments() {
    21. return invocation.getArguments();
    22. }
    23. });
    24. }
    25. @Override
    26. @Nullable
    27. public Object proceed() throws Throwable {
    28. // We start with an index of -1 and increment early.
    29. // currentInterceptorIndex初始值为-1,每调用一个interceptor就会加1
    30. // 当调用完了最后一个interceptor后就会执行被代理方法
    31. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
    32. return invokeJoinpoint();
    33. }
    34. // currentInterceptorIndex初始值为-1
    35. Object interceptorOrInterceptionAdvice =
    36. this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    37. // 当前interceptor是InterceptorAndDynamicMethodMatcher,则先进行匹配,匹配成功后再调用该interceptor
    38. // 如果没有匹配则递归调用proceed()方法,调用下一个interceptor
    39. if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
    40. // Evaluate dynamic method matcher here: static part will already have
    41. // been evaluated and found to match.
    42. InterceptorAndDynamicMethodMatcher dm =
    43. (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
    44. Class targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
    45. // 动态匹配,根据方法参数匹配
    46. if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
    47. return dm.interceptor.invoke(this);
    48. }
    49. else {
    50. // Dynamic matching failed.
    51. // Skip this interceptor and invoke the next in the chain.
    52. // 不匹配则执行下一个MethodInterceptor
    53. return proceed();
    54. }
    55. }
    56. else {
    57. // It's an interceptor, so we just invoke it: The pointcut will have
    58. // been evaluated statically before this object was constructed.
    59. // 直接调用MethodInterceptor,传入this,在内部会再次调用proceed()方法进行递归
    60. // 比如MethodBeforeAdviceInterceptor
    61. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    62. }
    63. }

    TransactionInterceptor#invokeWithinTransaction:

    1. /**
    2. * General delegate for around-advice-based subclasses, delegating to several other template
    3. * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
    4. * as well as regular {@link PlatformTransactionManager} implementations and
    5. * {@link ReactiveTransactionManager} implementations for reactive return types.
    6. * @param method the Method being invoked
    7. * @param targetClass the target class that we're invoking the method on
    8. * @param invocation the callback to use for proceeding with the target invocation
    9. * @return the return value of the method, if any
    10. * @throws Throwable propagated from the target invocation
    11. */
    12. @Nullable
    13. protected Object invokeWithinTransaction(Method method, @Nullable Class targetClass,
    14. final InvocationCallback invocation) throws Throwable {
    15. // If the transaction attribute is null, the method is non-transactional.
    16. // TransactionAttribute就是@Transactional中的配置
    17. TransactionAttributeSource tas = getTransactionAttributeSource();
    18. // 获取@Transactional注解中的属性值
    19. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    20. // 返回Spring容器中类型为TransactionManager的Bean对象
    21. final TransactionManager tm = determineTransactionManager(txAttr);
    22. // ReactiveTransactionManager用得少,并且它只是执行方式是响应式的,原理流程和普通的是一样的
    23. if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
    24. ... ...
    25. }
    26. // 把tm强制转换为PlatformTransactionManager,所以我们在定义时得定义PlatformTransactionManager类型
    27. PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    28. // joinpoint的唯一标识,就是当前在执行的方法名字
    29. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    30. // CallbackPreferringPlatformTransactionManager表示拥有回调功能的PlatformTransactionManager,也不常用
    31. if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    32. // Standard transaction demarcation with getTransaction and commit/rollback calls.
    33. // 如果有必要就创建事务,这里就涉及到事务传播机制的实现了
    34. // TransactionInfo表示一个逻辑事务,比如两个逻辑事务属于同一个物理事务
    35. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    36. Object retVal;
    37. try {
    38. // This is an around advice: Invoke the next interceptor in the chain.
    39. // This will normally result in a target object being invoked.
    40. // 执行下一个Interceptor或被代理对象中的方法
    41. retVal = invocation.proceedWithInvocation(); // test() sql
    42. }
    43. catch (Throwable ex) {
    44. // target invocation exception
    45. // 抛异常了,则回滚事务,或者
    46. completeTransactionAfterThrowing(txInfo, ex);
    47. throw ex;
    48. }
    49. finally {
    50. cleanupTransactionInfo(txInfo);
    51. }
    52. if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
    53. // Set rollback-only in case of Vavr failure matching our rollback rules...
    54. TransactionStatus status = txInfo.getTransactionStatus();
    55. if (status != null && txAttr != null) {
    56. retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
    57. }
    58. }
    59. // 提交事务
    60. commitTransactionAfterReturning(txInfo);
    61. return retVal;
    62. }
    63. else {
    64. Object result;
    65. final ThrowableHolder throwableHolder = new ThrowableHolder();
    66. ... ...
    67. }
    68. }
    69. /**
    70. * Create a transaction if necessary based on the given TransactionAttribute.
    71. *

      Allows callers to perform custom TransactionAttribute lookups through

    72. * the TransactionAttributeSource.
    73. * @param txAttr the TransactionAttribute (may be {@code null})
    74. * @param joinpointIdentification the fully qualified method name
    75. * (used for monitoring and logging purposes)
    76. * @return a TransactionInfo object, whether or not a transaction was created.
    77. * The {@code hasTransaction()} method on TransactionInfo can be used to
    78. * tell if there was a transaction created.
    79. * @see #getTransactionAttributeSource()
    80. */
    81. @SuppressWarnings("serial")
    82. protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
    83. @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    84. // If no name specified, apply method identification as transaction name.
    85. if (txAttr != null && txAttr.getName() == null) {
    86. txAttr = new DelegatingTransactionAttribute(txAttr) {
    87. @Override
    88. public String getName() {
    89. return joinpointIdentification;
    90. }
    91. };
    92. }
    93. // 每个逻辑事务都会创建一个TransactionStatus,但是TransactionStatus中有一个属性代表当前逻辑事务底层的物理事务是不是新的
    94. TransactionStatus status = null;
    95. if (txAttr != null) {
    96. if (tm != null) {
    97. //
    98. status = tm.getTransaction(txAttr);
    99. }
    100. else {
    101. if (logger.isDebugEnabled()) {
    102. logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
    103. "] because no transaction manager has been configured");
    104. }
    105. }
    106. }
    107. // 返回一个TransactionInfo对象,表示得到了一个事务,可能是新创建的一个事务,也可能是拿到的已有的事务
    108. return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    109. }

    关键方法getTransaction(txAttr):

    1. /**
    2. * This implementation handles propagation behavior. Delegates to
    3. * {@code doGetTransaction}, {@code isExistingTransaction}
    4. * and {@code doBegin}.
    5. * @see #doGetTransaction
    6. * @see #isExistingTransaction
    7. * @see #doBegin
    8. */
    9. @Override
    10. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    11. throws TransactionException {
    12. // Use defaults if no transaction definition given.
    13. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    14. // 得到一个新的DataSourceTransactionObject对象
    15. // new DataSourceTransactionObject txObject
    16. Object transaction = doGetTransaction();
    17. boolean debugEnabled = logger.isDebugEnabled();
    18. // transaction.getConnectionHolder().isTransactionActive()
    19. if (isExistingTransaction(transaction)) {
    20. // Existing transaction found -> check propagation behavior to find out how to behave.
    21. return handleExistingTransaction(def, transaction, debugEnabled);
    22. }
    23. // Check definition settings for new transaction.
    24. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    25. throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    26. }
    27. // No existing transaction found -> check propagation behavior to find out how to proceed.
    28. // 手动开启事务的传播机制
    29. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    30. throw new IllegalTransactionStateException(
    31. "No existing transaction found for transaction marked with propagation 'mandatory'");
    32. }
    33. // 在当前Thread中没有事务的前提下,以下三个是等价的
    34. else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    35. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    36. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    37. // 没有事务需要挂起,不过TransactionSynchronization有可能需要挂起
    38. // suspendedResources表示当前线程被挂起的资源持有对象(数据库连接、TransactionSynchronization)
    39. SuspendedResourcesHolder suspendedResources = suspend(null);
    40. if (debugEnabled) {
    41. logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    42. }
    43. try {
    44. // 开启事务后,transaction中就会有数据库连接了,并且isTransactionActive为true
    45. // 并返回TransactionStatus对象,该对象保存了很多信息,包括被挂起的资源
    46. return startTransaction(def, transaction, debugEnabled, suspendedResources);
    47. }
    48. catch (RuntimeException | Error ex) {
    49. resume(null, suspendedResources);
    50. throw ex;
    51. }
    52. }
    53. else {
    54. // Create "empty" transaction: no actual transaction, but potentially synchronization.
    55. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    56. logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    57. "isolation level will effectively be ignored: " + def);
    58. }
    59. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    60. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    61. }
    62. }
    63. /**
    64. * Create a TransactionStatus for an existing transaction.
    65. */
    66. private TransactionStatus handleExistingTransaction(
    67. TransactionDefinition definition, Object transaction, boolean debugEnabled)
    68. throws TransactionException {
    69. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    70. throw new IllegalTransactionStateException(
    71. "Existing transaction found for transaction marked with propagation 'never'");
    72. }
    73. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    74. if (debugEnabled) {
    75. logger.debug("Suspending current transaction");
    76. }
    77. // 把当前事务挂起,其中就会把数据库连接对象从ThreadLocal中移除
    78. Object suspendedResources = suspend(transaction);
    79. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    80. return prepareTransactionStatus(
    81. definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    82. }
    83. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    84. if (debugEnabled) {
    85. logger.debug("Suspending current transaction, creating new transaction with name [" +
    86. definition.getName() + "]");
    87. }
    88. SuspendedResourcesHolder suspendedResources = suspend(transaction);
    89. try {
    90. return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    91. }
    92. catch (RuntimeException | Error beginEx) {
    93. resumeAfterBeginException(transaction, suspendedResources, beginEx);
    94. throw beginEx;
    95. }
    96. }
    97. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    98. if (!isNestedTransactionAllowed()) {
    99. throw new NestedTransactionNotSupportedException(
    100. "Transaction manager does not allow nested transactions by default - " +
    101. "specify 'nestedTransactionAllowed' property with value 'true'");
    102. }
    103. if (debugEnabled) {
    104. logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
    105. }
    106. if (useSavepointForNestedTransaction()) {
    107. // Create savepoint within existing Spring-managed transaction,
    108. // through the SavepointManager API implemented by TransactionStatus.
    109. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    110. DefaultTransactionStatus status =
    111. prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    112. // 创建一个savepoint
    113. status.createAndHoldSavepoint();
    114. return status;
    115. }
    116. else {
    117. // Nested transaction through nested begin and commit/rollback calls.
    118. // Usually only for JTA: Spring synchronization might get activated here
    119. // in case of a pre-existing JTA transaction.
    120. return startTransaction(definition, transaction, debugEnabled, null);
    121. }
    122. }
    123. // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    124. if (debugEnabled) {
    125. logger.debug("Participating in existing transaction");
    126. }
    127. if (isValidateExistingTransaction()) {
    128. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    129. Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    130. if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
    131. Constants isoConstants = DefaultTransactionDefinition.constants;
    132. throw new IllegalTransactionStateException("Participating transaction with definition [" +
    133. definition + "] specifies isolation level which is incompatible with existing transaction: " +
    134. (currentIsolationLevel != null ?
    135. isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
    136. "(unknown)"));
    137. }
    138. }
    139. if (!definition.isReadOnly()) {
    140. if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    141. throw new IllegalTransactionStateException("Participating transaction with definition [" +
    142. definition + "] is not marked as read-only but existing transaction is");
    143. }
    144. }
    145. }
    146. // 如果依然是Propagation.REQUIRED
    147. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    148. return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    149. }

    开启事务startTransaction:

    1. /**
    2. * Start a new transaction.
    3. */
    4. private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    5. // 是否开启一个新的TransactionSynchronization
    6. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    7. // 开启的这个事务的状态信息:
    8. // 事务的定义、用来保存数据库连接的对象、是否是新事务,是否是新的TransactionSynchronization
    9. DefaultTransactionStatus status = newTransactionStatus(
    10. definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    11. // 开启事务
    12. doBegin(transaction, definition);
    13. // 如果需要新开一个TransactionSynchronization,就把新创建的事务的一些状态信息设置到TransactionSynchronizationManager中
    14. prepareSynchronization(status, definition);
    15. return status;
    16. }
    17. // DataSourceTransactionManager#doBegin
    18. @Override
    19. protected void doBegin(Object transaction, TransactionDefinition definition) {
    20. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    21. Connection con = null;
    22. try {
    23. // 如果当前线程中所使用的DataSource还没有创建过数据库连接,就获取一个新的数据库连接
    24. if (!txObject.hasConnectionHolder() ||
    25. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    26. Connection newCon = obtainDataSource().getConnection();
    27. if (logger.isDebugEnabled()) {
    28. logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    29. }
    30. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    31. }
    32. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    33. con = txObject.getConnectionHolder().getConnection();
    34. // 根据@Transactional注解中的设置,设置Connection的readOnly与隔离级别
    35. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    36. txObject.setPreviousIsolationLevel(previousIsolationLevel);
    37. txObject.setReadOnly(definition.isReadOnly());
    38. // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    39. // so we don't want to do it unnecessarily (for example if we've explicitly
    40. // configured the connection pool to set it already).
    41. // 设置autocommit为false
    42. if (con.getAutoCommit()) {
    43. txObject.setMustRestoreAutoCommit(true);
    44. if (logger.isDebugEnabled()) {
    45. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    46. }
    47. con.setAutoCommit(false);
    48. }
    49. prepareTransactionalConnection(con, definition);
    50. txObject.getConnectionHolder().setTransactionActive(true);
    51. // 设置数据库连接的过期时间
    52. int timeout = determineTimeout(definition);
    53. if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    54. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    55. }
    56. // Bind the connection holder to the thread.
    57. // 把新建的数据库连接设置到resources中,resources就是一个ThreadLocal>,事务管理器中的设置的DataSource对象为key,数据库连接对象为value
    58. if (txObject.isNewConnectionHolder()) {
    59. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    60. }
    61. }
    62. catch (Throwable ex) {
    63. if (txObject.isNewConnectionHolder()) {
    64. DataSourceUtils.releaseConnection(con, obtainDataSource());
    65. txObject.setConnectionHolder(null, false);
    66. }
    67. throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    68. }
    69. }

    Spring事务传播机制分类

    以非事务方式运行:表示以非Spring事务运行,表示在执行这个方法时,Spring事务管理器不会去建立数据库连接,执行sql时,由Mybatis或JdbcTemplate自己来建立数据库连接来执行 sql。

    案例分析

    情况1

    事务1内部调用事务2场景,无异常

    1. @Component
    2. public class UserService
    3. {
    4. @Autowired
    5. private UserService userService;
    6. @Transactional
    7. public void test()
    8. {
    9. // test方法中的sql
    10. userService.a();
    11. }
    12. @Transactional
    13. public void a()
    14. {
    15. // a方法中的sql
    16. }
    17. }

    默认情况下传播机制为REQUIRED,表示当前如果没有事务则新建一个事务,如果有事务则在当前事务中执行。

    执行流程:

    1、新建一个数据库连接conn

    2、设置conn的autocommit为false

    3、执行test方法中的sql

    4、执行a方法中的sql

    5、执行conn的commit()方法进行提交

    情况2

    事务1内部调用事务2场景,事务1内部有异常

    1. @Component
    2. public class UserService
    3. {
    4. @Autowired
    5. private UserService userService;
    6. @Transactional
    7. public void test()
    8. {
    9. // test方法中的sql
    10. userService.a();
    11. int result = 100/0;
    12. }
    13. @Transactional
    14. public void a()
    15. {
    16. // a方法中的sql
    17. }
    18. }

    执行流程:

    1、新建一个数据库连接conn

    2、设置conn的autocommit为false

    3、执行test方法中的sql

    4、执行a方法中的sql

    5、抛出异常

    6、执行conn的rollback()方法进行回滚,所以两个方法中的sql都会回滚掉

    情况3

    事务1内部调用事务2场景,事务2内部有异常

    1. @Component
    2. public class UserService
    3. {
    4. @Autowired
    5. private UserService userService;
    6. @Transactional
    7. public void test()
    8. {
    9. // test方法中的sql
    10. userService.a();
    11. }
    12. @Transactional
    13. public void a()
    14. {
    15. // a方法中的sql
    16. int result = 100/0;
    17. }
    18. }

    执行流程:

    1、新建一个数据库连接conn

    2、设置conn的autocommit为false

    3、执行test方法中的sql

    4、执行a方法中的sql

    5、抛出异常

    6、执行conn的rollback()方法进行回滚,所以两个方法中的sql都会回滚掉

    对于事务1而言,userService.a(); 这个方法的调用抛出了异常,所以事务1也要回滚

    情况4
    1. @Component
    2. public class UserService
    3. {
    4. @Autowired
    5. private UserService userService;
    6. @Transactional
    7. public void test()
    8. {
    9. // test方法中的sql
    10. userService.a();
    11. }
    12. @Transactional(propagation = Propagation.REQUIRES_NEW)
    13. public void a()
    14. {
    15. // a方法中的sql
    16. int result = 100/0;
    17. }
    18. }

    执行流程:

    1、新建一个数据库连接conn

    2、设置conn的autocommit为false

    3、执行test方法中的sql

    4、又新建一个数据库连接conn2

    5、执行a方法中的sql

    6、抛出异常

    7、执行conn2的rollback()方法进行回滚

    8、继续抛异常,对于test()方法而言,它会接收到一个异常,然后抛出

    9、执行conn的rollback()方法进行回滚,最终还是两个方法中的sql都回滚了

    Spring事务强制回滚

    正常情况下,a()调用b()方法时,如果b()方法抛了异常,但是在a()方法捕获了,那么a()的事务还是会正常提交的,但是有的时候,我们捕获异常可能仅仅只是不把异常信息返回给客户端,而是为了返回一些更友好的错误信息,而这个时候,我们还是希望事务能回滚的,那这个时候就得告诉Spring把当前事务回滚掉,做法就是:

    1. @Transactional
    2. public void test()
    3. {
    4. // 执行sql
    5. try
    6. {
    7. b();
    8. } catch (Exception e)
    9. {
    10. // 构造友好的错误信息返回
    11. TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
    12. }
    13. }
    14. public void b() throws Exception
    15. {
    16. throw new Exception();
    17. }

    事务仍然回滚,捕获异常是为了给客户端更好的提示

    TransactionSynchronization

    Spring事务有可能会提交,回滚、挂起、恢复,所以Spring事务提供了一种机制,可以让程序员来监听当前Spring事务所处于的状态。

    1. @Component
    2. public class UserService
    3. {
    4. @Autowired
    5. private JdbcTemplate jdbcTemplate;
    6. @Autowired
    7. private UserService userService;
    8. @Transactional
    9. public void test()
    10. {
    11. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization()
    12. {
    13. @Override
    14. public void suspend()
    15. {
    16. System.out.println("test被挂起了");
    17. }
    18. @Override
    19. public void resume()
    20. {
    21. System.out.println("test被恢复了");
    22. }
    23. @Override
    24. public void beforeCommit(boolean readOnly)
    25. {
    26. System.out.println("test准备要提交了");
    27. }
    28. @Override
    29. public void beforeCompletion()
    30. {
    31. System.out.println("test准备要提交或回滚了");
    32. }
    33. @Override
    34. public void afterCommit()
    35. {
    36. System.out.println("test提交成功了");
    37. }
    38. @Override
    39. public void afterCompletion(int status)
    40. {
    41. System.out.println("test提交或回滚成功了");
    42. }
    43. });
    44. jdbcTemplate.execute("insert into t1 values(1,1,1,1,'1')");
    45. System.out.println("test");
    46. userService.a();
    47. }
    48. @Transactional(propagation = Propagation.REQUIRES_NEW)
    49. public void a()
    50. {
    51. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization()
    52. {
    53. @Override
    54. public void suspend()
    55. {
    56. System.out.println("a被挂起了");
    57. }
    58. @Override
    59. public void resume()
    60. {
    61. System.out.println("a被恢复了");
    62. }
    63. @Override
    64. public void beforeCommit(boolean readOnly)
    65. {
    66. System.out.println("a准备要提交了");
    67. }
    68. @Override
    69. public void beforeCompletion()
    70. {
    71. System.out.println("a准备要提交或回滚了");
    72. }
    73. @Override
    74. public void afterCommit()
    75. {
    76. System.out.println("a提交成功了");
    77. }
    78. @Override
    79. public void afterCompletion(int status)
    80. {
    81. System.out.println("a提交或回滚成功了");
    82. System.out.println("a提交或回滚成功了");
    83. }
    84. });
    85. jdbcTemplate.execute("insert into t1 values(2,2,2,2,'2')");
    86. System.out.println("a");
    87. }
    88. }

  • 相关阅读:
    Remote access minikube cluster远程访问minikube k8s集群
    CoreData 同步 iCloud 数据导致 App 启动超时被系统 watchdog 终止的原因及解决
    android学习笔记(四)
    git基本配置及代码下载上传
    js 关键字赋值.html
    mac安装java
    《动机与人格》笔记(二)——认识和理解的欲望
    starRocks搭建
    docker四种网络模式
    使用html+css实现一个静态页面(厦门旅游网站制作6个页面) 旅游网页设计制作 HTML5期末考核大作业,网站——美丽家乡。 学生旅行 游玩 主题住宿网页
  • 原文地址:https://blog.csdn.net/weixin_58482311/article/details/134387089