Skip to content

Feature/workflow http/grpc record interceptor #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
3 changes: 2 additions & 1 deletion .github/workflows/build_and_it.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ jobs:
tar -xvf dtm_1.18.0_linux_amd64.tar.gz
pwd
mkdir /home/runner/work/client-csharp/client-csharp/logs
nohup ./dtm > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 &
echo "UpdateBranchSync: 1" > ./config.yml
nohup ./dtm -c ./config.yml > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 &
sleep 5
curl "127.0.0.1:36789/api/dtmsvr/newGid"
- name: Setup Busi Service
Expand Down
5 changes: 3 additions & 2 deletions src/DtmCommon/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public class Barrier

public static readonly Dictionary<string, string> OpDict = new Dictionary<string, string>()
{
{ "cancel", "try" },
{ "compensate", "action" },
{ "cancel", "try" }, // tcc
{ "compensate", "action" }, // saga
{ "rollback", "action" }, // workflow
};
public static readonly string REDIS_LUA_CheckAdjustAmount = @" -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
Expand Down
3 changes: 2 additions & 1 deletion src/Dtmcli/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ internal static class Constant
{
internal const string DtmClientHttpName = "dtmClient";
internal const string BranchClientHttpName = "branchClient";

internal const string WorkflowBranchClientHttpName = "WF";

internal static class Request
{
internal const string CONTENT_TYPE = "application/json";
Expand Down
4 changes: 2 additions & 2 deletions src/Dtmcli/DtmClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async Task<TransGlobal> Query(string gid, CancellationToken cancellationT
var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName);
var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent);
DtmImp.Utils.CheckStatusCode(response.StatusCode);
return JsonSerializer.Deserialize<TransGlobal>(dtmContent, _jsonOptions);
}

Expand All @@ -167,7 +167,7 @@ public async Task<string> QueryStatus(string gid, CancellationToken cancellation
var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName);
var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent);
DtmImp.Utils.CheckStatusCode(response.StatusCode);
var graph = JsonSerializer.Deserialize<TransGlobalForStatus>(dtmContent, _jsonOptions);
return graph.Transaction == null
? string.Empty
Expand Down
8 changes: 8 additions & 0 deletions src/Dtmcli/DtmImp/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public static void CheckStatus(HttpStatusCode status, string dtmResult)
}
}

public static void CheckStatusCode(HttpStatusCode status)
{
if (status != HttpStatusCode.OK)
{
throw new DtmException(string.Format(CheckStatusMsgFormat, status.ToString(), string.Empty));
}
}

/// <summary>
/// OrString return the first not null or not empty string
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/Dtmcli/TransGlobal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DtmTransaction
{
[JsonPropertyName("id")] public int Id { get; set; }

[JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; }
[JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; }

[JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; }

Expand Down Expand Up @@ -64,9 +64,9 @@ public class DtmBranch
{
[JsonPropertyName("id")] public int Id { get; set; }

[JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; }
[JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; }

[JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; }
[JsonPropertyName("update_time")] public DateTimeOffset? UpdateTime { get; set; }

[JsonPropertyName("gid")] public string Gid { get; set; }

Expand Down
18 changes: 17 additions & 1 deletion src/Dtmworkflow/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
services.TryAddSingleton<WorkflowGlobalTransaction>();

// AddHttpClient(services);

return services;
}

Expand All @@ -33,8 +35,22 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services

services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
services.TryAddSingleton<WorkflowGlobalTransaction>();

// AddHttpClient(services);

return services;
}

// private static void AddHttpClient(IServiceCollection services /*, DtmOptions options*/)
// {
// services.AddHttpClient(Dtmcli.Constant.WorkflowBranchClientHttpName, client =>
// {
// // TODO DtmOptions
// // client.Timeout = TimeSpan.FromMilliseconds(options.BranchTimeout);
// }).AddHttpMessageHandler<WorkflowHttpInterceptor>();
//
// // TODO how to inject workflow instance?
// services.AddTransient<WorkflowHttpInterceptor>();
// }
}
}
}
24 changes: 12 additions & 12 deletions src/Dtmworkflow/Workflow.Imp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
}

err = Utils.GrpcError2DtmError(err);

if (err != null && err is not DtmCommon.DtmFailureException) throw err;

try
Expand Down Expand Up @@ -161,23 +161,23 @@
};
}

private Exception StepResultToGrpc(StepResult r, IMessage reply)
internal Exception StepResultToGrpc(StepResult r, IMessage reply)
{
if (r.Error == null && r.Status == DtmCommon.Constant.StatusSucceed)
{
// Check

// TODO Check
// dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage));
}

return r.Error;
}

