Skip to content

Commit 02594b0

Browse files
authored
Add sample of batch executions (#92)
What Add an example on how to batch start activities while holding concurrency limits Why Fanout use cases are common in workflows. However, current Cadence server doesn't support batch user cases well. The "correct" way to not bombard the server is to control the concurrency limits of activities so we don't experience lock contentions on mutable states. Next We might want to expose some APIs in client like workflow.BatchExecuteActivity so users can easily write scaling workflow code.
1 parent 84120a4 commit 02594b0

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

cmd/samples/batch/workflow.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package batch
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"time"
8+
9+
"go.uber.org/cadence/workflow"
10+
"go.uber.org/multierr"
11+
)
12+
13+
type BatchWorkflowInput struct {
14+
Concurrency int
15+
TotalSize int
16+
}
17+
18+
func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error {
19+
wg := workflow.NewWaitGroup(ctx)
20+
21+
buffered := workflow.NewBufferedChannel(ctx, input.Concurrency)
22+
futures := workflow.NewNamedChannel(ctx, "futures")
23+
24+
var errs error
25+
wg.Add(1)
26+
// task result collector
27+
workflow.Go(ctx, func(ctx workflow.Context) {
28+
defer wg.Done()
29+
for {
30+
var future workflow.Future
31+
ok := futures.Receive(ctx, &future)
32+
if !ok {
33+
break
34+
}
35+
err := future.Get(ctx, nil)
36+
errs = multierr.Append(errs, err)
37+
buffered.Receive(ctx, nil)
38+
}
39+
})
40+
41+
// submit all tasks
42+
for taskID := 0; taskID < input.TotalSize; taskID++ {
43+
taskID := taskID
44+
buffered.Send(ctx, nil)
45+
46+
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
47+
ScheduleToStartTimeout: time.Second * 10,
48+
StartToCloseTimeout: time.Second * 10,
49+
})
50+
futures.Send(ctx, workflow.ExecuteActivity(aCtx, BatchActivity, taskID))
51+
}
52+
// close the channel to signal the task result collector that no more tasks are coming
53+
futures.Close()
54+
55+
wg.Wait(ctx)
56+
57+
return errs
58+
}
59+
60+
func BatchActivity(ctx context.Context, taskID int) error {
61+
select {
62+
case <-ctx.Done():
63+
return fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err())
64+
case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond):
65+
return nil
66+
}
67+
}

cmd/samples/batch/workflow_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package batch
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"go.uber.org/cadence/testsuite"
8+
)
9+
10+
func Test_BatchWorkflow(t *testing.T) {
11+
testSuite := &testsuite.WorkflowTestSuite{}
12+
env := testSuite.NewTestWorkflowEnvironment()
13+
14+
env.RegisterWorkflow(BatchWorkflow)
15+
env.RegisterActivity(BatchActivity)
16+
17+
env.ExecuteWorkflow(BatchWorkflow, BatchWorkflowInput{
18+
Concurrency: 2,
19+
TotalSize: 10,
20+
})
21+
22+
assert.True(t, env.IsWorkflowCompleted())
23+
assert.Nil(t, env.GetWorkflowError())
24+
}

0 commit comments

Comments
 (0)