Skip to content

Commit 2640635

Browse files
authored
Add tracing logs for Nexus HTTP request retries (#7186)
## What changed? <!-- Describe what has changed in this PR --> Added additional HTTP tracing logs for the first 3 Nexus request retries. Both minimum and maximum attempts to add additional tracing info to are configurable. ## Why? <!-- Tell your future self why have you made these changes --> To aid in debugging. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent f34f860 commit 2640635

File tree

6 files changed

+254
-5
lines changed

6 files changed

+254
-5
lines changed

common/log/tag/tags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func Timestamp(timestamp time.Time) ZapTag {
8686
return NewTimeTag("timestamp", timestamp)
8787
}
8888

89+
// RequestID returns tag for RequestID
90+
func RequestID(requestID string) ZapTag {
91+
return NewStringTag("request-id", requestID)
92+
}
93+
8994
// ========== Workflow tags defined here: ( wf is short for workflow) ==========
9095

9196
// WorkflowAction returns tag for WorkflowAction

common/nexus/trace.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in
13+
// all copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
// THE SOFTWARE.
22+
23+
package nexus
24+
25+
import (
26+
"crypto/tls"
27+
"net/http/httptrace"
28+
"time"
29+
30+
"go.temporal.io/server/common/dynamicconfig"
31+
"go.temporal.io/server/common/log"
32+
"go.temporal.io/server/common/log/tag"
33+
)
34+
35+
type HTTPClientTraceProvider interface {
36+
// NewTrace returns a *httptrace.ClientTrace which provides hooks to invoke at each point in the HTTP request
37+
// lifecycle. This trace must be added to the HTTP request context using httptrace.WithClientTrace for the hooks to
38+
// be invoked. The provided logger should already be tagged with relevant request information
39+
// e.g. using log.With(logger, tag.RequestID(id), tag.Operation(op), ...).
40+
NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace
41+
}
42+
43+
// HTTPTraceConfig is the dynamic config for controlling Nexus HTTP request tracing behavior.
44+
// The default is nil and the conversion function does not do any actual conversion because this should be wrapped by
45+
// a dynamicconfig.NewGlobalCachedTypedValue with the actual conversion function so that it is cached.
46+
var HTTPTraceConfig = dynamicconfig.NewGlobalTypedSettingWithConverter(
47+
"system.nexusHTTPTraceConfig",
48+
func(a any) (any, error) { return a, nil },
49+
nil,
50+
`Configuration options for controlling additional tracing for Nexus HTTP requests. Fields: Enabled, MinAttempt, MaxAttempt, Hooks. See HTTPClientTraceConfig comments for more detail.`,
51+
)
52+
53+
type HTTPClientTraceConfig struct {
54+
// Enabled controls whether any additional tracing will be invoked. Default false.
55+
Enabled bool
56+
// MinAttempt is the first operation attempt to include additional tracing. Default 2. Setting to 0 or 1 will add tracing to all requests and may be expensive.
57+
MinAttempt int32
58+
// MaxAttempt is the maximum operation attempt to include additional tracing. Default 2. Setting to 0 means no maximum.
59+
MaxAttempt int32
60+
// Hooks is the list of method names to invoke with extra tracing. See httptrace.ClientTrace for more detail.
61+
// Defaults to all implemented hooks: GetConn, GotConn, ConnectStart, ConnectDone, DNSStart, DNSDone, TLSHandshakeStart, TLSHandshakeDone, WroteRequest, GotFirstResponseByte.
62+
Hooks []string
63+
}
64+
65+
var defaultHTTPClientTraceConfig = HTTPClientTraceConfig{
66+
Enabled: false,
67+
MinAttempt: 2,
68+
MaxAttempt: 2,
69+
// Set to nil here because of dynamic config conversion limitations.
70+
Hooks: []string(nil),
71+
}
72+
73+
var defaultHTTPClientTraceHooks = []string{"GetConn", "GotConn", "ConnectStart", "ConnectDone", "DNSStart", "DNSDone", "TLSHandshakeStart", "TLSHandshakeDone", "WroteRequest", "GotFirstResponseByte"}
74+
75+
func convertHTTPClientTraceConfig(in any) (HTTPClientTraceConfig, error) {
76+
cfg, err := dynamicconfig.ConvertStructure(defaultHTTPClientTraceConfig)(in)
77+
if err != nil {
78+
cfg = defaultHTTPClientTraceConfig
79+
}
80+
if len(cfg.Hooks) == 0 {
81+
cfg.Hooks = defaultHTTPClientTraceHooks
82+
}
83+
return cfg, nil
84+
}
85+
86+
type LoggedHTTPClientTraceProvider struct {
87+
Config *dynamicconfig.GlobalCachedTypedValue[HTTPClientTraceConfig]
88+
}
89+
90+
func NewLoggedHTTPClientTraceProvider(dc *dynamicconfig.Collection) HTTPClientTraceProvider {
91+
return &LoggedHTTPClientTraceProvider{
92+
Config: dynamicconfig.NewGlobalCachedTypedValue(dc, HTTPTraceConfig, convertHTTPClientTraceConfig),
93+
}
94+
}
95+
96+
//nolint:revive // cognitive complexity (> 25 max) but is just adding a logging function for each method in the list.
97+
func (p *LoggedHTTPClientTraceProvider) NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace {
98+
config := p.Config.Get()
99+
if !config.Enabled {
100+
return nil
101+
}
102+
if attempt < config.MinAttempt {
103+
return nil
104+
}
105+
if config.MaxAttempt > 0 && attempt > config.MaxAttempt {
106+
return nil
107+
}
108+
109+
clientTrace := &httptrace.ClientTrace{}
110+
for _, h := range config.Hooks {
111+
switch h {
112+
case "GetConn":
113+
clientTrace.GetConn = func(hostPort string) {
114+
logger.Info("attempting to get HTTP connection for Nexus request",
115+
tag.Timestamp(time.Now().UTC()),
116+
tag.Address(hostPort))
117+
}
118+
case "GotConn":
119+
clientTrace.GotConn = func(info httptrace.GotConnInfo) {
120+
logger.Info("got HTTP connection for Nexus request",
121+
tag.Timestamp(time.Now().UTC()),
122+
tag.NewBoolTag("reused", info.Reused),
123+
tag.NewBoolTag("was-idle", info.WasIdle),
124+
tag.NewDurationTag("idle-time", info.IdleTime))
125+
}
126+
case "ConnectStart":
127+
clientTrace.ConnectStart = func(network, addr string) {
128+
logger.Info("starting dial for new connection for Nexus request",
129+
tag.Timestamp(time.Now().UTC()),
130+
tag.Address(addr),
131+
tag.NewStringTag("network", network))
132+
}
133+
case "ConnectDone":
134+
clientTrace.ConnectDone = func(network, addr string, err error) {
135+
logger.Info("finished dial for new connection for Nexus request",
136+
tag.Timestamp(time.Now().UTC()),
137+
tag.Address(addr),
138+
tag.NewStringTag("network", network),
139+
tag.Error(err))
140+
}
141+
case "DNSStart":
142+
clientTrace.DNSStart = func(info httptrace.DNSStartInfo) {
143+
logger.Info("starting DNS lookup for Nexus request",
144+
tag.Timestamp(time.Now().UTC()),
145+
tag.Host(info.Host))
146+
}
147+
case "DNSDone":
148+
clientTrace.DNSDone = func(info httptrace.DNSDoneInfo) {
149+
addresses := make([]string, len(info.Addrs))
150+
for i, a := range info.Addrs {
151+
addresses[i] = a.String()
152+
}
153+
logger.Info("finished DNS lookup for Nexus request",
154+
tag.Timestamp(time.Now().UTC()),
155+
tag.Addresses(addresses),
156+
tag.Error(info.Err),
157+
tag.NewBoolTag("coalesced", info.Coalesced))
158+
}
159+
case "TLSHandshakeStart":
160+
clientTrace.TLSHandshakeStart = func() {
161+
logger.Info("starting TLS handshake for Nexus request", tag.Timestamp(time.Now().UTC()))
162+
}
163+
case "TLSHandshakeDone":
164+
clientTrace.TLSHandshakeDone = func(state tls.ConnectionState, err error) {
165+
logger.Info("finished TLS handshake for Nexus request",
166+
tag.Timestamp(time.Now().UTC()),
167+
tag.NewBoolTag("handshake-complete", state.HandshakeComplete),
168+
tag.Error(err))
169+
}
170+
case "WroteRequest":
171+
clientTrace.WroteRequest = func(info httptrace.WroteRequestInfo) {
172+
logger.Info("finished writing Nexus HTTP request",
173+
tag.Timestamp(time.Now().UTC()),
174+
tag.Error(info.Err))
175+
}
176+
case "GotFirstResponseByte":
177+
clientTrace.GotFirstResponseByte = func() {
178+
logger.Info("got response to Nexus HTTP request", tag.AttemptEnd(time.Now().UTC()))
179+
}
180+
}
181+
}
182+
return clientTrace
183+
}

components/callbacks/executors.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"go.temporal.io/server/common/log"
3232
"go.temporal.io/server/common/metrics"
3333
"go.temporal.io/server/common/namespace"
34+
commonnexus "go.temporal.io/server/common/nexus"
3435
"go.temporal.io/server/common/resource"
3536
"go.temporal.io/server/service/history/hsm"
3637
"go.temporal.io/server/service/history/queues"
@@ -66,6 +67,7 @@ type TaskExecutorOptions struct {
6667
MetricsHandler metrics.Handler
6768
Logger log.Logger
6869
HTTPCallerProvider HTTPCallerProvider
70+
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
6971
HistoryClient resource.HistoryClient
7072
}
7173

@@ -168,6 +170,9 @@ func (e taskExecutor) loadInvocationArgs(
168170
nexusInvokable := nexusInvocation{}
169171
nexusInvokable.nexus = variant.Nexus
170172
nexusInvokable.completion, err = target.GetNexusCompletion(ctx)
173+
nexusInvokable.workflowID = ref.WorkflowKey.WorkflowID
174+
nexusInvokable.runID = ref.WorkflowKey.RunID
175+
nexusInvokable.attempt = callback.Attempt
171176
invokable = nexusInvokable
172177
if err != nil {
173178
return err

components/callbacks/nexus_invocation.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ import (
2727
"fmt"
2828
"io"
2929
"net/http"
30+
"net/http/httptrace"
3031
"slices"
3132
"time"
3233

3334
"github.com/nexus-rpc/sdk-go/nexus"
3435
persistencespb "go.temporal.io/server/api/persistence/v1"
36+
"go.temporal.io/server/common/log"
3537
"go.temporal.io/server/common/log/tag"
3638
"go.temporal.io/server/common/metrics"
3739
"go.temporal.io/server/common/namespace"
@@ -48,8 +50,10 @@ type CanGetNexusCompletion interface {
4850
}
4951

5052
type nexusInvocation struct {
51-
nexus *persistencespb.Callback_Nexus
52-
completion nexus.OperationCompletion
53+
nexus *persistencespb.Callback_Nexus
54+
completion nexus.OperationCompletion
55+
workflowID, runID string
56+
attempt int32
5357
}
5458

5559
func isRetryableHTTPResponse(response *http.Response) bool {
@@ -74,6 +78,21 @@ func (n nexusInvocation) WrapError(result invocationResult, err error) error {
7478
}
7579

7680
func (n nexusInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult {
81+
if e.HTTPTraceProvider != nil {
82+
traceLogger := log.With(e.Logger,
83+
tag.WorkflowNamespace(ns.Name().String()),
84+
tag.Operation("CompleteNexusOperation"),
85+
tag.NewStringTag("destination", task.destination),
86+
tag.WorkflowID(n.workflowID),
87+
tag.WorkflowRunID(n.runID),
88+
tag.AttemptStart(time.Now().UTC()),
89+
tag.Attempt(n.attempt),
90+
)
91+
if trace := e.HTTPTraceProvider.NewTrace(n.attempt, traceLogger); trace != nil {
92+
ctx = httptrace.WithClientTrace(ctx, trace)
93+
}
94+
}
95+
7796
request, err := nexus.NewCompletionHTTPRequest(ctx, n.nexus.Url, n.completion)
7897
if err != nil {
7998
return invocationResultFail{queues.NewUnprocessableTaskError(

components/nexusoperations/executors.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"context"
2727
"errors"
2828
"fmt"
29+
"net/http/httptrace"
2930
"strings"
3031
"sync/atomic"
3132
"text/template"
@@ -67,6 +68,7 @@ type TaskExecutorOptions struct {
6768
CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
6869
ClientProvider ClientProvider
6970
EndpointRegistry commonnexus.EndpointRegistry
71+
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
7072
}
7173

7274
func RegisterExecutor(
@@ -201,6 +203,22 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
201203
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
202204
defer cancel()
203205

206+
if e.HTTPTraceProvider != nil {
207+
traceLogger := log.With(e.Logger,
208+
tag.WorkflowNamespace(ns.Name().String()),
209+
tag.RequestID(args.requestID),
210+
tag.Operation(args.operation),
211+
tag.Endpoint(args.endpointName),
212+
tag.WorkflowID(ref.WorkflowKey.WorkflowID),
213+
tag.WorkflowRunID(ref.WorkflowKey.RunID),
214+
tag.AttemptStart(time.Now().UTC()),
215+
tag.Attempt(task.Attempt),
216+
)
217+
if trace := e.HTTPTraceProvider.NewTrace(task.Attempt, traceLogger); trace != nil {
218+
callCtx = httptrace.WithClientTrace(callCtx, trace)
219+
}
220+
}
221+
204222
startTime := time.Now()
205223
var rawResult *nexus.ClientStartOperationResult[*nexus.LazyValue]
206224
var callErr error
@@ -546,6 +564,22 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
546564
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
547565
defer cancel()
548566

567+
if e.HTTPTraceProvider != nil {
568+
traceLogger := log.With(e.Logger,
569+
tag.WorkflowNamespace(ns.Name().String()),
570+
tag.RequestID(args.requestID),
571+
tag.Operation(args.operation),
572+
tag.Endpoint(args.endpointName),
573+
tag.WorkflowID(ref.WorkflowKey.WorkflowID),
574+
tag.WorkflowRunID(ref.WorkflowKey.RunID),
575+
tag.AttemptStart(time.Now().UTC()),
576+
tag.Attempt(task.Attempt),
577+
)
578+
if trace := e.HTTPTraceProvider.NewTrace(task.Attempt, traceLogger); trace != nil {
579+
callCtx = httptrace.WithClientTrace(callCtx, trace)
580+
}
581+
}
582+
549583
var callErr error
550584
startTime := time.Now()
551585
if callTimeout < e.Config.MinOperationTimeout(ns.Name().String()) {
@@ -576,9 +610,9 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
576610
}
577611

578612
type cancelArgs struct {
579-
service, operation, token, endpointID, endpointName string
580-
scheduledTime time.Time
581-
scheduleToCloseTimeout time.Duration
613+
service, operation, token, endpointID, endpointName, requestID string
614+
scheduledTime time.Time
615+
scheduleToCloseTimeout time.Duration
582616
}
583617

584618
// loadArgsForCancelation loads state from the operation state machine that's the parent of the cancelation machine the
@@ -599,6 +633,7 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro
599633
args.token = op.OperationToken
600634
args.endpointID = op.EndpointId
601635
args.endpointName = op.Endpoint
636+
args.requestID = op.RequestId
602637
args.scheduledTime = op.ScheduledTime.AsTime()
603638
args.scheduleToCloseTimeout = op.ScheduleToCloseTimeout.AsDuration()
604639
return nil

service/history/fx.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"go.temporal.io/server/common/membership"
3636
"go.temporal.io/server/common/metrics"
3737
"go.temporal.io/server/common/namespace"
38+
commonnexus "go.temporal.io/server/common/nexus"
3839
persistenceClient "go.temporal.io/server/common/persistence/client"
3940
"go.temporal.io/server/common/persistence/visibility"
4041
"go.temporal.io/server/common/persistence/visibility/manager"
@@ -93,6 +94,7 @@ var Module = fx.Options(
9394
fx.Provide(ServerProvider),
9495
fx.Provide(NewService),
9596
fx.Provide(ReplicationProgressCacheProvider),
97+
fx.Provide(commonnexus.NewLoggedHTTPClientTraceProvider),
9698
fx.Invoke(ServiceLifetimeHooks),
9799

98100
callbacks.Module,

0 commit comments

Comments
 (0)