怎么用C#实现SAGA分布式事务(saga,分布式事务,开发技术)

时间:2024-05-03 03:21:33 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

背景

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。

市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。

下面就基于这个框架来实践一下银行转账的例子。

前置工作

dotnetaddpackageDtmcli--version0.3.0

成功的 SAGA

先来看一下一个成功完成的 SAGA 时序图。

怎么用C#实现SAGA分布式事务

上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。

微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。

下面是两个服务的正向操作和补偿操作的处理。

OutApi

app.MapPost("/api/TransOut",(stringbranch_id,stringgid,stringop,TransRequestreq)=>{//进行数据库操作Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid},branch_id={branch_id},op={op}");returnResults.Ok(TransResponse.BuildSucceedResponse());});app.MapPost("/api/TransOutCompensate",(stringbranch_id,stringgid,stringop,TransRequestreq)=>{//进行数据库操作Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid},branch_id={branch_id},op={op}");returnResults.Ok(TransResponse.BuildSucceedResponse());});

InApi

app.MapPost("/api/TransIn",(stringbranch_id,stringgid,stringop,TransRequestreq)=>{Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid},branch_id={branch_id},op={op}");returnResults.Ok(TransResponse.BuildSucceedResponse());});app.MapPost("/api/TransInCompensate",(stringbranch_id,stringgid,stringop,TransRequestreq)=>{Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid},branch_id={branch_id},op={op}");returnResults.Ok(TransResponse.BuildSucceedResponse());});

注:示例为了简单,没有进行实际的数据库操作。

到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用

varuserOutReq=newTransRequest(){UserId="1",Amount=-30};varuserInReq=newTransRequest(){UserId="2",Amount=30};varct=newCancellationToken();vargid=awaitdtmClient.GenGid(ct);varsaga=newSaga(dtmClient,gid).Add(outApi+"/TransOut",outApi+"/TransOutCompensate",userOutReq).Add(inApi+"/TransIn",inApi+"/TransInCompensate",userInReq);varflag=awaitsaga.Submit(ct);Console.WriteLine($"case1,{gid}saga提交结果={flag}");

到这里,一个完整的 SAGA 分布式事务就编写完成了。

搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。

怎么用C#实现SAGA分布式事务

当然,上面的情况太理想了,转出转入都是一次性就成功了。

但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。

下面来看一个异常的 SAGA 示例

异常的 SAGA

做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。

由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。

这个时候用户2 这边会出现什么样的情况呢?

转入其实成功了,但是 dtm 收到错误 (网络故障等)转入没有成功,直接告诉 dtm 失败了 (应用异常等)

无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。

先看一下事务失败交互的时序图

怎么用C#实现SAGA分布式事务

再通过调整上面成功的例子,来比较直观的看看出现的情况。

在 InApi 加多一个转入失败的处理接口

app.MapPost("/api/TransInError",(stringbranch_id,stringgid,stringop,TransRequestreq)=>{Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid},branch_id={branch_id},op={op}");//returnResults.BadRequest();returnResults.Ok(TransResponse.BuildFailureResponse());});

失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种

调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。

varsaga=newSaga(dtmClient,gid).Add(outApi+"/TransOut",outApi+"/TransOutCompensate",userOutReq).Add(inApi+"/TransInError",inApi+"/TransInCompensate",userInReq);

运行结果如下:

怎么用C#实现SAGA分布式事务

在这个例子中,只考虑补偿/重试成功的情况下。

用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。

用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。

这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。

如果出现了上述的情况1,会发生什么呢?

用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2 还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。

同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。

前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。

怎么用C#实现SAGA分布式事务

再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。

子事务屏障

子事务屏障,需要根据 trans_type,gid,branch_id 和 op 四个内容进行创建。

这4个内容 dtm 在回调时会放在 querysting 上面。

客户端里面提供了 IBranchBarrierFactory 来供我们使用。

空补偿

针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。

app.MapPost("/api/BarrierTransInCompensate",async(stringbranch_id,stringgid,stringop,stringtrans_type,TransRequestreq,IBranchBarrierFactoryfactory)=>{varbarrier=factory.CreateBranchBarrier(trans_type,gid,branch_id,op);usingvardb=Db.GeConn();awaitbarrier.Call(db,async(tx)=>{//转入失败的情况下,不应该输出下面这个Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid},branch_id={branch_id},op={op}");//tx参数是事务,可和本地事务一起提交回滚awaitTask.CompletedTask;});Console.WriteLine($"子事务屏障-补偿操作,gid={gid},branch_id={branch_id},op={op}");returnResults.Ok(TransResponse.BuildSucceedResponse());});

Call 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。

同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。

varsaga=newSaga(dtmClient,gid).Add(outApi+"/TransOut",outApi+"/TransOutCompensate",userOutReq).Add(inApi+"/TransInError",inApi+"/BarrierTransInCompensate",userInReq);

再来运行这个例子。

怎么用C#实现SAGA分布式事务

会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了

WillnotexecbusiCall,isNullCompensation=True,isDuplicateOrPend=False

这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。

再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。

幂等

针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。

app.MapPost("/api/BarrierTransIn",async(stringbranch_id,stringgid,stringop,stringtrans_type,TransRequestreq,IBranchBarrierFactoryfactory)=>{Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!!gid={gid},branch_id={branch_id},op={op}");varbarrier=factory.CreateBranchBarrier(trans_type,gid,branch_id,op);usingvardb=Db.GeConn();awaitbarrier.Call(db,async(tx)=>{varc=Interlocked.Increment(ref_errCount);//模拟一个超时执行if(c>0&&c<2)awaitTask.Delay(10000);Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid},branch_id={branch_id},op={op}");awaitTask.CompletedTask;});returnResults.Ok(TransResponse.BuildSucceedResponse());});

这里通过一个超时执行来让 dtm 进行转入正向操作的重试。

同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。

varsaga=newSaga(dtmClient,gid).Add(outApi+"/TransOut",outApi+"/TransOutCompensate",userOutReq).Add(inApi+"/BarrierTransIn",inApi+"/BarrierTransInCompensate",userInReq);

再来运行这个例子。

怎么用C#实现SAGA分布式事务

可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了

WillnotexecbusiCall,isNullCompensation=False,isDuplicateOrPend=True

这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。

到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,大大降低了业务判断的复杂度和出错的可能性。

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:怎么用C#实现SAGA分布式事务的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Java是怎么实现图片裁剪功能的下一篇:

13 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18