Skip to content

Commit 9cdf22b

Browse files
poojamatrichardmcclellanrjuliano
authored
feat(datastore): Retry sync (#1414)
* Added retry logic on SyncProcessor. https://github.com/orgs/aws-amplify/projects/62#card-56626619 * Updated retry on syncprocessor to retry only on recoverable error. Updated retry on syncprocessor to retry only on recoverable error. * Adding irrecoverable error to APIException * Updating unit test * Making retry class generic * Update settings.gradle Co-authored-by: Richard McClellan <[email protected]> * Made retrier generic but still using Rx Java * Made retrier generic but still using Rx Java * Update aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RequestRetry.kt Co-authored-by: Richard McClellan <[email protected]> * PR suggestions * Update aws-api/src/main/java/com/amplifyframework/api/aws/AppSyncGraphQLOperation.java Co-authored-by: Richard McClellan <[email protected]> * PR suggestions * PR suggestions * PR suggestions * More unit test * Update aws-datastore/src/main/java/com/amplifyframework/datastore/appsync/AppSyncClient.java Co-authored-by: Rafael Juliano <[email protected]> * Update aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryHandler.kt Co-authored-by: Rafael Juliano <[email protected]> * PR suggestions! * checkstyle fixes * PR Suggestions * Removing timeout from Orchestrator blocking await. * Checkstyle fixes. * Checkstyle fixes. * Checkstyle fix. * Checkstyle fix. * Adding test for retry on subscription disposed. * Adding maxDelay to retry logic. * Added isDisposed check on the emitter in retryhandler before retrying and emitting error. * Added retry on sync enable as a property on builder in awsdatastore plugin constructor. * adding retry enable as a builder method in constructor * checkstyle * Update aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java Co-authored-by: Rafael Juliano <[email protected]> * Added code to get syncRetry from userprovidedconfiguration.getDoSyncRetry() * Push to start git checks. Co-authored-by: Richard McClellan <[email protected]> Co-authored-by: Rafael Juliano <[email protected]>
1 parent c241e51 commit 9cdf22b

File tree

18 files changed

+547
-45
lines changed

18 files changed

+547
-45
lines changed

aws-api/src/main/java/com/amplifyframework/api/aws/AppSyncGraphQLOperation.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
public final class AppSyncGraphQLOperation<R> extends GraphQLOperation<R> {
5151
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-api");
5252
private static final String CONTENT_TYPE = "application/json";
53-
53+
private static final int START_OF_CLIENT_ERROR_CODE = 400;
54+
private static final int END_OF_CLIENT_ERROR_CODE = 499;
5455
private final String endpoint;
5556
private final OkHttpClient client;
5657
private final Consumer<GraphQLResponse<R>> onResponse;
@@ -142,6 +143,11 @@ public void onResponse(@NonNull Call call, @NonNull Response response) {
142143
return;
143144
}
144145
}
146+
if (response.code() >= START_OF_CLIENT_ERROR_CODE && response.code() <= END_OF_CLIENT_ERROR_CODE) {
147+
onFailure.accept(new ApiException
148+
.NonRetryableException("OkHttp client request failed.", "Irrecoverable error")
149+
);
150+
}
145151

146152
try {
147153
onResponse.accept(wrapResponse(jsonResponse));

aws-datastore/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ dependencies {
3535
testImplementation dependency.mockito
3636
testImplementation dependency.robolectric
3737
testImplementation dependency.androidx.test.core
38+
testImplementation 'io.mockk:mockk:1.11.0'
3839

40+
androidTestImplementation dependency.mockito
3941
androidTestImplementation project(path: ':testmodels')
4042
androidTestImplementation project(path: ':testutils')
4143
androidTestImplementation project(path: ':aws-api')

aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
8585

8686
private final AuthModeStrategyType authModeStrategy;
8787

88+
private final boolean isSyncRetryEnabled;
89+
8890
private AWSDataStorePlugin(
8991
@NonNull ModelProvider modelProvider,
9092
@NonNull ModelSchemaRegistry modelSchemaRegistry,
@@ -93,16 +95,19 @@ private AWSDataStorePlugin(
9395
this.sqliteStorageAdapter = SQLiteStorageAdapter.forModels(modelSchemaRegistry, modelProvider);
9496
this.categoryInitializationsPending = new CountDownLatch(1);
9597
this.authModeStrategy = AuthModeStrategyType.DEFAULT;
98+
this.userProvidedConfiguration = userProvidedConfiguration;
99+
this.isSyncRetryEnabled = userProvidedConfiguration != null && userProvidedConfiguration.getDoSyncRetry();
96100
// Used to interrogate plugins, to understand if sync should be automatically turned on
97101
this.orchestrator = new Orchestrator(
98102
modelProvider,
99103
modelSchemaRegistry,
100104
sqliteStorageAdapter,
101105
AppSyncClient.via(api),
102106
() -> pluginConfiguration,
103-
() -> api.getPlugins().isEmpty() ? Orchestrator.State.LOCAL_ONLY : Orchestrator.State.SYNC_VIA_API
107+
() -> api.getPlugins().isEmpty() ? Orchestrator.State.LOCAL_ONLY : Orchestrator.State.SYNC_VIA_API,
108+
isSyncRetryEnabled
104109
);
105-
this.userProvidedConfiguration = userProvidedConfiguration;
110+
106111
}
107112

108113
private AWSDataStorePlugin(@NonNull Builder builder) throws DataStoreException {
@@ -115,6 +120,7 @@ private AWSDataStorePlugin(@NonNull Builder builder) throws DataStoreException {
115120
this.authModeStrategy = builder.authModeStrategy == null ?
116121
AuthModeStrategyType.DEFAULT :
117122
builder.authModeStrategy;
123+
this.isSyncRetryEnabled = builder.isSyncRetryEnabled;
118124
ApiCategory api = builder.apiCategory == null ? Amplify.API : builder.apiCategory;
119125
this.userProvidedConfiguration = builder.dataStoreConfiguration;
120126
this.sqliteStorageAdapter = builder.storageAdapter == null ?
@@ -129,7 +135,8 @@ private AWSDataStorePlugin(@NonNull Builder builder) throws DataStoreException {
129135
sqliteStorageAdapter,
130136
AppSyncClient.via(api, this.authModeStrategy),
131137
() -> pluginConfiguration,
132-
() -> api.getPlugins().isEmpty() ? Orchestrator.State.LOCAL_ONLY : Orchestrator.State.SYNC_VIA_API
138+
() -> api.getPlugins().isEmpty() ? Orchestrator.State.LOCAL_ONLY : Orchestrator.State.SYNC_VIA_API,
139+
isSyncRetryEnabled
133140
);
134141
}
135142

@@ -626,6 +633,7 @@ public static final class Builder {
626633
private ApiCategory apiCategory;
627634
private AuthModeStrategyType authModeStrategy;
628635
private LocalStorageAdapter storageAdapter;
636+
private boolean isSyncRetryEnabled;
629637

630638
private Builder() {}
631639

@@ -693,6 +701,16 @@ public Builder authModeStrategy(AuthModeStrategyType authModeStrategy) {
693701
return this;
694702
}
695703

704+
/**
705+
* Enables Retry on DataStore sync engine.
706+
* @param isSyncRetryEnabled is sync retry enabled.
707+
* @return An implementation of the {@link ModelProvider} interface.
708+
*/
709+
public Builder isSyncRetryEnabled(Boolean isSyncRetryEnabled) {
710+
this.isSyncRetryEnabled = isSyncRetryEnabled;
711+
return this;
712+
}
713+
696714
/**
697715
* Builds the DataStore plugin.
698716
* @return An instance of the DataStore plugin ready for use.

aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@ public final class DataStoreConfiguration {
4343
static final int DEFAULT_SYNC_MAX_RECORDS = 10_000;
4444
@VisibleForTesting
4545
static final int DEFAULT_SYNC_PAGE_SIZE = 1_000;
46+
@VisibleForTesting
47+
static final boolean DEFAULT_DO_SYNC_RETRY = false;
4648

4749
private final DataStoreErrorHandler errorHandler;
4850
private final DataStoreConflictHandler conflictHandler;
4951
private final Integer syncMaxRecords;
5052
private final Integer syncPageSize;
53+
private final boolean doSyncRetry;
5154
private final Map<String, DataStoreSyncExpression> syncExpressions;
5255
private final Long syncIntervalInMinutes;
5356

@@ -58,6 +61,7 @@ private DataStoreConfiguration(Builder builder) {
5861
this.syncPageSize = builder.syncPageSize;
5962
this.syncIntervalInMinutes = builder.syncIntervalInMinutes;
6063
this.syncExpressions = builder.syncExpressions;
64+
this.doSyncRetry = builder.doSyncRetry;
6165
}
6266

6367
/**
@@ -111,6 +115,7 @@ public static DataStoreConfiguration defaults() throws DataStoreException {
111115
.syncInterval(DEFAULT_SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES)
112116
.syncPageSize(DEFAULT_SYNC_PAGE_SIZE)
113117
.syncMaxRecords(DEFAULT_SYNC_MAX_RECORDS)
118+
.doSyncRetry(DEFAULT_DO_SYNC_RETRY)
114119
.build();
115120
}
116121

@@ -174,6 +179,15 @@ public Integer getSyncPageSize() {
174179
return this.syncPageSize;
175180
}
176181

182+
/**
183+
* Gets the boolean for enabling retry on sync failure
184+
* a sync operation.
185+
* @return Desired size of a page of results from an AppSync sync response
186+
*/
187+
public boolean getDoSyncRetry() {
188+
return this.doSyncRetry;
189+
}
190+
177191
/**
178192
* Returns the Map of all {@link DataStoreSyncExpression}s used to filter data received from AppSync, either during
179193
* a sync or over the real-time subscription.
@@ -247,6 +261,7 @@ public static final class Builder {
247261
private Long syncIntervalInMinutes;
248262
private Integer syncMaxRecords;
249263
private Integer syncPageSize;
264+
private boolean doSyncRetry;
250265
private Map<String, DataStoreSyncExpression> syncExpressions;
251266
private boolean ensureDefaults;
252267
private JSONObject pluginJson;
@@ -313,6 +328,17 @@ public Builder syncMaxRecords(@IntRange(from = 0) Integer syncMaxRecords) {
313328
return Builder.this;
314329
}
315330

331+
/**
332+
* Sets the retry enabled on datastore sync.
333+
* @param doSyncRetry Is retry enabled on datastore sync
334+
* @return Current builder instance
335+
*/
336+
@NonNull
337+
public Builder doSyncRetry(boolean doSyncRetry) {
338+
this.doSyncRetry = doSyncRetry;
339+
return Builder.this;
340+
}
341+
316342
/**
317343
* Sets the number of items requested in each page of sync results.
318344
* @param syncPageSize Number of items requested per page in sync operation
@@ -410,6 +436,7 @@ private void applyUserProvidedConfiguration() {
410436
syncMaxRecords = getValueOrDefault(userProvidedConfiguration.getSyncMaxRecords(), syncMaxRecords);
411437
syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize);
412438
syncExpressions = userProvidedConfiguration.getSyncExpressions();
439+
doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry);
413440
}
414441

415442
private static <T> T getValueOrDefault(T value, T defaultValue) {

aws-datastore/src/main/java/com/amplifyframework/datastore/appsync/AppSyncClient.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,8 @@ public <T extends Model> Cancelable sync(
119119
};
120120
final Consumer<ApiException> failureConsumer =
121121
failure -> onFailure.accept(new DataStoreException(
122-
"Failure performing sync query to AppSync.",
123-
failure, AmplifyException.TODO_RECOVERY_SUGGESTION
124-
));
122+
"Failure performing sync query to AppSync.",
123+
failure, AmplifyException.TODO_RECOVERY_SUGGESTION));
125124

126125
final Cancelable cancelable = api.query(request, responseConsumer, failureConsumer);
127126
if (cancelable != null) {

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,16 @@ public final class Orchestrator {
8484
* The reference to the variable returned by the provider only get set after the plugin's
8585
* {@link AWSDataStorePlugin#configure(JSONObject, Context)} is invoked by Amplify.
8686
* @param targetState The desired state of operation - online, or offline
87+
* @param isSyncRetryEnabled enable or disable the SyncProcessor retry
8788
*/
8889
public Orchestrator(
8990
@NonNull final ModelProvider modelProvider,
9091
@NonNull final ModelSchemaRegistry modelSchemaRegistry,
9192
@NonNull final LocalStorageAdapter localStorageAdapter,
9293
@NonNull final AppSync appSync,
9394
@NonNull final DataStoreConfigurationProvider dataStoreConfigurationProvider,
94-
@NonNull final Supplier<State> targetState) {
95+
@NonNull final Supplier<State> targetState,
96+
final boolean isSyncRetryEnabled) {
9597
Objects.requireNonNull(modelSchemaRegistry);
9698
Objects.requireNonNull(modelProvider);
9799
Objects.requireNonNull(appSync);
@@ -120,6 +122,8 @@ public Orchestrator(
120122
.merger(merger)
121123
.dataStoreConfigurationProvider(dataStoreConfigurationProvider)
122124
.queryPredicateProvider(queryPredicateProvider)
125+
.retryHandler(new RetryHandler())
126+
.isSyncRetryEnabled(isSyncRetryEnabled)
123127
.build();
124128
this.subscriptionProcessor = SubscriptionProcessor.builder()
125129
.appSync(appSync)
@@ -264,15 +268,12 @@ private void transitionToApiSync() throws DataStoreException {
264268
private void startObservingStorageChanges() throws DataStoreException {
265269
LOG.info("Starting to observe local storage changes.");
266270
try {
267-
boolean subscribed = mutationOutbox.load()
271+
mutationOutbox.load()
268272
.andThen(Completable.create(emitter -> {
269273
storageObserver.startObservingStorageChanges(emitter::onComplete);
270274
LOG.info("Setting currentState to LOCAL_ONLY");
271275
currentState.set(State.LOCAL_ONLY);
272-
})).blockingAwait(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
273-
if (!subscribed) {
274-
throw new TimeoutException("Timed out while preparing local-only mode.");
275-
}
276+
})).blockingAwait();
276277
} catch (Throwable throwable) {
277278
throw new DataStoreException("Timed out while starting to observe storage changes.",
278279
throwable,
@@ -292,7 +293,6 @@ private void stopObservingStorageChanges() {
292293

293294
/**
294295
* Start syncing models to and from a remote API.
295-
* @return A Completable that succeeds when API sync is enabled.
296296
*/
297297
private void startApiSync() {
298298
LOG.info("Setting currentState to SYNC_VIA_API");
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.datastore.syncengine;
17+
18+
import com.amplifyframework.core.Amplify;
19+
import com.amplifyframework.datastore.utils.ErrorInspector;
20+
import com.amplifyframework.logging.Logger;
21+
22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import io.reactivex.rxjava3.core.Single;
26+
import io.reactivex.rxjava3.core.SingleEmitter;
27+
28+
/**
29+
* Class for retrying call on failure on a single.
30+
*/
31+
public class RetryHandler {
32+
33+
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");
34+
private static final int MAX_EXPONENT_VALUE = 8;
35+
private static final int JITTER_FACTOR_VALUE = 100;
36+
private static final int MAX_ATTEMPTS_VALUE = 3;
37+
private static final int MAX_DELAY_S_VALUE = 5 * 60;
38+
private final int maxExponent;
39+
private final int jitterFactor;
40+
private final int maxAttempts;
41+
private final int maxDelayS;
42+
43+
/**
44+
* Constructor to inject constants for unit testing.
45+
* @param maxExponent maxExponent backoff can go to.
46+
* @param jitterFactor jitterFactor for backoff.
47+
* @param maxAttempts max attempt for retrying.
48+
* @param maxDelayS max delay for retrying.
49+
*/
50+
public RetryHandler(int maxExponent,
51+
int jitterFactor,
52+
int maxAttempts,
53+
int maxDelayS) {
54+
55+
this.maxExponent = maxExponent;
56+
this.jitterFactor = jitterFactor;
57+
this.maxAttempts = maxAttempts;
58+
this.maxDelayS = maxDelayS;
59+
}
60+
61+
/**
62+
* Parameter less constructor.
63+
*/
64+
public RetryHandler() {
65+
maxExponent = MAX_EXPONENT_VALUE;
66+
jitterFactor = JITTER_FACTOR_VALUE;
67+
maxAttempts = MAX_ATTEMPTS_VALUE;
68+
maxDelayS = MAX_DELAY_S_VALUE;
69+
}
70+
71+
/**
72+
* retry.
73+
* @param single single to be retried.
74+
* @param skipExceptions exceptions which should not be retried.
75+
* @param <T> The type for single.
76+
* @return single of type T.
77+
*/
78+
public <T> Single<T> retry(Single<T> single, List<Class<? extends Throwable>> skipExceptions) {
79+
return Single.create(emitter -> call(single, emitter, 0L, maxAttempts, skipExceptions));
80+
}
81+
82+
@SuppressWarnings("ResultOfMethodCallIgnored")
83+
private <T> void call(
84+
Single<T> single,
85+
SingleEmitter<T> emitter,
86+
Long delayInSeconds,
87+
int attemptsLeft,
88+
List<Class<? extends Throwable>> skipExceptions) {
89+
single.delaySubscription(delayInSeconds, TimeUnit.SECONDS)
90+
.subscribe(emitter::onSuccess, error -> {
91+
if (!emitter.isDisposed()) {
92+
LOG.verbose("Retry attempts left " + attemptsLeft + ". exception type:" + error.getClass());
93+
if (attemptsLeft == 0 || ErrorInspector.contains(error, skipExceptions)) {
94+
emitter.onError(error);
95+
} else {
96+
call(single, emitter, jitteredDelaySec(attemptsLeft),
97+
attemptsLeft - 1, skipExceptions);
98+
}
99+
} else {
100+
LOG.verbose("The subscribing channel is disposed.");
101+
}
102+
});
103+
}
104+
105+
106+
/**
107+
* Method returns jittered delay time in seconds.
108+
* @param attemptsLeft number of attempts left.
109+
* @return delay in seconds.
110+
*/
111+
long jitteredDelaySec(int attemptsLeft) {
112+
int numAttempt = maxAttempts - (maxAttempts - attemptsLeft);
113+
double waitTimeSeconds =
114+
Math.min(maxDelayS, Math.pow(2, ((numAttempt) % maxExponent))
115+
+ jitterFactor * Math.random());
116+
LOG.debug("Wait time is " + waitTimeSeconds + " seconds before retrying");
117+
return (long) waitTimeSeconds;
118+
}
119+
}

0 commit comments

Comments
 (0)