Skip to content

Commit a1a14b9

Browse files
authored
Fix GetWorkerStopChannel() for Local Activities (#1965)
* Plumb worker stopCh through to GetWorkerStopChannel, change tests to use this to avoid potential race condition * no need for sleep for graceful shutdown tests * Add comment for clarity, combine duplicate tests
1 parent a569259 commit a1a14b9

File tree

6 files changed

+26
-87
lines changed

6 files changed

+26
-87
lines changed

internal/activity.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ func WithLocalActivityTask(
340340
dataConverter converter.DataConverter,
341341
interceptors []WorkerInterceptor,
342342
client *WorkflowClient,
343+
workerStopChannel <-chan struct{},
343344
) (context.Context, error) {
344345
if ctx == nil {
345346
ctx = context.Background()
@@ -386,6 +387,7 @@ func WithLocalActivityTask(
386387
dataConverter: dataConverter,
387388
attempt: task.attempt,
388389
client: client,
390+
workerStopChannel: workerStopChannel,
389391
})
390392
}
391393

internal/internal_task_handlers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,7 +1828,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() {
18281828
t.True(ok)
18291829
taskHandlerImpl.laTunnel = laTunnel
18301830

1831-
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil)
1831+
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil, stopCh)
18321832
go func() {
18331833
for {
18341834
task, _ := laTaskPoller.PollTask()
@@ -1910,7 +1910,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_WorkflowTaskHeartbeatFail
19101910
t.True(ok)
19111911
taskHandlerImpl.laTunnel = laTunnel
19121912

1913-
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil)
1913+
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil, stopCh)
19141914
doneCh := make(chan struct{})
19151915
go func() {
19161916
// laTaskPoller needs to poll the local activity and process it

internal/internal_task_pollers.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,10 @@ type (
131131

132132
localActivityTaskPoller struct {
133133
basePoller
134-
handler *localActivityTaskHandler
135-
logger log.Logger
136-
laTunnel *localActivityTunnel
134+
handler *localActivityTaskHandler
135+
logger log.Logger
136+
laTunnel *localActivityTunnel
137+
workerStopCh <-chan struct{}
137138
}
138139

139140
localActivityTaskHandler struct {
@@ -144,6 +145,7 @@ type (
144145
contextPropagators []ContextPropagator
145146
interceptors []WorkerInterceptor
146147
client *WorkflowClient
148+
workerStopChannel <-chan struct{}
147149
}
148150

149151
localActivityResult struct {
@@ -575,6 +577,7 @@ func newLocalActivityPoller(
575577
laTunnel *localActivityTunnel,
576578
interceptors []WorkerInterceptor,
577579
client *WorkflowClient,
580+
workerStopCh <-chan struct{},
578581
) *localActivityTaskPoller {
579582
handler := &localActivityTaskHandler{
580583
backgroundContext: params.BackgroundContext,
@@ -584,12 +587,14 @@ func newLocalActivityPoller(
584587
contextPropagators: params.ContextPropagators,
585588
interceptors: interceptors,
586589
client: client,
590+
workerStopChannel: workerStopCh,
587591
}
588592
return &localActivityTaskPoller{
589-
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
590-
handler: handler,
591-
logger: params.Logger,
592-
laTunnel: laTunnel,
593+
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
594+
handler: handler,
595+
logger: params.Logger,
596+
laTunnel: laTunnel,
597+
workerStopCh: workerStopCh,
593598
}
594599
}
595600

@@ -643,7 +648,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
643648
)
644649
})
645650
ctx, err := WithLocalActivityTask(lath.backgroundContext, task, lath.logger, lath.metricsHandler,
646-
lath.dataConverter, lath.interceptors, lath.client)
651+
lath.dataConverter, lath.interceptors, lath.client, lath.workerStopChannel)
647652
if err != nil {
648653
return &localActivityResult{task: task, err: fmt.Errorf("failed building context: %w", err)}
649654
}

internal/internal_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func newWorkflowTaskWorkerInternal(
360360
}
361361

362362
// 2) local activity task poller will poll from laTunnel, and result will be pushed to laTunnel
363-
localActivityTaskPoller := newLocalActivityPoller(laParams, laTunnel, interceptors, client)
363+
localActivityTaskPoller := newLocalActivityPoller(laParams, laTunnel, interceptors, client, stopC)
364364
localActivityWorker := newBaseWorker(baseWorkerOptions{
365365
pollerCount: 1, // 1 poller (from local channel) is enough for local activity
366366
slotSupplier: laParams.Tuner.GetLocalActivitySlotSupplier(),

internal/internal_workflow_testsuite.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,7 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
784784
logger: env.logger,
785785
interceptors: env.registry.interceptors,
786786
contextPropagators: env.contextPropagators,
787+
workerStopChannel: env.workerStopChannel,
787788
}
788789

789790
result := taskHandler.executeLocalActivityTask(task)
@@ -1578,6 +1579,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocal
15781579
dataConverter: env.dataConverter,
15791580
contextPropagators: env.contextPropagators,
15801581
interceptors: env.registry.interceptors,
1582+
workerStopChannel: env.workerStopChannel,
15811583
}
15821584

15831585
env.localActivities[activityID] = task

test/integration_test.go

Lines changed: 6 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ func (ts *IntegrationTestSuite) SetupTest() {
227227
}
228228

229229
if strings.Contains(ts.T().Name(), "GracefulActivityCompletion") ||
230-
strings.Contains(ts.T().Name(), "GracefulLocalActivityCompletion") ||
231230
strings.Contains(ts.T().Name(), "LocalActivityCompleteWithinGracefulShutdown") ||
232231
strings.Contains(ts.T().Name(), "LocalActivityTaskTimeoutHeartbeat") {
233232
options.WorkerStopTimeout = 10 * time.Second
@@ -2409,82 +2408,14 @@ func (ts *IntegrationTestSuite) TestGracefulActivityCompletion() {
24092408
ts.Equal("stopped", s)
24102409
}
24112410

2412-
func (ts *IntegrationTestSuite) TestGracefulLocalActivityCompletion() {
2413-
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
2414-
ctx, cancel := context.WithCancel(context.Background())
2415-
defer cancel()
2416-
localActivityFn := func(ctx context.Context) error {
2417-
time.Sleep(100 * time.Millisecond)
2418-
return ctx.Err()
2419-
}
2420-
2421-
workflowFn := func(ctx workflow.Context) error {
2422-
ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
2423-
StartToCloseTimeout: 1 * time.Minute,
2424-
})
2425-
localActivity := workflow.ExecuteLocalActivity(ctx, localActivityFn)
2426-
err := localActivity.Get(ctx, nil)
2427-
if err != nil {
2428-
workflow.GetLogger(ctx).Error("Activity failed.", "Error", err)
2429-
}
2430-
2431-
localActivity = workflow.ExecuteLocalActivity(ctx, localActivityFn)
2432-
err = localActivity.Get(ctx, nil)
2433-
if err != nil {
2434-
workflow.GetLogger(ctx).Error("Second activity failed.", "Error", err)
2435-
}
2436-
2437-
return nil
2438-
2439-
}
2440-
2441-
workflowID := "local-activity-stop-" + uuid.NewString()
2442-
ts.worker.RegisterWorkflowWithOptions(workflowFn, workflow.RegisterOptions{Name: "local-activity-stop"})
2443-
startOptions := client.StartWorkflowOptions{
2444-
ID: workflowID,
2445-
TaskQueue: ts.taskQueueName,
2446-
WorkflowTaskTimeout: 5 * time.Second,
2447-
}
2448-
2449-
// Start workflow
2450-
run, err := ts.client.ExecuteWorkflow(ctx, startOptions, workflowFn)
2451-
ts.NoError(err)
2452-
2453-
// Stop the worker
2454-
time.Sleep(100 * time.Millisecond)
2455-
ts.worker.Stop()
2456-
ts.workerStopped = true
2457-
time.Sleep(500 * time.Millisecond)
2458-
2459-
// Look for activity completed from the history
2460-
var laCompleted int
2461-
var wfeCompleted bool
2462-
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(),
2463-
false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
2464-
for iter.HasNext() {
2465-
event, err := iter.Next()
2466-
ts.NoError(err)
2467-
attributes := event.GetMarkerRecordedEventAttributes()
2468-
if event.EventType == enumspb.EVENT_TYPE_MARKER_RECORDED && attributes.MarkerName == "LocalActivity" && attributes.GetFailure() == nil {
2469-
laCompleted++
2470-
}
2471-
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED {
2472-
wfeCompleted = true
2473-
}
2474-
}
2475-
2476-
// Confirm local activity and WFE completed
2477-
ts.Equal(2, laCompleted)
2478-
ts.True(wfeCompleted)
2479-
}
2480-
24812411
func (ts *IntegrationTestSuite) TestLocalActivityTaskTimeoutHeartbeat() {
24822412
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
24832413
ctx, cancel := context.WithCancel(context.Background())
24842414
defer cancel()
24852415

24862416
localActivityFn := func(ctx context.Context) error {
24872417
// wait for worker shutdown to be started and WorkflowTaskTimeout to be hit
2418+
<-activity.GetWorkerStopChannel(ctx)
24882419
time.Sleep(1500 * time.Millisecond) // 1.5 seconds
24892420
return ctx.Err()
24902421
}
@@ -7777,13 +7708,11 @@ func (ts *IntegrationTestSuite) TestLocalActivityCancelFromWorkerShutdown() {
77777708
}
77787709

77797710
func (ts *IntegrationTestSuite) TestLocalActivityWorkerShutdownNoHeartbeat() {
7780-
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
77817711
ctx, cancel := context.WithCancel(context.Background())
77827712
defer cancel()
77837713
localActivityFn := func(ctx context.Context) error {
7784-
// TODO: Use GetWorkerStopChannel once https://github.com/temporalio/sdk-go/issues/1963 is fixed
7785-
// in this place and other similar tests
7786-
time.Sleep(300 * time.Millisecond)
7714+
// Wait for the LA to return context canceled, so we can test failed LA will not heartbeat on worker shutdown
7715+
time.Sleep(100 * time.Millisecond)
77877716
return ctx.Err()
77887717
}
77897718
workflowFn := func(ctx workflow.Context) error {
@@ -7852,7 +7781,7 @@ func (ts *IntegrationTestSuite) TestLocalActivityCompleteWithinGracefulShutdown(
78527781
ctx, cancel := context.WithCancel(context.Background())
78537782
defer cancel()
78547783
localActivityFn := func(ctx context.Context) error {
7855-
time.Sleep(300 * time.Millisecond)
7784+
<-activity.GetWorkerStopChannel(ctx)
78567785
return ctx.Err()
78577786
}
78587787
workflowFn := func(ctx workflow.Context) error {
@@ -7910,7 +7839,8 @@ func (ts *IntegrationTestSuite) TestLocalActivityCompleteWithinGracefulShutdown(
79107839
}
79117840
}
79127841

7913-
// Confirm no heartbeats from local activity
7842+
// Confirm no heartbeats from local activity and confirm that LA and workflow have completed successfully within
7843+
// graceful shutdown
79147844
ts.Equal(1, wftStarted)
79157845
ts.Equal(2, laCompleted)
79167846
ts.True(wfeCompleted)

0 commit comments

Comments
 (0)