1.MySQL XA事务例子

XA是分布式事务处理的一个规范, open group发起, 被数据库厂商广泛的支持, 具体规范在 http://www.opengroup.org/public/pubs/catalog/c193.htm. MySQL5.x和Connector/J 5.0.0起,InnoDB引擎就开始支持XA了, 来段代码实际些。

package com._51discuss.xa_test;

import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;

import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.*;
import java.util.UUID;

/**
 *
CREATE TABLE `user` (
  `user_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `user_name` varchar(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'User nick name',
  `user_mobile` char(11) CHARACTER SET utf8mb4 NOT NULL,
  `user_pwd` varchar(64) COLLATE utf8mb4_bin NOT NULL,
  `user_create_time` datetime(3) NOT NULL,
  PRIMARY KEY (`user_id`),
  UNIQUE KEY `user_mobile_UNIQUE` (`user_mobile`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
 * @author Zeal
 */
public class LocalXaTest {

    public LocalXaTest() {
        try {
            // The newInstance() call is a work around for some
            // broken Java implementations
            Class.forName("com.mysql.cj.jdbc.Driver").newInstance();
        } catch (Exception ex) {
            // handle the error
            throw new IllegalStateException(ex.toString(), ex);
        }
    }

    public void multiInserts() throws Exception {
        String jdbcUrl = "jdbc:mysql://localhost/test?serverTimezone=GMT%2B8&autoReconnect=true&characterEncoding=utf8&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true";
        String jdbcUser = "root";
        String jdbcPassword = null;
        XAConnection connection1 = null;
        XAConnection connection2 = null;
        try {
            connection1 = getConnection(jdbcUrl, jdbcUser, jdbcPassword, true);
            connection2 = getConnection(jdbcUrl, jdbcUser, jdbcPassword, true);
            XAResource resource1 = connection1.getXAResource();
            XAResource resource2 = connection2.getXAResource();
            final byte[] gtrid = UUID.randomUUID().toString().getBytes();
            final int formatId = 1;
            Xid xid1 = null;
            Xid xid2 = null;
            try {
                //======================================================================================================
                //First time, it will succeed
                xid1 = insertUser(connection1, gtrid, formatId, "user1", "13555555555", "123456");
                //With duplicate user, it should roll back
                xid2 = insertUser(connection2, gtrid, formatId, "user2", "13555555556", "123456");
                //======================================================================================================
                //Second time, it will roll back
                //The second time, it should roll back
//                xid1 = insertUser(connection1, gtrid, formatId, "user3", "13555555557", "123456");
//                //With duplicate user, it should roll back
//                xid2 = insertUser(connection2, gtrid, formatId, "user1", "13555555556", "123456");
                //2PC
                int prepare1 = resource1.prepare(xid1);
                int prepare2 = resource2.prepare(xid2);
                final boolean onePhase = false;
                if (prepare1 == XAResource.XA_OK && prepare2 == XAResource.XA_OK) {
                    resource1.commit(xid1, onePhase);
                    resource2.commit(xid2, onePhase);
                } else {
                    resource1.rollback(xid1);
                    resource2.rollback(xid2);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                if (xid1 != null) {
                    resource1.rollback(xid1);
                }
                if (xid2 != null) {
                    resource2.rollback(xid2);
                }
                throw e;
            }

        } finally {
            close(connection1).close(connection2);
        }
    }


    private Xid insertUser(XAConnection xaConnection, final byte[] gtrid, final int formatId, String userName, String userMobile, String userPwd) throws Exception {
        Connection connection = xaConnection.getConnection();
        XAResource resource = xaConnection.getXAResource();
        String sql = "INSERT INTO `user`(`user_name`,`user_mobile`,`user_pwd`,`user_create_time`) VALUES(?,?,?,?)";
        byte[] bqual = UUID.randomUUID().toString().getBytes();
        MysqlXid xid = new MysqlXid(gtrid, bqual, formatId);
        resource.start(xid, XAResource.TMNOFLAGS);
        try (PreparedStatement psm = connection.prepareStatement(sql)) {
            psm.setString(1, userName);
            psm.setString(2, userMobile);
            psm.setString(3, userPwd);
            psm.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
            psm.executeUpdate();
            resource.end(xid, XAResource.TMSUCCESS);
            return xid;
        }
    }

    private XAConnection getConnection(String jdbcUrl, String jdbcUser, String jdbcPassword, boolean logXaCommand) throws SQLException {
        Connection connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
        return new MysqlXAConnection((JdbcConnection) connection, logXaCommand);
    }

    private LocalXaTest close(XAConnection closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return this;
    }

    public static void main(String[] args) throws Exception {
        LocalXaTest test = new LocalXaTest();
        test.multiInserts();
    }
}

第一轮插入user1,user2成功, 第二轮插入user3,user1会因为user1主键重复整体回滚。这里偷懒了,最好能连不同的数据库去执行。而这里XA就是经典的2PC(Two-Phase Commit两段提交)。按照MySQL官方的话说:

The MySQL implementation of XA enables a MySQL server to act as a Resource Manager that handles XA transactions within a global transaction. A client program that connects to the MySQL server acts as the Transaction Manager.

在微服务等场景, 每个服务管理自己的数据库, 调用客户端就用不到XAResource了, 就得用其它方案了。而实际XA用得不多,性能通常不大好,数据库实现也有差异甚至限制,实际上可能会使用更多的柔性事务。

2. JTA/XA实现,Atomikos与Bitronix

Spring boot官方推荐的JTA解决分布式事务。Atomikos其实是先驱级别的方案, TransactionEssentials是开源版, 支持JTA/XA, JDBC, JMS。 而商用版ExtremeTransactions则首创的支持TCC(TRYING、CONFIRMING、CANCELIING), 概念上和XA和DTP模型有些相像, 把资源层事务转移到业务层去补偿, 全局事务下的每个小事务就类似要实现try, confirm, cancel的业务接口了, 对业务实现要求就提高了。

Bitronix和开源版的Atomikos类似, 仅支持JTA/XA, 配置上一般是代理若干个数据源,实现JtaTransactionManger, 接口方法加个指定的@Transactional,用起来挺方便, 只是在服务化的场景使用有限, 细节就跳过了。

3.柔性事务Saga

Saga是分布式事务模式, 实现最终一致性。Saga包含多个子事务和对应子事务的补偿动作, 相比TCC则是少了try/prepare 准备动作, 不同的场景这样有好有坏。Saga一个出名些的实现是Apache ServiceComb, 主要是华为系的开源的全栈微服务解决方案,感觉蛮不错的, 新版本也开始支持TCC了,可以多了解。 还有一个是jdon提到的Eventuate Tram Saga框架。 service comb

4.柔性事务TCC

TCC有蛮多实现Atomikos ExtremeTransactions, ByteTCC, EasyTransaction,spring-cloud-rest-tcc,TCC-transaction,TXLCN还有阿里的Seata, 我们就简单的看下大厂的Seata, 直接套用官方例子。https://github.com/seata/seata/wiki/Quick-Start, 看下常用的spring boot, mybatis例子, https://github.com/seata/seata-samples/tree/master/springboot-mybatis

seata

4.1 注意: 每个服务的数据库都需要增加undo日志表

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

4.2 需要启动seata-server, 即作为Transcation Coordinate/TC事务协调者

Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
  Options:
    --host, -h
      The host to bind.
      Default: 0.0.0.0
    --port, -p
      The port to listen.
      Default: 8091
    --storeMode, -m
      log store mode : file、db
      Default: file
    --help

e.g.

sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

4.3 服务调用需要传递全局xid, spring boot mybatis例子是通过web filter获取xid, rest template拦截器调用服务自动增加xid。

package io.seata.samples.common.filter;

import io.seata.core.context.RootContext;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;

@Component
public class SeataFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }

    @Override
    public void destroy() {
    }
}
package io.seata.samples.common.interceptor;

