Skip to content

Commit 0d560a6

Browse files
authored
fix(datastore) Race condition fix and other stability-related fixes (#599)
* Don't try to initialize orchestrator in beforeOperation * Update aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java * Refactored some stop/start logic * Retry logic for subscription processor. Modified start/stop sequence * Add new hub event message types * Changes related to subscription connectivity problems (#615) * Adding basic unit tests for retry handler
1 parent 8a6e133 commit 0d560a6

File tree

17 files changed

+768
-226
lines changed

17 files changed

+768
-226
lines changed

aws-api/src/main/java/com/amplifyframework/api/aws/SubscriptionEndpoint.java

Lines changed: 230 additions & 149 deletions
Large diffs are not rendered by default.

aws-api/src/main/java/com/amplifyframework/api/aws/SubscriptionOperation.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import java.util.Objects;
3030
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Future;
32+
import java.util.concurrent.atomic.AtomicBoolean;
3133

3234
final class SubscriptionOperation<T> extends GraphQLOperation<T> {
3335
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-api");
@@ -38,9 +40,10 @@ final class SubscriptionOperation<T> extends GraphQLOperation<T> {
3840
private final Consumer<GraphQLResponse<T>> onNextItem;
3941
private final Consumer<ApiException> onSubscriptionError;
4042
private final Action onSubscriptionComplete;
43+
private final AtomicBoolean canceled;
4144

4245
private String subscriptionId;
43-
private boolean canceled;
46+
private Future<?> subscriptionFuture;
4447

4548
@SuppressWarnings("ParameterNumber")
4649
private SubscriptionOperation(
@@ -59,8 +62,7 @@ private SubscriptionOperation(
5962
this.onSubscriptionError = onSubscriptionError;
6063
this.onSubscriptionComplete = onSubscriptionComplete;
6164
this.executorService = executorService;
62-
this.subscriptionId = null;
63-
this.canceled = false;
65+
this.canceled = new AtomicBoolean(false);
6466
}
6567

6668
@NonNull
@@ -70,13 +72,13 @@ static <T> SubscriptionManagerStep<T> builder() {
7072

7173
@Override
7274
public synchronized void start() {
73-
if (canceled) {
75+
if (canceled.get()) {
7476
onSubscriptionError.accept(new ApiException(
7577
"Operation already canceled.", "Don't cancel the subscription before starting it!"
7678
));
7779
return;
7880
}
79-
executorService.submit(() -> {
81+
subscriptionFuture = executorService.submit(() -> {
8082
LOG.debug("Requesting subscription: " + getRequest().getContent());
8183
subscriptionEndpoint.requestSubscription(
8284
getRequest(),
@@ -85,21 +87,31 @@ public synchronized void start() {
8587
onSubscriptionStart.accept(subscriptionId);
8688
},
8789
onNextItem,
88-
onSubscriptionError,
90+
apiException -> {
91+
// Guard against calling something that's been cancelled already.
92+
if (!canceled.get()) {
93+
canceled.set(true);
94+
onSubscriptionError.accept(apiException);
95+
}
96+
},
8997
onSubscriptionComplete
9098
);
9199
});
92100
}
93101

94102
@Override
95103
public synchronized void cancel() {
96-
if (subscriptionId != null && !canceled) {
97-
canceled = true;
104+
if (subscriptionId != null && !canceled.get()) {
105+
canceled.set(true);
98106
try {
99107
subscriptionEndpoint.releaseSubscription(subscriptionId);
100108
} catch (ApiException exception) {
101109
onSubscriptionError.accept(exception);
102110
}
111+
} else if (subscriptionFuture != null && subscriptionFuture.cancel(true)) {
112+
LOG.debug("Subscription attempt was canceled.");
113+
} else {
114+
LOG.debug("Nothing to cancel. Subscription not yet created.");
103115
}
104116
}
105117

aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,18 @@
5252
import java.util.Objects;
5353
import java.util.concurrent.CountDownLatch;
5454
import java.util.concurrent.TimeUnit;
55+
import java.util.concurrent.atomic.AtomicBoolean;
5556

5657
import io.reactivex.Completable;
58+
import io.reactivex.schedulers.Schedulers;
5759

5860
/**
5961
* An AWS implementation of the {@link DataStorePlugin}.
6062
*/
6163
public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
6264
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");
6365
private static final long PLUGIN_INIT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
66+
private static final long PLUGIN_TERMINATE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
6467
// Reference to an implementation of the Local Storage Adapter that
6568
// manages the persistence of data on-device.
6669
private final LocalStorageAdapter sqliteStorageAdapter;
@@ -72,6 +75,8 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
7275
// Keeps track of whether or not the category is initialized yet
7376
private final CountDownLatch categoryInitializationsPending;
7477

78+
private final AtomicBoolean isOrchestratorReady;
79+
7580
// Used to interrogate plugins, to understand if sync should be automatically turned on
7681
private final ApiCategory api;
7782

@@ -90,6 +95,7 @@ private AWSDataStorePlugin(
9095
@Nullable DataStoreConfiguration userProvidedConfiguration) {
9196
this.sqliteStorageAdapter = SQLiteStorageAdapter.forModels(modelSchemaRegistry, modelProvider);
9297
this.categoryInitializationsPending = new CountDownLatch(1);
98+
this.isOrchestratorReady = new AtomicBoolean(false);
9399
this.api = api;
94100
this.orchestrator = new Orchestrator(
95101
modelProvider,
@@ -192,13 +198,18 @@ public void configure(
192198
@Override
193199
public void initialize(@NonNull Context context) throws AmplifyException {
194200
Throwable initError = initializeStorageAdapter(context)
195-
.andThen(initializeOrchestrator())
196201
.blockingGet(PLUGIN_INIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
197202
if (initError != null) {
198203
throw new AmplifyException("Failed to initialize the local storage adapter for the DataStore plugin.",
199204
initError,
200205
AmplifyException.TODO_RECOVERY_SUGGESTION);
201206
}
207+
// Kick off orchestrator asynchronously.
208+
synchronized (isOrchestratorReady) {
209+
initializeOrchestrator()
210+
.subscribeOn(Schedulers.io())
211+
.subscribe();
212+
}
202213
}
203214

204215
/**
@@ -219,8 +230,13 @@ private Completable initializeStorageAdapter(Context context) {
219230
*/
220231
@SuppressWarnings("unused")
221232
synchronized void terminate() throws AmplifyException {
222-
orchestrator.stop();
223-
sqliteStorageAdapter.terminate();
233+
Throwable throwable = orchestrator.stop()
234+
.andThen(
235+
Completable.fromAction(sqliteStorageAdapter::terminate)
236+
).blockingGet(PLUGIN_TERMINATE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
237+
if (throwable != null) {
238+
LOG.warn("An error occurred while terminating the DataStore plugin.", throwable);
239+
}
224240
}
225241

226242
/**
@@ -425,31 +441,66 @@ public <T extends Model> void observe(
425441
@Override
426442
public void clear(@NonNull Action onComplete,
427443
@NonNull Consumer<DataStoreException> onError) {
428-
beforeOperation(() -> {
429-
orchestrator.stop();
430-
sqliteStorageAdapter.clear(onComplete, onError);
431-
});
444+
// We shouldn't call beforeOperation when clearing the DataStore. The
445+
// only thing we have to wait for is the category initialization latch.
446+
boolean isCategoryInitialized = false;
447+
try {
448+
isCategoryInitialized = categoryInitializationsPending.await(PLUGIN_INIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
449+
} catch (InterruptedException exception) {
450+
LOG.warn("Execution interrupted while waiting for DataStore to be initialized.");
451+
}
452+
if (!isCategoryInitialized) {
453+
onError.accept(new DataStoreException("DataStore not ready to be cleared.", "Retry your request."));
454+
return;
455+
}
456+
isOrchestratorReady.set(false);
457+
orchestrator.stop()
458+
.subscribeOn(Schedulers.io())
459+
.andThen(Completable.fromAction(() -> sqliteStorageAdapter.clear(() -> {
460+
// Invoke the consumer's callback once the clear operation is finished.
461+
onComplete.call();
462+
// Kick off the orchestrator asynchronously.
463+
initializeOrchestrator()
464+
.doOnError(throwable -> LOG.warn("Failed to restart orchestrator after clearing.", throwable))
465+
.doOnComplete(() -> LOG.info("Orchestrator restarted after clear operation."))
466+
.subscribe();
467+
}, onError)))
468+
.doOnError(throwable -> LOG.warn("Clear operation failed", throwable))
469+
.doOnComplete(() -> LOG.debug("Clear operation completed."))
470+
.subscribe();
432471
}
433472

434473
private void beforeOperation(@NonNull final Runnable runnable) {
435-
Completable opCompletable = Completable.fromAction(categoryInitializationsPending::await);
436-
if (!orchestrator.isStarted()) {
437-
opCompletable = opCompletable
438-
.andThen(initializeOrchestrator());
439-
}
440-
Throwable throwable = opCompletable
474+
Throwable throwable = Completable.fromAction(categoryInitializationsPending::await)
475+
.repeatUntil(() -> {
476+
// Repeat until this is true or the blockingGet call times out.
477+
synchronized (isOrchestratorReady) {
478+
return isOrchestratorReady.get();
479+
}
480+
})
441481
.andThen(Completable.fromRunnable(runnable))
442482
.blockingGet(PLUGIN_INIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
443-
if (throwable != null) {
444-
LOG.warn("Failed to execute request due to an unexpected error.", throwable);
483+
if (!(throwable == null && isOrchestratorReady.get())) {
484+
if (!isOrchestratorReady.get()) {
485+
LOG.warn("Failed to execute request because DataStore is not fully initialized.");
486+
} else {
487+
LOG.warn("Failed to execute request due to an unexpected error.", throwable);
488+
}
445489
}
446490
}
447491

448492
private Completable initializeOrchestrator() {
449493
if (api.getPlugins().isEmpty()) {
450494
return Completable.complete();
451495
} else {
452-
return orchestrator.start();
496+
// Prevent the orchestrator startup from possibly running on the main thread.
497+
return orchestrator.start(() -> {
498+
// This callback is invoked when the local storage observer gets initialized.
499+
isOrchestratorReady.set(true);
500+
})
501+
.repeatUntil(() -> isOrchestratorReady.get())
502+
.observeOn(Schedulers.io())
503+
.subscribeOn(Schedulers.io());
453504
}
454505
}
455506

aws-datastore/src/main/java/com/amplifyframework/datastore/AmplifyDisposables.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,27 @@
1818
import androidx.annotation.NonNull;
1919
import androidx.annotation.Nullable;
2020

21+
import com.amplifyframework.AmplifyException;
22+
import com.amplifyframework.api.graphql.GraphQLResponse;
23+
import com.amplifyframework.core.Amplify;
24+
import com.amplifyframework.core.Consumer;
2125
import com.amplifyframework.core.async.Cancelable;
26+
import com.amplifyframework.core.model.Model;
27+
import com.amplifyframework.datastore.appsync.ModelWithMetadata;
28+
import com.amplifyframework.logging.Logger;
2229

2330
import java.util.concurrent.atomic.AtomicReference;
2431

32+
import io.reactivex.ObservableEmitter;
2533
import io.reactivex.disposables.Disposable;
2634

2735
/**
2836
* A utility for building Rx {@link Disposable}s from Amplify entities,
2937
* e.g. the {@link Cancelable}.
3038
*/
3139
public final class AmplifyDisposables {
40+
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");
41+
3242
private AmplifyDisposables() {}
3343

3444
/**
@@ -61,4 +71,47 @@ public boolean isDisposed() {
6171
}
6272
};
6373
}
74+
75+
/**
76+
* Creates a {@link Consumer} which wraps the {@link ObservableEmitter#onError(Throwable)}
77+
* to prevent it from calling observers that have already been disposed.
78+
*
79+
* <p>
80+
* The scenario is that we have multiple event sources (1 AppSync subscription
81+
* for each model+operation type combination) being consumed by a single downstream
82+
* observer. Once the first subscription emits an error, the downstream subscriber
83+
* is placed in a disposed state and will not receive any further notifications.
84+
* In a situation such as loss of connectivity, it's inevitable that multiple subscriptions will fail.
85+
* With that said, after the first failure, the other event sources (AppSync subscriptions)
86+
* will attempt to invoke the downstream onError handler which then results in an
87+
* {@link io.reactivex.exceptions.UndeliverableException} being thrown.
88+
* </p>
89+
*
90+
* <p>
91+
* This method takes a reference to the observable emitter that represents the AppSync subscription,
92+
* wraps it and returns a {@link Consumer} that is used as the onError parameter. The returned
93+
* {@link Consumer} function will delegate the onError call to the downstream observers if it's
94+
* still available, otherwise it logs a warning.
95+
* </p>
96+
*
97+
* @param realEmitter The emitter which will have its onError function proxied by the return
98+
* value of this function.
99+
* @param <T> The type of model handled by the emitter.
100+
* @param <E> The type of exception for the onError consumer
101+
* @return A {@link Consumer} that proxies the {@link ObservableEmitter#onError(Throwable)} call
102+
* to the {@code realEmitter} if it's not disposed or logs a warning.
103+
* @see <a href="https://github.com/aws-amplify/amplify-android/issues/541">GitHub issue #541</a>
104+
*
105+
*/
106+
@NonNull
107+
public static <T extends Model, E extends AmplifyException> Consumer<E> onErrorConsumerWrapperFor(
108+
@NonNull ObservableEmitter<GraphQLResponse<ModelWithMetadata<T>>> realEmitter) {
109+
return dataStoreException -> {
110+
if (realEmitter.isDisposed()) {
111+
LOG.warn("Attempted to invoke an emitter that is already disposed", dataStoreException);
112+
} else {
113+
realEmitter.onError(dataStoreException);
114+
}
115+
};
116+
}
64117
}

aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@
2020
/**
2121
* Just a ~type-alias for a consumer of DataStoreException.
2222
*/
23-
interface DataStoreErrorHandler extends Consumer<DataStoreException> {
23+
public interface DataStoreErrorHandler extends Consumer<DataStoreException> {
2424
}

0 commit comments

Comments
 (0)