Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func WithLocalActivityTask(
dataConverter converter.DataConverter,
interceptors []WorkerInterceptor,
client *WorkflowClient,
workerStopChannel <-chan struct{},
) (context.Context, error) {
if ctx == nil {
ctx = context.Background()
Expand Down Expand Up @@ -386,6 +387,7 @@ func WithLocalActivityTask(
dataConverter: dataConverter,
attempt: task.attempt,
client: client,
workerStopChannel: workerStopChannel,
})
}

Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() {
t.True(ok)
taskHandlerImpl.laTunnel = laTunnel

laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil)
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil, stopCh)
go func() {
for {
task, _ := laTaskPoller.PollTask()
Expand Down Expand Up @@ -1910,7 +1910,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_WorkflowTaskHeartbeatFail
t.True(ok)
taskHandlerImpl.laTunnel = laTunnel

laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil)
laTaskPoller := newLocalActivityPoller(params, laTunnel, nil, nil, stopCh)
doneCh := make(chan struct{})
go func() {
// laTaskPoller needs to poll the local activity and process it
Expand Down
21 changes: 13 additions & 8 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ type (

localActivityTaskPoller struct {
basePoller
handler *localActivityTaskHandler
logger log.Logger
laTunnel *localActivityTunnel
handler *localActivityTaskHandler
logger log.Logger
laTunnel *localActivityTunnel
workerStopCh <-chan struct{}
}

localActivityTaskHandler struct {
Expand All @@ -144,6 +145,7 @@ type (
contextPropagators []ContextPropagator
interceptors []WorkerInterceptor
client *WorkflowClient
workerStopChannel <-chan struct{}
}

localActivityResult struct {
Expand Down Expand Up @@ -575,6 +577,7 @@ func newLocalActivityPoller(
laTunnel *localActivityTunnel,
interceptors []WorkerInterceptor,
client *WorkflowClient,
workerStopCh <-chan struct{},
) *localActivityTaskPoller {
handler := &localActivityTaskHandler{
backgroundContext: params.BackgroundContext,
Expand All @@ -584,12 +587,14 @@ func newLocalActivityPoller(
contextPropagators: params.ContextPropagators,
interceptors: interceptors,
client: client,
workerStopChannel: workerStopCh,
}
return &localActivityTaskPoller{
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
handler: handler,
logger: params.Logger,
laTunnel: laTunnel,
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
handler: handler,
logger: params.Logger,
laTunnel: laTunnel,
workerStopCh: workerStopCh,
}
}

Expand Down Expand Up @@ -643,7 +648,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
)
})
ctx, err := WithLocalActivityTask(lath.backgroundContext, task, lath.logger, lath.metricsHandler,
lath.dataConverter, lath.interceptors, lath.client)
lath.dataConverter, lath.interceptors, lath.client, lath.workerStopChannel)
if err != nil {
return &localActivityResult{task: task, err: fmt.Errorf("failed building context: %w", err)}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func newWorkflowTaskWorkerInternal(
}

// 2) local activity task poller will poll from laTunnel, and result will be pushed to laTunnel
localActivityTaskPoller := newLocalActivityPoller(laParams, laTunnel, interceptors, client)
localActivityTaskPoller := newLocalActivityPoller(laParams, laTunnel, interceptors, client, stopC)
localActivityWorker := newBaseWorker(baseWorkerOptions{
pollerCount: 1, // 1 poller (from local channel) is enough for local activity
slotSupplier: laParams.Tuner.GetLocalActivitySlotSupplier(),
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
logger: env.logger,
interceptors: env.registry.interceptors,
contextPropagators: env.contextPropagators,
workerStopChannel: env.workerStopChannel,
}

result := taskHandler.executeLocalActivityTask(task)
Expand Down Expand Up @@ -1578,6 +1579,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocal
dataConverter: env.dataConverter,
contextPropagators: env.contextPropagators,
interceptors: env.registry.interceptors,
workerStopChannel: env.workerStopChannel,
}

env.localActivities[activityID] = task
Expand Down
82 changes: 6 additions & 76 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ func (ts *IntegrationTestSuite) SetupTest() {
}

if strings.Contains(ts.T().Name(), "GracefulActivityCompletion") ||
strings.Contains(ts.T().Name(), "GracefulLocalActivityCompletion") ||
strings.Contains(ts.T().Name(), "LocalActivityCompleteWithinGracefulShutdown") ||
strings.Contains(ts.T().Name(), "LocalActivityTaskTimeoutHeartbeat") {
options.WorkerStopTimeout = 10 * time.Second
Expand Down Expand Up @@ -2409,82 +2408,14 @@ func (ts *IntegrationTestSuite) TestGracefulActivityCompletion() {
ts.Equal("stopped", s)
}

func (ts *IntegrationTestSuite) TestGracefulLocalActivityCompletion() {
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
localActivityFn := func(ctx context.Context) error {
time.Sleep(100 * time.Millisecond)
return ctx.Err()
}

workflowFn := func(ctx workflow.Context) error {
ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
})
localActivity := workflow.ExecuteLocalActivity(ctx, localActivityFn)
err := localActivity.Get(ctx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("Activity failed.", "Error", err)
}

localActivity = workflow.ExecuteLocalActivity(ctx, localActivityFn)
err = localActivity.Get(ctx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("Second activity failed.", "Error", err)
}

return nil

}

workflowID := "local-activity-stop-" + uuid.NewString()
ts.worker.RegisterWorkflowWithOptions(workflowFn, workflow.RegisterOptions{Name: "local-activity-stop"})
startOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: ts.taskQueueName,
WorkflowTaskTimeout: 5 * time.Second,
}

// Start workflow
run, err := ts.client.ExecuteWorkflow(ctx, startOptions, workflowFn)
ts.NoError(err)

// Stop the worker
time.Sleep(100 * time.Millisecond)
ts.worker.Stop()
ts.workerStopped = true
time.Sleep(500 * time.Millisecond)

// Look for activity completed from the history
var laCompleted int
var wfeCompleted bool
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(),
false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
attributes := event.GetMarkerRecordedEventAttributes()
if event.EventType == enumspb.EVENT_TYPE_MARKER_RECORDED && attributes.MarkerName == "LocalActivity" && attributes.GetFailure() == nil {
laCompleted++
}
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED {
wfeCompleted = true
}
}

// Confirm local activity and WFE completed
ts.Equal(2, laCompleted)
ts.True(wfeCompleted)
}

