.NET CORE 分布式事务(二) DTM实现TCC

news/2024/5/19 20:57:10 标签: .netcore, 分布式, 微服务, 架构

目录

引言:

1. TCC事务模式

2. TCC组成

3. TCC执行流程 

3.1 TCC正常执行流程

3.2 TCC失败回滚 

4. Confirm/Cancel操作异常

5. TCC 设计原则

5.1 TCC如何做到更好的一致性

5.2 为什么只适合短事务

6. 嵌套的TCC

7. .NET CORE结合DTM实现TCC分布式事务

7.1 轮子小卖部(Nuget)引入Dtmcli

7.2 生成转账数据库(EF_CORE)

7.3 数据库持久化

7.4 appsettings.json

7.5 Program.cs

7.6 主程序事务API控制器

7.7 用户1转账事务API控制器

7.8 用户2转账事务API控制器

小结


引言:

紧接上一期.NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交-CSDN博客)

1. TCC事务模式

什么是TCC,TCC是Try、Confirm、Cancel三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。

2. TCC组成

TCC分为3个阶段

  • Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
  • Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
  • Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。

3. TCC执行流程 

3.1 TCC正常执行流程

一般情况下,时序图中的9个步骤会正常完成,整个业务按照预期进行。主程序注册全局事务,以及Try尝试事务Api地址、Confirm提交事务Api地址、Cancel回滚事务Api地址。并开始执行Try尝试事务,Try中的事务要对资源进行预算以及锁定,也就是尝试执行,判断资源是否支持提交执行事务。然后进行提交事务。最终完成全局事务。 

3.2 TCC失败回滚 

当Try尝试事务异常时与上面的正常流程的区别是,现在不会调用Confirm提交事务,而是调用Cancel回滚事务,对Try尝试事务进行的资源锁定进行解锁释放等操作。回退到全局事务开始前。

4. Confirm/Cancel操作异常

假如Confirm/Cancel操作遇见失败会怎么样?按照Tcc模式的协议,Confirm/Cancel操作是要求最终成功的,遇见失败的情况,都是由于临时故障或者程序bug。dtm在Confirm/Cancel操作遇见失败时,会不断进行重试,直到成功。

为了避免程序bug导致补偿操作一直无法成功,建议开发者对全局事务表进行监控,发现重试超过3次的事务,发出报警,由运维人员找开发手动处理。进行人工干预。

5. TCC 设计原则

 在设计上,TCC主要用于处理一致性要求较高、需要较多灵活性的短事务。

5.1 TCC如何做到更好的一致性

对于我们的 A 跨行转账给 B 的场景,如果采用SAGA,在正向操作中调余额,在补偿操作中,反向调整余额,那么会出现这种情况:如果A扣款成功,金额转入B失败,最后回滚,把A的余额调整为初始值。整个过程中如果A发现自己的余额被扣减了,但是收款方B迟迟没有收到资金,那么会对A造成非常大的困扰。

上述需求在SAGA中无法解决,但是可以通过TCC来解决,设计技巧如下:

  • 在账户中的 balance 字段之外,再引入一个 trading_balance 字段
  • Try 阶段检查账户是否被冻结,检查账户余额是否充足,没问题后,调整 trading_balance (即业务上的冻结资金)
  • Confirm 阶段,调整 balance ,调整 trading_balance (即业务上的解冻资金)
  • Cancel 阶段,调整 trading_balance (即业务上的解冻资金)

这种情况下,终端用户 A 就不会看到自己的余额扣减了,但是 B 又迟迟收不到资金的情况。

5.2 为什么只适合短事务

TCC 的事务编排放在了应用端上,就是事务一共包含多少个分支,每个分支的顺序什么样,这些信息不会像 SAGA 那样,都发送给dtm服务器之后,再去调用实际的事务分支。当应用出现 crash 或退出,编排信息丢失,那么整个全局事务,就没有办法往前重试,只能够进行回滚。如果全局事务持续时间很长,例如一分钟以上,那么当应用进行正常的发布升级时,也会导致全局事务回滚,影响业务。因此 TCC 会更适合短事务。

