分布式-2pc示例简介

2p示例简介

Posted by Kang on September 19, 2019

前面分布式-2pc、3pc、TCC已经介绍分布式相关的事务。本文接上文,提供一个可简单学习的2PC示例。

1. 基于XAResource底层接口实现

1
2
3
4
5
6
7
8
9
10
11
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.17</version>
</dependency>

<dependency>
    <groupId>javax.transaction</groupId>
    <artifactId>javax.transaction-api</artifactId>
    <version>1.3</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class XATest {

    public static void main(String[] args) throws SQLException {
        Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_user", "root", "root");
        Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_order", "root", "root");

        //获取资源管理器接口
        XAConnection xaConnection1 = new MysqlXAConnection((JdbcConnection) conn1,true);
        XAResource xaResource1 = xaConnection1.getXAResource();
        XAConnection xaConnection2 = new MysqlXAConnection((JdbcConnection) conn2,true);
        XAResource xaResource2 = xaConnection2.getXAResource();
        //全局事务id,此处模拟直接写死一个
        byte[] gtrid = "t1234".getBytes();
        int formatId = 1;
        try {
            //事务分支上的分支id
            byte[] bqual1 = "cid1234".getBytes();
            Xid xid1 = new MysqlXid(gtrid,bqual1,formatId);
            //执行事务分支
            xaResource1.start(xid1,XAResource.TMNOFLAGS); //不选择任何标志值。
            PreparedStatement ps1 = conn1.prepareStatement("insert into user(name,sex) values('zhangsan','man')");
            ps1.execute();
            xaResource1.end(xid1,XAResource.TMSUCCESS); //取消调用者与事务分支的关联。

            byte[] bqual2 = "cid12345".getBytes();
            Xid xid2 = new MysqlXid(gtrid,bqual1,formatId);
            //执行事务分支
            xaResource2.start(xid2,XAResource.TMNOFLAGS);
            PreparedStatement ps2 = conn2.prepareStatement("insert into order(userid,objName) values(1,'吸尘器')");
            ps2.execute();
            xaResource2.end(xid2,XAResource.TMSUCCESS);

            //###################两阶段提交#####################
            //第一阶段:询问所有的事务分支,准备提交事务-->开启事务并执行
            int prepare1 = xaResource1.prepare(xid1);
            int prepare2 = xaResource2.prepare(xid2);

            //第二阶段:提交所有的事务分支
            boolean onePhase = false; //是否优化为一阶段提交-->有两个事务分支,所以不能优化
            if(prepare1 == XAResource.XA_OK && prepare2 == XAResource.XA_OK){
                xaResource1.commit(xid1,onePhase);
                xaResource2.commit(xid2,onePhase);
            }else{ //事务没有提交,则回滚
                xaResource1.rollback(xid1);
                xaResource2.rollback(xid2);
            }

        }catch (XAException e) {
            //出现任何异常,也需要回滚
        }


    }

}

2. 基于atomikos框架实现

该实现可参考一些网上的实现,此处暂不介绍。 其全局使用了一把大锁,锁内部进行两阶段提交。