private StepResult StepResultFromGrpc(IMessage reply, Exception err)
internal StepResult StepResultFromGrpc(IMessage reply, Exception err)
{
var sr = new StepResult
{
// GRPCError2DtmError
Error = null,
// TODO GRPCError2DtmError
Error = Utils.GrpcError2DtmError(err),

Check warning on line 180 in src/Dtmworkflow/Workflow.Imp.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/Workflow.Imp.cs#L179-L180

Added lines #L179 - L180 were not covered by tests
};

sr.Status = WfErrorToStatus(sr.Error);
Expand All @@ -193,7 +193,7 @@
return sr;
}

private HttpResponseMessage StepResultToHttp(StepResult r)
internal HttpResponseMessage StepResultToHttp(StepResult r)
{
if (r.Error != null)
{
Expand All @@ -203,7 +203,7 @@
return Utils.NewJSONResponse(HttpStatusCode.OK, r.Data);
}

private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
internal StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
{
var sr = new StepResult
{
Expand All @@ -212,7 +212,7 @@

if (err == null)
{
// HTTPResp2DtmError
(sr.Data, sr.Error) = Utils.HTTPResp2DtmError(resp); // TODO go 使用了 this.Options.HTTPResp2DtmError(resp), 方便定制

Check warning on line 215 in src/Dtmworkflow/Workflow.Imp.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/Workflow.Imp.cs#L215

Added line #L215 was not covered by tests
sr.Status = WfErrorToStatus(sr.Error);
}

Expand All @@ -234,9 +234,9 @@
}


private async Task<StepResult> RecordedDo(Func<DtmCommon.BranchBarrier, Task<StepResult>> fn)
internal async Task<StepResult> RecordedDo(Func<DtmCommon.BranchBarrier, Task<StepResult>> fn)
{
var sr = await this.RecordedDoInner(fn);
StepResult sr = await this.RecordedDoInner(fn);

// do not compensate the failed branch if !CompensateErrorBranch
if (this.Options.CompensateErrorBranch && sr.Status == DtmCommon.Constant.StatusFailed)
Expand Down
5 changes: 4 additions & 1 deletion src/Dtmworkflow/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Dtmworkflow
{
Expand Down Expand Up @@ -32,7 +34,8 @@ public Workflow(IDtmClient httpClient, IDtmgRPCClient grpcClient, Dtmcli.IBranch

public System.Net.Http.HttpClient NewRequest()
{
return _httpClient.GetHttpClient("WF");
// return _httpClient.GetHttpClient("WF");
return new HttpClient(new WorkflowHttpInterceptor(this));
}

/// <summary>
Expand Down
103 changes: 103 additions & 0 deletions src/Dtmworkflow/WorkflowGrpcInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Logging;

namespace Dtmworkflow;

public class WorkflowGrpcInterceptor(Workflow wf, ILogger<WorkflowGrpcInterceptor> logger) : Interceptor

Check warning on line 10 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L10

Added line #L10 was not covered by tests
{
public WorkflowGrpcInterceptor(Workflow wf) : this(wf, null)
{
}

Check warning on line 14 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L12-L14

Added lines #L12 - L14 were not covered by tests

public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{

Check warning on line 20 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L20

Added line #L20 was not covered by tests
logger?.LogDebug($"grpc client calling: {context.Host}{context.Method.FullName}");

if (wf == null)
{
return base.AsyncUnaryCall(request, context, continuation);

Check warning on line 25 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L24-L25

Added lines #L24 - L25 were not covered by tests
}

async Task<(AsyncUnaryCall<TResponse>, TResponse, Status)> Origin()
{
var newContext = Dtmgimp.TransInfo2Ctx(context, wf.TransBase.Gid, wf.TransBase.TransType, wf.WorkflowImp.CurrentBranch, wf.WorkflowImp.CurrentOp, wf.TransBase.Dtm);

Check warning on line 30 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L29-L30

Added lines #L29 - L30 were not covered by tests

var call = continuation(request, newContext);

Check warning on line 32 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L32

Added line #L32 was not covered by tests
TResponse response;
try
{
response = await call.ResponseAsync;
}
catch (Exception e)
{

Check warning on line 39 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L35-L39

Added lines #L35 - L39 were not covered by tests
logger?.LogDebug($"grpc client: {context.Host}{context.Method.FullName} ex: {e}");
response = null;
}

Check warning on line 42 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L41-L42

Added lines #L41 - L42 were not covered by tests

Status status = call.GetStatus();
return (
new AsyncUnaryCall<TResponse>(
call.ResponseAsync,
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose),
response,
status
);
}

Check warning on line 55 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L44-L55

Added lines #L44 - L55 were not covered by tests

if (wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction)
{
var (newCall, _, _) = Origin().GetAwaiter().GetResult();
return newCall;

Check warning on line 60 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L58-L60

Added lines #L58 - L60 were not covered by tests
}

AsyncUnaryCall<TResponse> call = null;
StepResult sr = wf.RecordedDo(bb =>
{
(call, TResponse data, Status status) = Origin().GetAwaiter().GetResult();

Check warning on line 66 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L63-L66

Added lines #L63 - L66 were not covered by tests
RpcException err = status.StatusCode != StatusCode.OK ? new RpcException(status) : null;
return Task.FromResult(wf.StepResultFromGrpc(data as IMessage, err));
}).GetAwaiter().GetResult();
wf.StepResultToGrpc(sr, null);

Check warning on line 70 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L68-L70

Added lines #L68 - L70 were not covered by tests

return call;
}

Check warning on line 73 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L72-L73

Added lines #L72 - L73 were not covered by tests

private class Dtmgimp
{
public static ClientInterceptorContext<TRequest, TResponse> TransInfo2Ctx<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> ctx,
string gid,
string transType,
string branchID,
string op,
string dtm) where TRequest : class where TResponse : class
{

Check warning on line 84 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L84

Added line #L84 was not covered by tests
// 创建一个新的元数据对象
var headers = new Metadata();

Check warning on line 86 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L86

Added line #L86 was not covered by tests
// 添加自定义元数据
const string dtmpre = "dtm-";
headers.Add(dtmpre + "gid", gid);
headers.Add(dtmpre + "trans_type", transType);
headers.Add(dtmpre + "branch_id", branchID);
headers.Add(dtmpre + "op", op);
headers.Add(dtmpre + "dtm", dtm);

Check warning on line 93 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L89-L93

Added lines #L89 - L93 were not covered by tests
// 修改上下文的元数据
var nctx = new ClientInterceptorContext<TRequest, TResponse>(
ctx.Method,
ctx.Host,
new CallOptions(headers: headers, deadline: ctx.Options.Deadline, cancellationToken: ctx.Options.CancellationToken));

Check warning on line 98 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L95-L98

Added lines #L95 - L98 were not covered by tests

return nctx;
}

Check warning on line 101 in src/Dtmworkflow/WorkflowGrpcInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowGrpcInterceptor.cs#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}
39 changes: 39 additions & 0 deletions src/Dtmworkflow/WorkflowHttpInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Dtmworkflow;

internal class WorkflowHttpInterceptor : DelegatingHandler
{
private readonly Workflow _wf;

public WorkflowHttpInterceptor(Workflow wf)
{
this._wf = wf;
InnerHandler = new HttpClientHandler();
}

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
Func<DtmCommon.BranchBarrier, Task<StepResult>> origin = async (barrier) =>
{
var response = await base.SendAsync(request, cancellationToken);
return _wf.StepResultFromHTTP(response, null);
};

Check warning on line 24 in src/Dtmworkflow/WorkflowHttpInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowHttpInterceptor.cs#L19-L24

Added lines #L19 - L24 were not covered by tests

StepResult sr;
// in phase 2, do not save, because it is saved outer
if (_wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction)
{
sr = await origin(null);
}

Check warning on line 31 in src/Dtmworkflow/WorkflowHttpInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowHttpInterceptor.cs#L29-L31

Added lines #L29 - L31 were not covered by tests
else
{
sr = await _wf.RecordedDo(origin);
}

Check warning on line 35 in src/Dtmworkflow/WorkflowHttpInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowHttpInterceptor.cs#L33-L35

Added lines #L33 - L35 were not covered by tests

return _wf.StepResultToHttp(sr);
}

Check warning on line 38 in src/Dtmworkflow/WorkflowHttpInterceptor.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/WorkflowHttpInterceptor.cs#L37-L38

Added lines #L37 - L38 were not covered by tests
}
11 changes: 11 additions & 0 deletions tests/BusiGrpcService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
{
// Setup a HTTP/2 endpoint without TLS.
options.ListenLocalhost(5005, o => o.Protocols = HttpProtocols.Http2);
// test for workflow http branch
options.ListenLocalhost(5006, o => o.Protocols = HttpProtocols.Http1);
});

builder.Services.AddGrpc();
Expand All @@ -20,6 +22,15 @@

// Configure the HTTP request pipeline.
app.MapGrpcService<BusiApiService>();

// test for workflow http branch
app.MapGet("/test-http-ok1", () => "SUCCESS");
app.MapGet("/test-http-ok2", () => "SUCCESS");
app.MapGet("/409", context =>
{
context.Response.StatusCode = 409;
return context.Response.WriteAsync("i am body, the http branch is 409"); // FAILURE
});
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");

app.Run();
Loading
Loading