那么是否可以把TCC的事务编排都保存到服务器,保证应用重启也不受到影响呢?理论上这种做法是可以解决这个问题的,但是存储到服务器会比在应用端更不灵活,无法获取到每个分支的中间结果,无法做嵌套等等。

考虑到一致性要求较高和短事务是高度相关的(一个中间不一致状态持续很长时间的事务,自然不能算一致性较好),这两者跟“应用灵活编排”,也是有较高相关度,所以将 TCC 实现为应用端编排,而 SAGA 实现为服务端编排。

6. 嵌套的TCC

dtm的Tcc事务模式,支持子事务嵌套,流程图如下:

在这个流程图中,Order这个微服务,管理了订单相关的数据修改,同时还管理了一个嵌套的子事务,因此他即扮演了RM的角色,也扮演了AP的角色。

7. .NET CORE结合DTM实现TCC分布式事务

还是以跨行转账作为例子,给大家详解这种架构。业务场景介绍如下:

我们需要跨行从A转给B 30元,我们先进行可能失败的转出操作TccUserTry,即进行A扣减30元。如果A因余额不足扣减失败,那么转账直接失败,返回错误;如果扣减成功,那么进行下一步转入操作,因为转入操作没有余额不足的问题,可以假定转入操作一定会成功。

7.1 轮子小卖部(Nuget)引入Dtmcli

  <ItemGroup>
    <PackageReference Include="Dtmcli" Version="1.4.0" />
  </ItemGroup>

7.2 生成转账数据库(EF_CORE)

数据库模型

//模型
public partial class UserMoney
{
    public int id { get; set; }
    public int money { get; set; }
    public int trading_balance { get; set; }
    public int balance { get; set; }
    public int trymoney { get; set; }
    public string guid { get; set; }
}

DbContext

 public class DtmDbContext : DbContext
 {
     public DtmDbContext() { }
     public DtmDbContext(DbContextOptions<DtmDbContext> options) : base(options) { }
 
     public virtual DbSet<UserMoney> UserMoney { get; set; }
 
     protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
     {
         optionsBuilder
             .UseMySql("server=localhost;port=3307;user id=root;password=123;database=DTM_Test", ServerVersion.Parse("8.0.23-mysql"))
             .UseLoggerFactory(LoggerFactory.Create(option =>
             {
                 option.AddConsole();
             }));
     }
 
     protected override void OnModelCreating(ModelBuilder modelBuilder)
     {
         modelBuilder
             .UseCollation("utf8_general_ci")
             .HasCharSet("utf8");
 
         modelBuilder.Entity<UserMoney>(entity =>
         {
             entity.ToTable("UserMoney");
         });
     }
 }

7.3 数据库持久化

CREATE TABLE
IF
	NOT EXISTS DTM_Test.barrier (
	id BIGINT ( 22 ) PRIMARY KEY AUTO_INCREMENT,
	trans_type VARCHAR ( 45 ) DEFAULT '',
	gid VARCHAR ( 128 ) DEFAULT '',
	branch_id VARCHAR ( 128 ) DEFAULT '',
	op VARCHAR ( 45 ) DEFAULT '',
	barrier_id VARCHAR ( 45 ) DEFAULT '',
	reason VARCHAR ( 45 ) DEFAULT '' COMMENT 'the branch type who insert this record',
	create_time datetime DEFAULT now( ),
	update_time datetime DEFAULT now( ),
	KEY ( create_time ),
	KEY ( update_time ),
	UNIQUE KEY ( gid, branch_id, op, barrier_id ) 
	) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;

数据库最终生成:

7.4 appsettings.json

{
  "AllowedHosts": "*",
  "ConnectionString": "server=localhost;port=3307;user id=root;password=123;database=test",
  "DtmUrl": "http://localhost:36789",
  "TransactionUrl": "http://localhost:5016",

  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  }
}

