将自己热爱与信仰的技术,持续不辍地传递。—— 步道师
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

下面是一个分布式事务在Seata中的执行流程(后面我们会详细讲解):
在了解Seata的分布式事务解决方案是业务层面,只依赖于单台数据库的事务能力。Seata框架中一个分布式事务包含3中角色:
Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
Transaction Manager ™: 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。
基于支持本地 ACID 事务的关系型数据库。
Java 应用,通过 JDBC 访问数据库。
严格意义上的事务实现应该是具备原子性、一致性、隔离性和持久性,简称ACID。
2PC(Two-phase commit protocol),二阶段提交。二阶段提交是一种强一致性设计,2PC引入一个事务协调者的角色来协调管理各参与者(也可称之为各本地资源)的提交和回滚,二阶段分别指的是准备和提交两个阶段。


这里把这两种情况在这里给大家简单的描述了下
一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
一阶段 prepare 行为
二阶段 commit 或 rollback 行为

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式基于 支持本地 ACID 事务 的 关系型数据库:
一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。

适用场景:
优势:
缺点:
在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。
执行阶段:
完成阶段:
官网下载地址: https://seata.io/zh-cn/blog/download.html
解压seata-server-1.5.2安装包。解压出来我们进入bin目录会看到相应的启动脚本(由于这里是在Windows演示)

在 Linux/Mac 下
sh ./bin/seata-server.sh
在 Windows 下
bin\seata-server.bat
高可用部署:
Seata 的高可用依赖于注册中心、配置中心和数据库来实现。使用nacos和redis为例

这里我们看到是基于文件的方式,后面我们会有详细的课程讲解基于nacos注册中心的安装配置等
windows:bin目录下双击seata-server.bat启动
linux:命令行启动 seata-server.sh -h 127.0.0.1 -p 8091
启动参数:

首先,您需要将 nacos-client 的 Maven 依赖添加到您的项目 pom.xml 文件中
io.seata</groupId>
seata-spring-boot-starter</artifactId>
最新版</version>
</dependency>
com.alibaba.nacos</groupId>
nacos-client</artifactId>
1.2.0及以上版本</version>
</dependency>
在 application.yml 中加入对应的配置中心,其余配置参考
seata:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group : "SEATA_GROUP"
namespace: ""
username: "XXXXXX"
在 registry.conf 中加入对应配置中心,其余配置参考
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
username = "nacos"
}
}
seata:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group : "SEATA_GROUP"
namespace: ""
dataId: "seataServer.properties"
username: "nacos"
今天咱们开始咱们的主题了,在了解以上问题之后我们是不是比较好奇他的运行流程。了解执行流程我们就要从官网,或者源码入手来了解
// 【排除SpringBoot启动时自动加载GlobalTransactionAutoConfiguration.class】
@SpringBootApplication(exclude = {GlobalTransactionAutoConfiguration.class})
@EnableFeignClients // 远程调用
@EnableDiscoveryClient // 开启nacos服务
public class GatewayApplication{
public static void main(String[] args) {
ErosApplication.run(ProjectNameConstatnt.GATEWAY_NAME,"consume",GatewayApplication.class,args);
}
}
@GlobalTransactional(name="XXXXX",rollbackFor = RuntimeException.class)
方法上加了@GlobalTransactional,Seata通过aop检测到之后,就会使用TM和TC通信,注册全局事务。
在@GlobalTransactional涵括的代码中,不管是本服务中的sql操作,还是feign调用别的服务的sql操作,只要sql操作满足如下:insert操作,delete操作,update操作,select for update操作。 就会被seata增强,使用RM与TC通信,注册分支事务。
全局事务:包括开启事务、提交、回滚、获取当前状态等方法。
public interface GlobalTransaction {
/**
* 开启一个全局事务(使用默认的事务名和超时时间)
*/
void begin() throws TransactionException;
/**
* 开启一个全局事务,并指定超时时间(使用默认的事务名)
*/
void begin(int timeout) throws TransactionException;
/**
* 开启一个全局事务,并指定事务名和超时时间
*/
void begin(int timeout, String name) throws TransactionException;
/**
* 全局提交
*/
void commit() throws TransactionException;
/**
* 全局回滚
*/
void rollback() throws TransactionException;
/**
* 获取事务的当前状态
*/
GlobalStatus getStatus() throws TransactionException;
/**
* 获取事务的 XID
*/
String getXid();
}
GlobalTransactionScanner继承自AbstractAutoProxyCreator,在这里拦截到加了@GlobalTransactional的方法。
GlobalTransactionScanner

@Override
public void afterPropertiesSet() {
//是否禁止了全局事务
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//初始化netty 客户端(TM RM)
initClient();
}
初始化TM,RM
private void initClient() {
//init TM
TMClient.init(applicationId, txServiceGroup);
//init RM
RMClient.init(applicationId, txServiceGroup);
}
GlobalTransactionalInterceptor#invoke

GlobalTransaction 和 GlobalTransactionContext API 把一个业务服务的调用包装成带有分布式事务支持的服务。
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. 获取当前全局事务实例或创建新的实例
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. 开启全局事务
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
// 2.1 开启失败
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// 3. 调用业务服务
rs = business.execute();
} catch (Throwable ex) {
// 业务调用本身的异常
try {
// 全局回滚
tx.rollback();
// 3.1 全局回滚成功:抛出原始业务异常
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 全局回滚失败:
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. 全局提交
try {
tx.commit();
} catch (TransactionException txe) {
// 4.1 全局提交失败:
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
}
}
DataSourceProxy#getConnection
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
ConnectionProxy#doCommit
private void doCommit() throws SQLException {
//处理@GlobalTransaction的分支事务
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
}
//处理@GlobalLock,即检查一下是否可以获取全局锁
else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
ConnectionProxy#processGlobalTransactionCommit

模板的异常方法
class ExecutionException extends Exception {
// 发生异常的事务实例
private GlobalTransaction transaction;
// 异常编码:
// BeginFailure(开启事务失败)
// CommitFailure(全局提交失败)
// RollbackFailure(全局回滚失败)
// RollbackDone(全局回滚成功)
private Code code;
// 触发回滚的业务原始异常
private Throwable originalException;