分布式事务常用的解决方案,XA,Saga,TCC,MQ补偿
1.MySQL XA事务例子
XA是分布式事务处理的一个规范, open group发起, 被数据库厂商广泛的支持, 具体规范在 http://www.opengroup.org/public/pubs/catalog/c193.htm. MySQL5.x和Connector/J 5.0.0起,InnoDB引擎就开始支持XA了, 来段代码实际些。
package com._51discuss.xa_test;
import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.*;
import java.util.UUID;
/**
*
CREATE TABLE `user` (
`user_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'User nick name',
`user_mobile` char(11) CHARACTER SET utf8mb4 NOT NULL,
`user_pwd` varchar(64) COLLATE utf8mb4_bin NOT NULL,
`user_create_time` datetime(3) NOT NULL,
PRIMARY KEY (`user_id`),
UNIQUE KEY `user_mobile_UNIQUE` (`user_mobile`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
* @author Zeal
*/
public class LocalXaTest {
public LocalXaTest() {
try {
// The newInstance() call is a work around for some
// broken Java implementations
Class.forName("com.mysql.cj.jdbc.Driver").newInstance();
} catch (Exception ex) {
// handle the error
throw new IllegalStateException(ex.toString(), ex);
}
}
public void multiInserts() throws Exception {
String jdbcUrl = "jdbc:mysql://localhost/test?serverTimezone=GMT%2B8&autoReconnect=true&characterEncoding=utf8&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true";
String jdbcUser = "root";
String jdbcPassword = null;
XAConnection connection1 = null;
XAConnection connection2 = null;
try {
connection1 = getConnection(jdbcUrl, jdbcUser, jdbcPassword, true);
connection2 = getConnection(jdbcUrl, jdbcUser, jdbcPassword, true);
XAResource resource1 = connection1.getXAResource();
XAResource resource2 = connection2.getXAResource();
final byte[] gtrid = UUID.randomUUID().toString().getBytes();
final int formatId = 1;
Xid xid1 = null;
Xid xid2 = null;
try {
//======================================================================================================
//First time, it will succeed
xid1 = insertUser(connection1, gtrid, formatId, "user1", "13555555555", "123456");
//With duplicate user, it should roll back
xid2 = insertUser(connection2, gtrid, formatId, "user2", "13555555556", "123456");
//======================================================================================================
//Second time, it will roll back
//The second time, it should roll back
// xid1 = insertUser(connection1, gtrid, formatId, "user3", "13555555557", "123456");
// //With duplicate user, it should roll back
// xid2 = insertUser(connection2, gtrid, formatId, "user1", "13555555556", "123456");
//2PC
int prepare1 = resource1.prepare(xid1);
int prepare2 = resource2.prepare(xid2);
final boolean onePhase = false;
if (prepare1 == XAResource.XA_OK && prepare2 == XAResource.XA_OK) {
resource1.commit(xid1, onePhase);
resource2.commit(xid2, onePhase);
} else {
resource1.rollback(xid1);
resource2.rollback(xid2);
}
}
catch (Exception e) {
e.printStackTrace();
if (xid1 != null) {
resource1.rollback(xid1);
}
if (xid2 != null) {
resource2.rollback(xid2);
}
throw e;
}
} finally {
close(connection1).close(connection2);
}
}
private Xid insertUser(XAConnection xaConnection, final byte[] gtrid, final int formatId, String userName, String userMobile, String userPwd) throws Exception {
Connection connection = xaConnection.getConnection();
XAResource resource = xaConnection.getXAResource();
String sql = "INSERT INTO `user`(`user_name`,`user_mobile`,`user_pwd`,`user_create_time`) VALUES(?,?,?,?)";
byte[] bqual = UUID.randomUUID().toString().getBytes();
MysqlXid xid = new MysqlXid(gtrid, bqual, formatId);
resource.start(xid, XAResource.TMNOFLAGS);
try (PreparedStatement psm = connection.prepareStatement(sql)) {
psm.setString(1, userName);
psm.setString(2, userMobile);
psm.setString(3, userPwd);
psm.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
psm.executeUpdate();
resource.end(xid, XAResource.TMSUCCESS);
return xid;
}
}
private XAConnection getConnection(String jdbcUrl, String jdbcUser, String jdbcPassword, boolean logXaCommand) throws SQLException {
Connection connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
return new MysqlXAConnection((JdbcConnection) connection, logXaCommand);
}
private LocalXaTest close(XAConnection closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
return this;
}
public static void main(String[] args) throws Exception {
LocalXaTest test = new LocalXaTest();
test.multiInserts();
}
}
第一轮插入user1,user2成功, 第二轮插入user3,user1会因为user1主键重复整体回滚。这里偷懒了,最好能连不同的数据库去执行。而这里XA就是经典的2PC(Two-Phase Commit两段提交)。按照MySQL官方的话说:
The MySQL implementation of XA enables a MySQL server to act as a Resource Manager that handles XA transactions within a global transaction. A client program that connects to the MySQL server acts as the Transaction Manager.
在微服务等场景, 每个服务管理自己的数据库, 调用客户端就用不到XAResource了, 就得用其它方案了。而实际XA用得不多,性能通常不大好,数据库实现也有差异甚至限制,实际上可能会使用更多的柔性事务。
2. JTA/XA实现,Atomikos与Bitronix
Spring boot官方推荐的JTA解决分布式事务。Atomikos其实是先驱级别的方案, TransactionEssentials是开源版, 支持JTA/XA, JDBC, JMS。 而商用版ExtremeTransactions则首创的支持TCC(TRYING、CONFIRMING、CANCELIING), 概念上和XA和DTP模型有些相像, 把资源层事务转移到业务层去补偿, 全局事务下的每个小事务就类似要实现try, confirm, cancel的业务接口了, 对业务实现要求就提高了。
Bitronix和开源版的Atomikos类似, 仅支持JTA/XA, 配置上一般是代理若干个数据源,实现JtaTransactionManger, 接口方法加个指定的@Transactional,用起来挺方便, 只是在服务化的场景使用有限, 细节就跳过了。
3.柔性事务Saga
Saga是分布式事务模式, 实现最终一致性。Saga包含多个子事务和对应子事务的补偿动作, 相比TCC则是少了try/prepare 准备动作, 不同的场景这样有好有坏。Saga一个出名些的实现是Apache ServiceComb, 主要是华为系的开源的全栈微服务解决方案,感觉蛮不错的, 新版本也开始支持TCC了,可以多了解。 还有一个是jdon提到的Eventuate Tram Saga框架。
4.柔性事务TCC
TCC有蛮多实现Atomikos ExtremeTransactions, ByteTCC, EasyTransaction,spring-cloud-rest-tcc,TCC-transaction,TXLCN还有阿里的Seata, 我们就简单的看下大厂的Seata, 直接套用官方例子。https://github.com/seata/seata/wiki/Quick-Start, 看下常用的spring boot, mybatis例子, https://github.com/seata/seata-samples/tree/master/springboot-mybatis
4.1 注意: 每个服务的数据库都需要增加undo日志表
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
4.2 需要启动seata-server, 即作为Transcation Coordinate/TC事务协调者
Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
Options:
--host, -h
The host to bind.
Default: 0.0.0.0
--port, -p
The port to listen.
Default: 8091
--storeMode, -m
log store mode : file、db
Default: file
--help
e.g.
sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
4.3 服务调用需要传递全局xid, spring boot mybatis例子是通过web filter获取xid, rest template拦截器调用服务自动增加xid。
package io.seata.samples.common.filter;
import io.seata.core.context.RootContext;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
@Component
public class SeataFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) servletRequest;
String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
boolean isBind = false;
if (StringUtils.isNotBlank(xid)) {
RootContext.bind(xid);
isBind = true;
}
try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
if (isBind) {
RootContext.unbind();
}
}
}
@Override
public void destroy() {
}
}
package io.seata.samples.common.interceptor;
import io.seata.core.context.RootContext;
import org.apache.commons.lang.StringUtils;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;
import java.io.IOException;
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
public SeataRestTemplateInterceptor() {
}
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
String xid = RootContext.getXID();
if (StringUtils.isNotEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
}
4.4 业务端调用只需要增加个@GlobalTransaction即可
package io.seata.samples.business.controller;
import io.seata.samples.business.service.BusinessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
@RequestMapping("/api/business")
@RestController
public class BusinessController {
@Autowired
private BusinessService businessService;
/**
* 购买下单,模拟全局事务提交
*
* @return
*/
@RequestMapping("/purchase/commit")
public Boolean purchaseCommit(HttpServletRequest request) {
businessService.purchase("1001", "2001", 1);
return true;
}
/**
* 购买下单,模拟全局事务回滚
*
* @return
*/
@RequestMapping("/purchase/rollback")
public Boolean purchaseRollback() {
try {
businessService.purchase("1002", "2001", 1);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
package io.seata.samples.business.service;
import io.seata.core.context.RootContext;
import io.seata.samples.business.client.OrderClient;
import io.seata.samples.business.client.StorageClient;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class BusinessService {
private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);
@Autowired
private StorageClient storageClient;
@Autowired
private OrderClient orderClient;
/**
* 减库存,下订单
*
* @param userId
* @param commodityCode
* @param orderCount
*/
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageClient.deduct(commodityCode, orderCount);
orderClient.create(userId, commodityCode, orderCount);
}
}
可能觉得神奇, TCC的cancel都不用写吗? 实际上seata的DataSourceProxy(DataSource)和RootContext应该做了很多事情, 上图整个流程的xid,undo-log/rollback-info,tc交互都隐藏其中,补偿SQL都自动做了,大厂还是大厂,且不论是KPI项目还是什么,不得不承认,阿里开源的中间件还是给中小团队带来福利。
seata最新的连saga也加入了,例子和文档也丰富, 估计这玩意会火起来。不过从架构上也看到, TC/seata-server应该容易成为瓶颈(单点或集群?), undo-log, rollback等肯定会拖慢些, 不过分布式事务都快不了。
5.其它分布式解决方案
互联网方案更多的实现最终一致性, 采用MQ最大努力的通知补偿, 或业务端一些主动查询,对账等方式补偿。
分布式事务如何处理还是得结合业务去考虑, 并不是什么业务都强要求ACID, 例如支付最大努力通知,主动查询业务补偿, 因地制宜的选择适合自己的方案就好。
- 原文作者:Zealot
- 原文链接:https://www.51discuss.com/posts/distribute-transaction-solution/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。