7.5 Program.cs

           // 注册DbContext
           builder.Services.AddDbContext<DtmDbContext>(options =>
           {
               options.UseMySql(builder.Configuration.GetValue<string>("ConnectionString"), ServerVersion.Parse("8.0.23-mysql"));
           });

           // 注册dtm

           builder.Services.AddDtmcli(dtm =>
           {
               dtm.DtmUrl = builder.Configuration.GetValue<string>("DtmUrl");
               dtm.DBType = "mysql";
               dtm.BarrierTableName = "dtm_test.barrier";
           });

7.6 主程序事务API控制器

using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Mvc;
using System.Threading;

namespace Dtm_TCC.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class DtmTccController : ControllerBase
    {
        private readonly ILogger<DtmTccController> _logger;
        private readonly TccGlobalTransaction _globalTransaction;
        private readonly IDtmClient _dtmClient;
        private readonly IConfiguration _configuration;

        public DtmTccController(ILogger<DtmTccController> logger,
            TccGlobalTransaction globalTransaction,
            IDtmClient dtmClient,
            IConfiguration configuration)
        {
            _logger = logger;
            _globalTransaction = globalTransaction;
            _dtmClient = dtmClient;
            _configuration = configuration;
        }

        [HttpPost(Name = "DtmTcc")]
        public async Task<IActionResult> DtmTcc()
        {
            var transactionurl = _configuration.GetValue<string>("TransactionUrl");
            // 创建CancellationToken用于取消事务
            CancellationToken cancellationToken = new CancellationToken();
            // 生成全局事务ID
            var gid = await _dtmClient.GenGid(cancellationToken);

            UserMoney body = new UserMoney() { id = 1, trymoney = -30, guid = string.Empty };
            UserMoney body2 = new UserMoney() { id = 2, trymoney = 30, guid = string.Empty };

            await _globalTransaction.Excecute(/*gid,*/ async (tcc) =>
            {
                // 用户1 转出30元 第一个参数是try检测及冻结阶段,第二个是提交,第三个是回滚
                var res1 = await tcc.CallBranch(body,
                     transactionurl + "/TccUserTry",
                     transactionurl + "/TccUserConfirm",
                     transactionurl + "/TccUserCancel", cancellationToken);

                // 用户2 转入30元
                var res2 = await tcc.CallBranch(body2,
                     transactionurl + "/TccUser2Try",
                     transactionurl + "/TccUser2Confirm",
                     transactionurl + "/TccUser2Cancel", cancellationToken);
            }, cancellationToken);

            return Ok(TransResponse.BuildSucceedResponse());
        }
    }
}

7.7 用户1转账事务API控制器

using DTM_EF;
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MySqlConnector;

namespace Dtm_TCC.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class UserController : ControllerBase
    {
        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<UserController> _Logger;
        private readonly DtmDbContext _dtmDbContext;

        public UserController(IBranchBarrierFactory barrierFactory,
            ILogger<UserController> Logger,
            DtmDbContext dtmDbContext)
        {
            _barrierFactory = barrierFactory;
            _Logger = Logger;
            _dtmDbContext = dtmDbContext;
        }

        [HttpPost]
        [Route("/TccUserTry")]
        public async Task<IActionResult> TccUserTry([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildFailureResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    //获取用户账户信息
                    var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();
                    //判断预算--不够回滚
                    if (UserMoney == null || UserMoney!.money + body.trymoney < 0) obj = TransResponse.BuildFailureResponse();
                    else
                    {
                        //修改信息准备提交
                        UserMoney!.balance = 1;
                        UserMoney.trading_balance = 1;
                        UserMoney.trymoney = body.trymoney;
                        UserMoney.guid = gid;
                        _dtmDbContext.SaveChanges();
                        obj = TransResponse.BuildSucceedResponse();
                    }
                    await Task.CompletedTask;
                });
            }
            _Logger.LogInformation($"{gid}--Try成功");
            return Ok(obj);
        }

        [HttpPost]
        [Route("/TccUserConfirm")]
        public async Task<IActionResult> TccUserConfirm([FromQuery] string gid, [FromQuery] string trans_type,
                    [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildFailureResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    //获取用户账户信息
                    var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
                    if (UserMoney != null)
                    {
                        UserMoney!.balance = 0;
                        UserMoney!.trading_balance = 0;
                        UserMoney!.money += UserMoney!.trymoney;
                        UserMoney!.trymoney = 0;
                        UserMoney!.guid = string.Empty;

                        _dtmDbContext.SaveChanges();
                        obj = TransResponse.BuildSucceedResponse();
                    }
                    //修改信息准备提交         
                    await Task.CompletedTask;
                });
            }
            _Logger.LogInformation($"{gid}--Confirm成功");
            return Ok(obj);
        }

        [HttpPost]
        [Route("/TccUserCancel")]
        public async Task<IActionResult> TccUserCancel([FromQuery] string gid, [FromQuery] string trans_type,
             [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);

            var obj = TransResponse.BuildFailureResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                //操作回滚并解锁
                await branchBarrier.Call(conn, async (tx) =>
                {
                    //获取用户账户信息
                    var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();
                    if (UserMoney != null)
                    {
                        UserMoney!.balance = 0;
                        UserMoney!.trading_balance = 0;
                        UserMoney!.trymoney = 0;
                        UserMoney!.guid = string.Empty;

                        _dtmDbContext.SaveChanges();
                        obj = TransResponse.BuildSucceedResponse();
                    }
                    await Task.CompletedTask;
                });
            }
            _Logger.LogInformation($"{gid}--Cancel成功");
            return Ok(obj);
        }
    }
}

