Skip to content

Commit d884d72

Browse files
committed
More continue as new cases
1 parent b0538fb commit d884d72

File tree

3 files changed

+170
-11
lines changed

3 files changed

+170
-11
lines changed

src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodicReplayTest.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,27 @@
2323

2424
public class HelloPeriodicReplayTest {
2525

26-
// continue-as-new case for replayer tests: Passing
26+
/* Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done for continue as new case by replayWorkflowHistory(). This should not have any error because it's a valid continue as new case. */
2727
@Test
2828
public void testReplay_continueAsNew() throws Exception {
2929
WorkflowReplayer.replayWorkflowExecutionFromResource(
3030
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
3131
}
3232

33-
// Continue as new case: change in sleep timer compared to original workflow definition. It should
33+
// Continue as new case: change in frequency compared to original workflow definition by
34+
// increasing number of times greet is hit. It should
3435
// fail. BUT it is currently passing.
3536
@Test
36-
public void testReplay_continueAsNew_timerChange() throws Exception {
37+
public void testReplay_continueAsNew_moreFrequency() throws Exception {
3738
WorkflowReplayer.replayWorkflowExecutionFromResource(
38-
"replaytests/HelloPeriodic.json",
39-
HelloPeriodic_sleepTimerChange.GreetingWorkflowImpl.class);
39+
"replaytests/HelloPeriodic.json", HelloPeriodic_moreFrequency.GreetingWorkflowImpl.class);
40+
}
41+
42+
// Continue as new case: If frequency is changed to lesser number.
43+
// FAIL As expected: It should hit non-determinism case and it is hitting properly.
44+
@Test
45+
public void testReplay_continueAsNew_lessFrequency() throws Exception {
46+
WorkflowReplayer.replayWorkflowExecutionFromResource(
47+
"replaytests/HelloPeriodic.json", HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class);
4048
}
4149
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.samples.replaytests;
19+
20+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
21+
22+
import com.google.common.base.Throwables;
23+
import com.uber.cadence.WorkflowExecution;
24+
import com.uber.cadence.WorkflowIdReusePolicy;
25+
import com.uber.cadence.activity.Activity;
26+
import com.uber.cadence.activity.ActivityOptions;
27+
import com.uber.cadence.client.DuplicateWorkflowException;
28+
import com.uber.cadence.client.WorkflowClient;
29+
import com.uber.cadence.client.WorkflowClientOptions;
30+
import com.uber.cadence.client.WorkflowException;
31+
import com.uber.cadence.client.WorkflowStub;
32+
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
33+
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
34+
import com.uber.cadence.worker.Worker;
35+
import com.uber.cadence.worker.WorkerFactory;
36+
import com.uber.cadence.workflow.Workflow;
37+
import com.uber.cadence.workflow.WorkflowMethod;
38+
import java.time.Duration;
39+
import java.util.Optional;
40+
41+
public class HelloPeriodic_lessFrequency {
42+
43+
static final String TASK_LIST = "HelloPeriodic";
44+
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";
45+
46+
public interface GreetingWorkflow {
47+
@WorkflowMethod(
48+
// At most one instance.
49+
workflowId = PERIODIC_WORKFLOW_ID,
50+
// To allow starting workflow with the same ID after the previous one has terminated.
51+
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
52+
// Adjust this value to the maximum time workflow is expected to run.
53+
// It usually depends on the number of repetitions and interval between them.
54+
executionStartToCloseTimeoutSeconds = 300,
55+
taskList = TASK_LIST
56+
)
57+
void greetPeriodically(String name, Duration delay);
58+
}
59+
60+
public interface GreetingActivities {
61+
void greet(String greeting);
62+
}
63+
64+
public static class GreetingWorkflowImpl implements GreetingWorkflow {
65+
66+
// If we change the value to 10 (compared to 1000 in original case), then non-determinism case
67+
// is not hitting.
68+
private final int CONTINUE_AS_NEW_FREQUENCEY = 1;
69+
70+
private final GreetingActivities activities =
71+
Workflow.newActivityStub(
72+
GreetingActivities.class,
73+
new ActivityOptions.Builder()
74+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
75+
.build());
76+
77+
/**
78+
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
79+
*/
80+
private final GreetingWorkflow continueAsNew =
81+
Workflow.newContinueAsNewStub(GreetingWorkflow.class);
82+
83+
@Override
84+
public void greetPeriodically(String name, Duration delay) {
85+
// Loop the predefined number of times then continue this workflow as new.
86+
// This is needed to periodically truncate the history size.
87+
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
88+
activities.greet("Hello " + name + "!");
89+
Workflow.sleep(delay);
90+
}
91+
// Current workflow run stops executing after this call.
92+
continueAsNew.greetPeriodically(name, delay);
93+
// unreachable line
94+
}
95+
}
96+
97+
static class GreetingActivitiesImpl implements GreetingActivities {
98+
@Override
99+
public void greet(String greeting) {
100+
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
101+
}
102+
}
103+
104+
public static void main(String[] args) throws InterruptedException {
105+
// Get a new client
106+
// NOTE: to set a different options, you can do like this:
107+
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
108+
WorkflowClient workflowClient =
109+
WorkflowClient.newInstance(
110+
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
111+
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
112+
// Get worker to poll the task list.
113+
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
114+
Worker worker = factory.newWorker(TASK_LIST);
115+
// Workflows are stateful. So you need a type to create instances.
116+
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
117+
// Activities are stateless and thread safe. So a shared instance is used.
118+
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
119+
// Start listening to the workflow and activity task lists.
120+
factory.start();
121+
122+
// Start a workflow execution. Usually this is done from another program.
123+
// To ensure that this daemon type workflow is always running try to start it periodically
124+
// ignoring the duplicated exception.
125+
// It is only to protect from application level failures.
126+
// Failures of a workflow worker don't lead to workflow failures.
127+
WorkflowExecution execution = null;
128+
while (true) {
129+
// Print reason of failure of the previous run, before restarting.
130+
if (execution != null) {
131+
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
132+
try {
133+
workflow.getResult(Void.class); //
134+
} catch (WorkflowException e) {
135+
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
136+
}
137+
}
138+
// New stub instance should be created for each new workflow start.
139+
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
140+
try {
141+
execution =
142+
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
143+
System.out.println("Started " + execution);
144+
} catch (DuplicateWorkflowException e) {
145+
System.out.println("Still running as " + e.getExecution());
146+
} catch (Throwable e) {
147+
e.printStackTrace();
148+
System.exit(1);
149+
}
150+
// This value is so low just for the sample purpose. In production workflow
151+
// it is usually much higher.
152+
Thread.sleep(10000);
153+
}
154+
}
155+
}

src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_sleepTimerChange.java src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_moreFrequency.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import java.time.Duration;
3939
import java.util.Optional;
4040

41-
public class HelloPeriodic_sleepTimerChange {
41+
public class HelloPeriodic_moreFrequency {
4242

4343
static final String TASK_LIST = "HelloPeriodic";
4444
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";
@@ -63,11 +63,7 @@ public interface GreetingActivities {
6363

6464
public static class GreetingWorkflowImpl implements GreetingWorkflow {
6565

66-
/**
67-
* This value is so low just to make the example interesting to watch. In real life you would
68-
* use something like 100 or a value that matches a business cycle. For example if it runs once
69-
* an hour 24 would make sense.
70-
*/
66+
// If we change the value to 1, then non-determinism case will hit.
7167
private final int CONTINUE_AS_NEW_FREQUENCEY = 1000;
7268

7369
private final GreetingActivities activities =

0 commit comments

Comments
 (0)