diff --git a/.github/workflows/build_and_it.yml b/.github/workflows/build_and_it.yml index d5fafaf..c7b3edd 100644 --- a/.github/workflows/build_and_it.yml +++ b/.github/workflows/build_and_it.yml @@ -51,7 +51,8 @@ jobs: tar -xvf dtm_1.18.0_linux_amd64.tar.gz pwd mkdir /home/runner/work/client-csharp/client-csharp/logs - nohup ./dtm > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 & + echo "UpdateBranchSync: 1" > ./config.yml + nohup ./dtm -c ./config.yml > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 & sleep 5 curl "127.0.0.1:36789/api/dtmsvr/newGid" - name: Setup Busi Service diff --git a/src/DtmCommon/Constant.cs b/src/DtmCommon/Constant.cs index 71961cb..e3523b5 100644 --- a/src/DtmCommon/Constant.cs +++ b/src/DtmCommon/Constant.cs @@ -67,8 +67,9 @@ public class Barrier public static readonly Dictionary OpDict = new Dictionary() { - { "cancel", "try" }, - { "compensate", "action" }, + { "cancel", "try" }, // tcc + { "compensate", "action" }, // saga + { "rollback", "action" }, // workflow }; public static readonly string REDIS_LUA_CheckAdjustAmount = @" -- RedisCheckAdjustAmount local v = redis.call('GET', KEYS[1]) diff --git a/src/Dtmcli/Constant.cs b/src/Dtmcli/Constant.cs index 3bc3959..a132433 100644 --- a/src/Dtmcli/Constant.cs +++ b/src/Dtmcli/Constant.cs @@ -4,7 +4,8 @@ internal static class Constant { internal const string DtmClientHttpName = "dtmClient"; internal const string BranchClientHttpName = "branchClient"; - + internal const string WorkflowBranchClientHttpName = "WF"; + internal static class Request { internal const string CONTENT_TYPE = "application/json"; diff --git a/src/Dtmcli/DtmClient.cs b/src/Dtmcli/DtmClient.cs index 7a811e3..246b69b 100644 --- a/src/Dtmcli/DtmClient.cs +++ b/src/Dtmcli/DtmClient.cs @@ -148,7 +148,7 @@ public async Task Query(string gid, CancellationToken cancellationT var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName); var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false); var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent); + DtmImp.Utils.CheckStatusCode(response.StatusCode); return JsonSerializer.Deserialize(dtmContent, _jsonOptions); } @@ -167,7 +167,7 @@ public async Task QueryStatus(string gid, CancellationToken cancellation var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName); var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false); var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent); + DtmImp.Utils.CheckStatusCode(response.StatusCode); var graph = JsonSerializer.Deserialize(dtmContent, _jsonOptions); return graph.Transaction == null ? string.Empty diff --git a/src/Dtmcli/DtmImp/Utils.cs b/src/Dtmcli/DtmImp/Utils.cs index 0b374e9..54f7f16 100644 --- a/src/Dtmcli/DtmImp/Utils.cs +++ b/src/Dtmcli/DtmImp/Utils.cs @@ -40,6 +40,14 @@ public static void CheckStatus(HttpStatusCode status, string dtmResult) } } + public static void CheckStatusCode(HttpStatusCode status) + { + if (status != HttpStatusCode.OK) + { + throw new DtmException(string.Format(CheckStatusMsgFormat, status.ToString(), string.Empty)); + } + } + /// /// OrString return the first not null or not empty string /// diff --git a/src/Dtmcli/TransGlobal.cs b/src/Dtmcli/TransGlobal.cs index 03f3002..4953604 100644 --- a/src/Dtmcli/TransGlobal.cs +++ b/src/Dtmcli/TransGlobal.cs @@ -28,7 +28,7 @@ public class DtmTransaction { [JsonPropertyName("id")] public int Id { get; set; } - [JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; } + [JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; } [JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; } @@ -64,9 +64,9 @@ public class DtmBranch { [JsonPropertyName("id")] public int Id { get; set; } - [JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; } + [JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; } - [JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; } + [JsonPropertyName("update_time")] public DateTimeOffset? UpdateTime { get; set; } [JsonPropertyName("gid")] public string Gid { get; set; } diff --git a/src/Dtmworkflow/ServiceCollectionExtensions.cs b/src/Dtmworkflow/ServiceCollectionExtensions.cs index 55d759b..c440ff0 100644 --- a/src/Dtmworkflow/ServiceCollectionExtensions.cs +++ b/src/Dtmworkflow/ServiceCollectionExtensions.cs @@ -23,6 +23,8 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services services.TryAddSingleton(); services.TryAddSingleton(); + // AddHttpClient(services); + return services; } @@ -33,8 +35,22 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services services.TryAddSingleton(); services.TryAddSingleton(); + + // AddHttpClient(services); return services; } + + // private static void AddHttpClient(IServiceCollection services /*, DtmOptions options*/) + // { + // services.AddHttpClient(Dtmcli.Constant.WorkflowBranchClientHttpName, client => + // { + // // TODO DtmOptions + // // client.Timeout = TimeSpan.FromMilliseconds(options.BranchTimeout); + // }).AddHttpMessageHandler(); + // + // // TODO how to inject workflow instance? + // services.AddTransient(); + // } } -} +} \ No newline at end of file diff --git a/src/Dtmworkflow/Workflow.Imp.cs b/src/Dtmworkflow/Workflow.Imp.cs index 1febed0..ac87efe 100644 --- a/src/Dtmworkflow/Workflow.Imp.cs +++ b/src/Dtmworkflow/Workflow.Imp.cs @@ -64,7 +64,7 @@ internal async Task Process(WfFunc2 handler, byte[] data) } err = Utils.GrpcError2DtmError(err); - + if (err != null && err is not DtmCommon.DtmFailureException) throw err; try @@ -161,23 +161,23 @@ private StepResult StepResultFromLocal(byte[] data, Exception err) }; } - private Exception StepResultToGrpc(StepResult r, IMessage reply) + internal Exception StepResultToGrpc(StepResult r, IMessage reply) { if (r.Error == null && r.Status == DtmCommon.Constant.StatusSucceed) { - // Check - + // TODO Check + // dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage)); } return r.Error; } - private StepResult StepResultFromGrpc(IMessage reply, Exception err) + internal StepResult StepResultFromGrpc(IMessage reply, Exception err) { var sr = new StepResult { - // GRPCError2DtmError - Error = null, + // TODO GRPCError2DtmError + Error = Utils.GrpcError2DtmError(err), }; sr.Status = WfErrorToStatus(sr.Error); @@ -193,7 +193,7 @@ private StepResult StepResultFromGrpc(IMessage reply, Exception err) return sr; } - private HttpResponseMessage StepResultToHttp(StepResult r) + internal HttpResponseMessage StepResultToHttp(StepResult r) { if (r.Error != null) { @@ -203,7 +203,7 @@ private HttpResponseMessage StepResultToHttp(StepResult r) return Utils.NewJSONResponse(HttpStatusCode.OK, r.Data); } - private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err) + internal StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err) { var sr = new StepResult { @@ -212,7 +212,7 @@ private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err) if (err == null) { - // HTTPResp2DtmError + (sr.Data, sr.Error) = Utils.HTTPResp2DtmError(resp); // TODO go 使用了 this.Options.HTTPResp2DtmError(resp), 方便定制 sr.Status = WfErrorToStatus(sr.Error); } @@ -234,9 +234,9 @@ private string WfErrorToStatus(Exception err) } - private async Task RecordedDo(Func> fn) + internal async Task RecordedDo(Func> fn) { - var sr = await this.RecordedDoInner(fn); + StepResult sr = await this.RecordedDoInner(fn); // do not compensate the failed branch if !CompensateErrorBranch if (this.Options.CompensateErrorBranch && sr.Status == DtmCommon.Constant.StatusFailed) diff --git a/src/Dtmworkflow/Workflow.cs b/src/Dtmworkflow/Workflow.cs index 930a02c..21d86c7 100644 --- a/src/Dtmworkflow/Workflow.cs +++ b/src/Dtmworkflow/Workflow.cs @@ -3,7 +3,9 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Net.Http; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; namespace Dtmworkflow { @@ -32,7 +34,8 @@ public Workflow(IDtmClient httpClient, IDtmgRPCClient grpcClient, Dtmcli.IBranch public System.Net.Http.HttpClient NewRequest() { - return _httpClient.GetHttpClient("WF"); + // return _httpClient.GetHttpClient("WF"); + return new HttpClient(new WorkflowHttpInterceptor(this)); } /// diff --git a/src/Dtmworkflow/WorkflowGrpcInterceptor.cs b/src/Dtmworkflow/WorkflowGrpcInterceptor.cs new file mode 100644 index 0000000..c9e64b4 --- /dev/null +++ b/src/Dtmworkflow/WorkflowGrpcInterceptor.cs @@ -0,0 +1,103 @@ +using System; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Microsoft.Extensions.Logging; + +namespace Dtmworkflow; + +public class WorkflowGrpcInterceptor(Workflow wf, ILogger logger) : Interceptor +{ + public WorkflowGrpcInterceptor(Workflow wf) : this(wf, null) + { + } + + public override AsyncUnaryCall AsyncUnaryCall( + TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) + { + logger?.LogDebug($"grpc client calling: {context.Host}{context.Method.FullName}"); + + if (wf == null) + { + return base.AsyncUnaryCall(request, context, continuation); + } + + async Task<(AsyncUnaryCall, TResponse, Status)> Origin() + { + var newContext = Dtmgimp.TransInfo2Ctx(context, wf.TransBase.Gid, wf.TransBase.TransType, wf.WorkflowImp.CurrentBranch, wf.WorkflowImp.CurrentOp, wf.TransBase.Dtm); + + var call = continuation(request, newContext); + TResponse response; + try + { + response = await call.ResponseAsync; + } + catch (Exception e) + { + logger?.LogDebug($"grpc client: {context.Host}{context.Method.FullName} ex: {e}"); + response = null; + } + + Status status = call.GetStatus(); + return ( + new AsyncUnaryCall( + call.ResponseAsync, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + call.Dispose), + response, + status + ); + } + + if (wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction) + { + var (newCall, _, _) = Origin().GetAwaiter().GetResult(); + return newCall; + } + + AsyncUnaryCall call = null; + StepResult sr = wf.RecordedDo(bb => + { + (call, TResponse data, Status status) = Origin().GetAwaiter().GetResult(); + RpcException err = status.StatusCode != StatusCode.OK ? new RpcException(status) : null; + return Task.FromResult(wf.StepResultFromGrpc(data as IMessage, err)); + }).GetAwaiter().GetResult(); + wf.StepResultToGrpc(sr, null); + + return call; + } + + private class Dtmgimp + { + public static ClientInterceptorContext TransInfo2Ctx( + ClientInterceptorContext ctx, + string gid, + string transType, + string branchID, + string op, + string dtm) where TRequest : class where TResponse : class + { + // 创建一个新的元数据对象 + var headers = new Metadata(); + // 添加自定义元数据 + const string dtmpre = "dtm-"; + headers.Add(dtmpre + "gid", gid); + headers.Add(dtmpre + "trans_type", transType); + headers.Add(dtmpre + "branch_id", branchID); + headers.Add(dtmpre + "op", op); + headers.Add(dtmpre + "dtm", dtm); + // 修改上下文的元数据 + var nctx = new ClientInterceptorContext( + ctx.Method, + ctx.Host, + new CallOptions(headers: headers, deadline: ctx.Options.Deadline, cancellationToken: ctx.Options.CancellationToken)); + + return nctx; + } + } +} \ No newline at end of file diff --git a/src/Dtmworkflow/WorkflowHttpInterceptor.cs b/src/Dtmworkflow/WorkflowHttpInterceptor.cs new file mode 100644 index 0000000..81110d3 --- /dev/null +++ b/src/Dtmworkflow/WorkflowHttpInterceptor.cs @@ -0,0 +1,39 @@ +using System; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; + +namespace Dtmworkflow; + +internal class WorkflowHttpInterceptor : DelegatingHandler +{ + private readonly Workflow _wf; + + public WorkflowHttpInterceptor(Workflow wf) + { + this._wf = wf; + InnerHandler = new HttpClientHandler(); + } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + Func> origin = async (barrier) => + { + var response = await base.SendAsync(request, cancellationToken); + return _wf.StepResultFromHTTP(response, null); + }; + + StepResult sr; + // in phase 2, do not save, because it is saved outer + if (_wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction) + { + sr = await origin(null); + } + else + { + sr = await _wf.RecordedDo(origin); + } + + return _wf.StepResultToHttp(sr); + } +} \ No newline at end of file diff --git a/tests/BusiGrpcService/Program.cs b/tests/BusiGrpcService/Program.cs index be2cc0d..11aa92a 100644 --- a/tests/BusiGrpcService/Program.cs +++ b/tests/BusiGrpcService/Program.cs @@ -8,6 +8,8 @@ { // Setup a HTTP/2 endpoint without TLS. options.ListenLocalhost(5005, o => o.Protocols = HttpProtocols.Http2); + // test for workflow http branch + options.ListenLocalhost(5006, o => o.Protocols = HttpProtocols.Http1); }); builder.Services.AddGrpc(); @@ -20,6 +22,15 @@ // Configure the HTTP request pipeline. app.MapGrpcService(); + +// test for workflow http branch +app.MapGet("/test-http-ok1", () => "SUCCESS"); +app.MapGet("/test-http-ok2", () => "SUCCESS"); +app.MapGet("/409", context => +{ + context.Response.StatusCode = 409; + return context.Response.WriteAsync("i am body, the http branch is 409"); // FAILURE +}); app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); app.Run(); diff --git a/tests/BusiGrpcService/Services/BusiApiService.cs b/tests/BusiGrpcService/Services/BusiApiService.cs index a8f1370..7c41bc0 100644 --- a/tests/BusiGrpcService/Services/BusiApiService.cs +++ b/tests/BusiGrpcService/Services/BusiApiService.cs @@ -26,7 +26,8 @@ public BusiApiService(ILogger logger, Dtmgrpc.IDtmgRPCClient cli public override async Task TransIn(BusiReq request, ServerCallContext context) { - _logger.LogInformation("TransIn req={req}", JsonSerializer.Serialize(request)); + string gid = context.RequestHeaders.Get("dtm-gid")?.Value; + _logger.LogInformation("TransIn gid={gid} req={req}", gid, JsonSerializer.Serialize(request)); if (string.IsNullOrWhiteSpace(request.TransInResult) || request.TransInResult.Equals("SUCCESS")) { @@ -86,16 +87,31 @@ public override async Task TransInRevert(BusiReq request, ServerCallConte public override async Task TransOut(BusiReq request, ServerCallContext context) { - _logger.LogInformation("TransOut req={req}", JsonSerializer.Serialize(request)); + string gid = context.RequestHeaders.Get("dtm-gid")?.Value; + _logger.LogInformation("TransOut gid={gid} req={req}", gid, JsonSerializer.Serialize(request)); await Task.CompletedTask; return new Empty(); } public override async Task TransOutTcc(BusiReq request, ServerCallContext context) { - _logger.LogInformation("TransOut req={req}", JsonSerializer.Serialize(request)); - await Task.CompletedTask; - return new Empty(); + _logger.LogInformation("TransOutTry req={req}", JsonSerializer.Serialize(request)); + + if (string.IsNullOrWhiteSpace(request.TransOutResult) || request.TransOutResult.Equals("SUCCESS")) + { + await Task.CompletedTask; + return new Empty(); + } + else if (request.TransOutResult.Equals("FAILURE")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Aborted, "FAILURE")); + } + else if (request.TransOutResult.Equals("ONGOING")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.FailedPrecondition, "ONGOING")); + } + + throw new Grpc.Core.RpcException(new Status(StatusCode.Internal, $"unknow result {request.TransOutResult}")); } public override async Task TransOutConfirm(BusiReq request, ServerCallContext context) diff --git a/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs b/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs index 9d3735e..fa87e5e 100644 --- a/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs +++ b/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs @@ -1,19 +1,35 @@ using Microsoft.Extensions.DependencyInjection; using System; +using System.Net; +using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; using busi; +using Dtmcli; +using DtmCommon; using Dtmworkflow; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core.Interceptors; using Grpc.Net.Client; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MySqlConnector; using Newtonsoft.Json; using Xunit; +using Xunit.Abstractions; namespace Dtmgrpc.IntegrationTests { public class WorkflowGrpcTest { + private readonly ITestOutputHelper _testOutputHelper; + + public WorkflowGrpcTest(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + [Fact] public async Task Execute_Http_Should_Succeed() { @@ -42,7 +58,7 @@ public async Task Execute_gPRC_Should_Succeed() WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); string wfName1 = $"wf-simple-{Guid.NewGuid().ToString("D")[..8]}"; - workflowGlobalTransaction.Register(wfName1, async (workflow, data) => await Task.FromResult("my result"u8.ToArray())); + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => await Task.FromResult("fmy result"u8.ToArray())); string gid = wfName1 + Guid.NewGuid().ToString()[..8]; var req = ITTestHelper.GenBusiReq(false, false); @@ -53,46 +69,567 @@ public async Task Execute_gPRC_Should_Succeed() } [Fact] - public async Task Execute_Success() + public async Task Execute_DoAndHttp_ShouldSuccess() { var provider = ITTestHelper.AddDtmGrpc(); var workflowFactory = provider.GetRequiredService(); var loggerFactory = provider.GetRequiredService(); WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); - Busi.BusiClient busiClient = new Busi.BusiClient(GrpcChannel.ForAddress(ITTestHelper.BuisgRPCUrlWithProtocol)); + string wfName1 = $"wf-simple-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + // 1. local + workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("1. local rollback"); + await Task.CompletedTask; + }).Do(async (barrier) => + { + return await Task.FromResult<(byte[], Exception)>(("my result"u8.ToArray(), null)); + }); + + // 2. http1, SAGA + HttpResponseMessage httpResult1 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http1 rollback"); + await workflow.NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + }).NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + + // 3. http2, TCC + HttpResponseMessage httpResult2 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http2 cancel"); + await workflow.NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + }).OnCommit(async (barrier) => + { + _testOutputHelper.WriteLine("4. http2 commit"); + // NOT must use workflow.NewRequest() + await workflow.NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + }).NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // BranchID Op Status + // 01 action succeed + // 02 action succeed + // 03 action succeed + // 03 commit succeed + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(4, trans.Branches.Count); // 1.Do x1, 2.http, saga x1, 3.Http tcc x2 + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(4, trans.Branches.Count); // 1.Do x1, 2.http, saga x1, 3.Http tcc x2 + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + } + + [Fact] + public async Task Execute_DoAndHttp_Failed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + string wfName1 = $"wf-simple-{Guid.NewGuid().ToString("D")[..8]}"; workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + // 1. local + workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("1. local rollback"); + await Task.CompletedTask; + }).Do(async (barrier) => + { + return await Task.FromResult<(byte[], Exception)>(("my result"u8.ToArray(), null)); + }); + + // 2. http1 + HttpResponseMessage httpResult1 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http1 rollback"); + await Task.CompletedTask; + }).NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + + // 3. http2 + HttpResponseMessage httpResult2 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http2 rollback"); + await Task.CompletedTask; + }).NewRequest().GetAsync("http://localhost:5006/409"); // 409 + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Null(result); + // same gid again + await Assert.ThrowsAsync( async () => + { + await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + // 03 action failed + // 02 rollback succeed + // 01 rollback succeed + Assert.Equal(5, trans.Branches.Count); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("failed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + Assert.Equal("rollback", trans.Branches[4].Op); + Assert.Equal("succeed", trans.Branches[4].Status); + } + + [Fact] + public async Task Execute_DoAndGrpcSAGA_Should_Success() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_DoAndGrpcSAGA_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => { BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. local workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("1. local rollback"); + }).Do(async (barrier) => + { + return ("my result"u8.ToArray(), null); + }); + + // 2. grpc1 + Busi.BusiClient busiClient = null; + var wf = workflow.NewBranch().OnRollback(async (barrier) => { await busiClient.TransInRevertAsync(request); + _testOutputHelper.WriteLine("2. grpc1 rollback"); + }); + busiClient = GetBusiClientWithWf(wf, provider); + await busiClient.TransOutAsync(request); + + // 3. grpc2 + wf = workflow.NewBranch().OnRollback(async (barrier) => + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("3. grpc2 rollback"); }); await busiClient.TransInAsync(request); + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); // 1.Do 2.grpc 3.grpc + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("succeed", trans.Branches[2].Status); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); // 1.Do 2.Http 3.Http + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("succeed", trans.Branches[2].Status); + } + + [Fact] + public async Task Execute_DoAndGrpcSAGA_Should_Failed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + string wfName1 = $"{nameof(this.Execute_DoAndGrpcSAGA_Should_Failed)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. local workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("1. local rollback"); + }).Do(async (barrier) => + { + return await Task.FromResult<(byte[], Exception)>(("my result"u8.ToArray(), null)); + }); + + // 2. grpc1 + Busi.BusiClient busiClient = null; + var wf = workflow.NewBranch().OnRollback(async (barrier) => + { + await busiClient.TransInRevertAsync(request); + _testOutputHelper.WriteLine("2. grpc1 rollback"); + }); + busiClient = GetBusiClientWithWf(wf, provider); + Empty response1 = await busiClient.TransOutAsync(request); + + // 3. grpc2 + wf = workflow.NewBranch().OnRollback(async (barrier) => { await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("3. grpc2 rollback"); }); - await busiClient.TransOutAsync(request); + Empty response2 = await busiClient.TransInAsync(request); + + return await Task.FromResult("my result"u8.ToArray()); + }); + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(outFailed: false, inFailed: true); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + // Assert.Null(result); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + // 03 action failed + // 02 rollback succeed + // 01 rollback succeed + Assert.Equal(5, trans.Branches.Count); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("failed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + Assert.Equal("rollback", trans.Branches[4].Op); + Assert.Equal("succeed", trans.Branches[4].Status); + + // same gid again + await Assert.ThrowsAsync( async () => + { + result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + } + + + [Fact] + public async Task Execute_GrpcTccAndDo_Should_Success() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_GrpcTccAndDo_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + Busi.BusiClient busiClient = null; + Workflow wf = workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await busiClient.TransOutConfirmAsync(request); + }) + .OnRollback(async (barrier) => // cancel + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("1. grpc1 cancel"); + }); + busiClient = GetBusiClientWithWf(wf, provider); // busiClient的构建依赖Workflow实例,只能这么写 + // try + await busiClient.TransOutTccAsync(request); + + // 2. local, 可以是SAG, 因为排在最后,不必写反向的回滚 + workflow.NewBranch() + // .OnRollback(async (barrier) => // 反向 rollback + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + return ("my result"u8.ToArray(), null); + }); // 正向 + return await Task.FromResult("my result"u8.ToArray()); }); string gid = wfName1 + Guid.NewGuid().ToString()[..8]; var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); Assert.Equal("my result", Encoding.UTF8.GetString(result)); - string status = await ITTestHelper.GetTranStatus(gid); - Assert.Equal("succeed", status); + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action succeed + // 02 action succeed + // 01 commit succeed + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[2].Op); - // again + // same gid again result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); Assert.Equal("my result", Encoding.UTF8.GetString(result)); - status = await ITTestHelper.GetTranStatus(gid); - Assert.Equal("succeed", status); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[2].Op); + } + + [Fact] + public async Task Execute_GrpcTccAndDo_Should_TryFailed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_GrpcTccAndDo_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + Busi.BusiClient busiClient = null; + Workflow wf = workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await busiClient.TransOutConfirmAsync(request); + }) + .OnRollback(async (barrier) => // cancel + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("1. grpc1 cancel"); + }); + busiClient = GetBusiClientWithWf(wf, provider); // busiClient reference Workflow instance + // try + await busiClient.TransOutTccAsync(request); + + // 2. local, it's the tail, rollback is NOT necessary + workflow.NewBranch() + // .OnRollback(async (barrier) => // rollback + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + return ("my result"u8.ToArray(), null); + }); + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(outFailed: true, inFailed: false); // 1. trans out try failed + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Null(result); + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action failed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("failed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + + // same gid again + Assert.ThrowsAsync(async () => + { + var result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + // DtmCommon.DtmFailureException: Status(StatusCode="Aborted", Detail="FAILURE") + // + // DtmCommon.DtmFailureException + // Status(StatusCode="Aborted", Detail="FAILURE") + // at Dtmworkflow.Workflow.Process(WfFunc2 handler, Byte[] data) in src/Dtmworkflow/Workflow.Imp.cs + // at Dtmworkflow.WorkflowGlobalTransaction.Execute(String name, String gid, Byte[] data, Boolean isHttp) in src/Dtmworkflow/WorkflowGlobalTransaction.cs + }); + + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action failed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("failed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + } + + + [Fact] + public async Task Execute_GrpcTccAndDo_Should_DoFailed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_GrpcTccAndDo_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + Busi.BusiClient busiClient = null; + Workflow wf = workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await busiClient.TransOutConfirmAsync(request); + }) + .OnRollback(async (barrier) => // cancel + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("1. grpc1 cancel"); + }); + busiClient = GetBusiClientWithWf(wf, provider); // busiClient reference Workflow instance + // try + await busiClient.TransOutTccAsync(request); + + // 2. local, it's the tail, rollback is NOT necessary + (byte[] doResult, Exception ex) = await workflow.NewBranch() + .OnRollback(async (barrier) => // rollback + { + _testOutputHelper.WriteLine("1. local rollback"); + }) + .Do(async (barrier) => + { + // throw new DtmFailureException("db do failed"); // can't throw + var ex = new DtmFailureException("db do failed"); + return ("my result"u8.ToArray(), ex); + }); + if (ex != null) + throw ex; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(outFailed: false, inFailed: false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Null(result); + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action succeed + // 02 action failed + // 01 rollback succeed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("failed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[2].Op); + + // same gid again + Assert.ThrowsAsync(async () => + { + var result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + // DtmCommon.DtmFailureException + // db do failed + // at Dtmworkflow.Workflow.Process(WfFunc2 handler, Byte[] data) in src/Dtmworkflow/Workflow.Imp.cs + // at Dtmworkflow.WorkflowGlobalTransaction.Execute(String name, String gid, Byte[] data, Boolean isHttp) in src/Dtmworkflow/WorkflowGlobalTransaction.cs + // at Dtmgrpc.IntegrationTests.WorkflowGrpcTest.Execute_GrpcTccAndDo_Should_DoFailed() in tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs + }); + } + + private static Busi.BusiClient GetBusiClientWithWf(Workflow wf, ServiceProvider provider) + { + var loggerFactory = provider.GetRequiredService(); + var channel = GrpcChannel.ForAddress(ITTestHelper.BuisgRPCUrlWithProtocol); + var logger = loggerFactory.CreateLogger(); + var interceptor = new WorkflowGrpcInterceptor(wf, logger); // inject client interceptor, and workflow instance + var callInvoker = channel.Intercept(interceptor); + Busi.BusiClient busiClient = new Busi.BusiClient(callInvoker); + return busiClient; } } } \ No newline at end of file