7.8 用户2转账事务API控制器

using DTM_EF;
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MySqlConnector;

namespace Dtm_TCC.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class User2Controller : ControllerBase
    {
        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<User2Controller> _Logger;
        private readonly DtmDbContext _dtmDbContext;

        public User2Controller(IBranchBarrierFactory barrierFactory,
            ILogger<User2Controller> Logger,
            DtmDbContext dtmDbContext)
        {
            _barrierFactory = barrierFactory;
            _Logger = Logger;
            _dtmDbContext = dtmDbContext;
        }

        [HttpPost]
        [Route("/TccUser2Try")]
        public async Task<IActionResult> TccUserTry([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildFailureResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    //获取用户账户信息
                    var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
                    //判断预算--不够回滚
                    if (UserMoney == null || UserMoney!.money + body.trymoney < 0) obj = TransResponse.BuildFailureResponse();
                    else
                    {
                        //修改信息准备提交
                        UserMoney!.balance = 1;
                        UserMoney.trading_balance = 1;
                        UserMoney.trymoney = body.trymoney;
                        UserMoney.guid = gid;
                        _dtmDbContext.SaveChanges();
                        obj = TransResponse.BuildSucceedResponse();
                    }
                    await Task.CompletedTask;
                });
            }
            obj = TransResponse.BuildFailureResponse();
            _Logger.LogInformation($"{gid}--Try成功");
            return Ok(obj);
        }

        [HttpPost]
        [Route("/TccUser2Confirm")]
        public async Task<IActionResult> TccUserConfirm([FromQuery] string gid, [FromQuery] string trans_type,
                    [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildFailureResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    //获取用户账户信息
                    var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
                    if (UserMoney != null)
                    {
                        UserMoney!.balance = 0;
                        UserMoney!.trading_balance = 0;
                        UserMoney!.money += UserMoney!.trymoney;
                        UserMoney!.trymoney = 0;
                        UserMoney!.guid = string.Empty;

                        _dtmDbContext.SaveChanges();
                        obj = TransResponse.BuildSucceedResponse();
                    }
                    //修改信息准备提交         
                    await Task.CompletedTask;
                });
            }
            _Logger.LogInformation($"{gid}--Confirm成功");
            return Ok(obj);
        }
        [HttpPost]
        [Route("/TccUser2Cancel")]
        public async Task<IActionResult> TccUserCancel([FromQuery] string gid, [FromQuery] string trans_type,
             [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);

            var obj = TransResponse.BuildFailureResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                //操作回滚并解锁
                await branchBarrier.Call(conn, async (tx) =>
                {
                    //获取用户账户信息
                    var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();
                    if (UserMoney != null)
                    {
                        UserMoney!.balance = 0;
                        UserMoney!.trading_balance = 0;
                        UserMoney!.trymoney = 0;
                        UserMoney!.guid = string.Empty;
                    
                        _dtmDbContext.SaveChanges();
                        obj = TransResponse.BuildSucceedResponse();
                    }
                    await Task.CompletedTask;
                });
            }
            _Logger.LogInformation($"{gid}--Cancel成功");
            return Ok(obj);
        }
    }
}