func (ts *IntegrationTestSuite) TestLocalActivityTaskTimeoutHeartbeat() {
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

localActivityFn := func(ctx context.Context) error {
// wait for worker shutdown to be started and WorkflowTaskTimeout to be hit
<-activity.GetWorkerStopChannel(ctx)
time.Sleep(1500 * time.Millisecond) // 1.5 seconds
return ctx.Err()
}
Expand Down Expand Up @@ -7777,13 +7708,11 @@ func (ts *IntegrationTestSuite) TestLocalActivityCancelFromWorkerShutdown() {
}

func (ts *IntegrationTestSuite) TestLocalActivityWorkerShutdownNoHeartbeat() {
// FYI, setup of this test allows the worker to wait to stop for 10 seconds
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
localActivityFn := func(ctx context.Context) error {
// TODO: Use GetWorkerStopChannel once https://github.com/temporalio/sdk-go/issues/1963 is fixed
// in this place and other similar tests
time.Sleep(300 * time.Millisecond)
// Wait for the LA to return context canceled, so we can test failed LA will not heartbeat on worker shutdown
time.Sleep(100 * time.Millisecond)
return ctx.Err()
}
workflowFn := func(ctx workflow.Context) error {
Expand Down Expand Up @@ -7852,7 +7781,7 @@ func (ts *IntegrationTestSuite) TestLocalActivityCompleteWithinGracefulShutdown(
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
localActivityFn := func(ctx context.Context) error {
time.Sleep(300 * time.Millisecond)
<-activity.GetWorkerStopChannel(ctx)
return ctx.Err()
}
workflowFn := func(ctx workflow.Context) error {
Expand Down Expand Up @@ -7910,7 +7839,8 @@ func (ts *IntegrationTestSuite) TestLocalActivityCompleteWithinGracefulShutdown(
}
}

// Confirm no heartbeats from local activity
// Confirm no heartbeats from local activity and confirm that LA and workflow have completed successfully within
// graceful shutdown
ts.Equal(1, wftStarted)
ts.Equal(2, laCompleted)
ts.True(wfeCompleted)
Expand Down
Loading