import io.seata.core.context.RootContext;
import org.apache.commons.lang.StringUtils;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;

import java.io.IOException;

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    public SeataRestTemplateInterceptor() {
    }

    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }
}

4.4 业务端调用只需要增加个@GlobalTransaction即可

package io.seata.samples.business.controller;

import io.seata.samples.business.service.BusinessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;

@RequestMapping("/api/business")
@RestController
public class BusinessController {

    @Autowired
    private BusinessService businessService;

    /**
     * 购买下单,模拟全局事务提交
     *
     * @return
     */
    @RequestMapping("/purchase/commit")
    public Boolean purchaseCommit(HttpServletRequest request) {
        businessService.purchase("1001", "2001", 1);
        return true;
    }

    /**
     * 购买下单,模拟全局事务回滚
     *
     * @return
     */
    @RequestMapping("/purchase/rollback")
    public Boolean purchaseRollback() {
        try {
            businessService.purchase("1002", "2001", 1);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }
}
package io.seata.samples.business.service;

import io.seata.core.context.RootContext;
import io.seata.samples.business.client.OrderClient;
import io.seata.samples.business.client.StorageClient;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class BusinessService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);

    @Autowired
    private StorageClient storageClient;
    @Autowired
    private OrderClient orderClient;

    /**
     * 减库存,下订单
     *
     * @param userId
     * @param commodityCode
     * @param orderCount
     */
    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageClient.deduct(commodityCode, orderCount);
        orderClient.create(userId, commodityCode, orderCount);
    }
}

可能觉得神奇, TCC的cancel都不用写吗? 实际上seata的DataSourceProxy(DataSource)和RootContext应该做了很多事情, 上图整个流程的xid,undo-log/rollback-info,tc交互都隐藏其中,补偿SQL都自动做了,大厂还是大厂,且不论是KPI项目还是什么,不得不承认,阿里开源的中间件还是给中小团队带来福利。

seata最新的连saga也加入了,例子和文档也丰富, 估计这玩意会火起来。不过从架构上也看到, TC/seata-server应该容易成为瓶颈(单点或集群?), undo-log, rollback等肯定会拖慢些, 不过分布式事务都快不了。

5.其它分布式解决方案

互联网方案更多的实现最终一致性, 采用MQ最大努力的通知补偿, 或业务端一些主动查询,对账等方式补偿。

分布式事务如何处理还是得结合业务去考虑, 并不是什么业务都强要求ACID, 例如支付最大努力通知,主动查询业务补偿, 因地制宜的选择适合自己的方案就好。