小结

本文给出了一个完整的 TCC 事务方案,是一个可以实际运行的 TCC,您只需要在这个示例的基础上进行简单修改,就能够用于解决您的真实问题


http://www.niftyadmin.cn/n/5460357.html

相关文章

【Go】三、Go指针

文章目录 1、指针2、说明 1、指针 &符号变量 就可以获取这个变量内存的地址*int 是一个指针类型 &#xff08;可以理解为 指向int类型的指针&#xff09; package main import("fmt" ) func main(){var age int 18//&符号变量 就可以获取这个变量内存的地…

mysql笔记:25. docker环境中mysql主从复制、主主复制实操

文章目录 一、准备工作1. 安装配置Docker2. 准备MySQL相关的配置和数据目录 二、基于日志点的主从复制1. 配置Master服务器1.1 修改配置文件1.2. 在docker中启动Master节点1.3. 创建用户并授权 2. 配置Slave1服务器2.1. 修改配置2.2. 启动服务2.3. 指定Master2.4. 开始复制 3. …

CVE-2023-38408漏洞修复 - 升级openssl和openssh

CVE-2023-38408 OpenSSH 代码问题漏洞修复 - 升级openssl和openssh ※ 重要说明&#xff1a; 1、升级后会导致无法用ssh远程登录&#xff0c;提示“Permission denied, please try again.” 2、解决方案请查看本章节【三、解决升级后无法用ssh远程登录】 目录 CVE-2023-38408 O…

暴力枚举--组合的输出

题目描述 排列与组合是常用的数学方法&#xff0c;其中组合就是从 n 个元素中抽出 r 个元素&#xff08;不分顺序且 r≤n&#xff09;&#xff0c;我们可以简单地将 n 个元素理解为自然数 1,2,…,n&#xff0c;从中任取 r 个数。 现要求你输出所有组合。 例如 n5,r3&#xf…

阳光消费金融2023利润创新高,固收业务立功

来源 | 镭射财经&#xff08;leishecaijing&#xff09; 3月28日&#xff0c;光大银行披露了2023年年报&#xff0c;亦公布旗下消金公司阳光消费金融业绩。截至2023年末&#xff0c;阳光消费金融总资产116.77 亿元&#xff0c;同比下滑6.41%&#xff1b;净资产13.19亿元&#…

阿里云2核4G服务器租用价格,支持多少人在线?

阿里云2核4G服务器多少钱一年&#xff1f;2核4G配置1个月多少钱&#xff1f;2核4G服务器30元3个月、轻量应用服务器2核4G4M带宽165元一年、企业用户2核4G5M带宽199元一年。可以在阿里云CLUB中心查看 aliyun.club 当前最新2核4G服务器精准报价、优惠券和活动信息。 阿里云官方2…

android WMS服务

android WMS服务 WMS的定义 窗口的分类 WMS的启动 WindowManager Activity、Window、DecorView、ViewRootImpl 之间的关系 WindowToken WMS的定义 WMS是WindowManagerService的简称&#xff0c;它是android系统的核心服务之一&#xff0c;它在android的显示功能中扮演着…

PowerBI和Tableau之间该怎么选择?

最近经常看到朋友询问&#xff0c;最近想学习数据分析工具&#xff0c;但是PowerBI和Tableau之间不知道怎么选择? 其实可以从下面几个方面进行参考&#xff0c;Power BI和Tableau哪个更适合你&#xff1f; 共同点&#xff1a; Power BI和Tableau都是强大的数据分析和数据可…