Skip to content

Commit 47de6e5

Browse files
fix(datastore) Make PersistentMutationOutbox operations synchronized (#1085)
* Make tests that fail * fix(datastore) Make PersistentMutationOutbox operations synchronized
1 parent 08ce7f5 commit 47de6e5

File tree

2 files changed

+141
-65
lines changed

2 files changed

+141
-65
lines changed

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/PersistentMutationOutbox.java

Lines changed: 60 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,21 @@ public boolean hasPendingMutation(@NonNull String modelId) {
8787
@Override
8888
public <T extends Model> Completable enqueue(@NonNull PendingMutation<T> incomingMutation) {
8989
Objects.requireNonNull(incomingMutation);
90-
// If there is no existing mutation for the model, then just apply the incoming
91-
// mutation, and be done with this.
92-
String modelId = incomingMutation.getMutatedItem().getId();
93-
@SuppressWarnings("unchecked")
94-
PendingMutation<T> existingMutation = (PendingMutation<T>) mutationQueue.nextMutationForModelId(modelId);
95-
if (existingMutation == null || inFlightMutations.contains(existingMutation.getMutationId())) {
96-
return save(incomingMutation)
97-
.andThen(notifyContentAvailable());
98-
} else {
99-
return resolveConflict(existingMutation, incomingMutation);
100-
}
90+
return Completable.defer(() -> {
91+
// If there is no existing mutation for the model, then just apply the incoming
92+
// mutation, and be done with this.
93+
String modelId = incomingMutation.getMutatedItem().getId();
94+
@SuppressWarnings("unchecked")
95+
PendingMutation<T> existingMutation = (PendingMutation<T>) mutationQueue.nextMutationForModelId(modelId);
96+
if (existingMutation == null || inFlightMutations.contains(existingMutation.getMutationId())) {
97+
return save(incomingMutation)
98+
.andThen(notifyContentAvailable());
99+
} else {
100+
return resolveConflict(existingMutation, incomingMutation);
101+
}
102+
})
103+
.doOnSubscribe(disposable -> semaphore.acquire())
104+
.doOnTerminate(semaphore::release);
101105
}
102106

103107
private <T extends Model> Completable resolveConflict(@NonNull PendingMutation<T> existingMutation,
@@ -108,48 +112,47 @@ private <T extends Model> Completable resolveConflict(@NonNull PendingMutation<T
108112
}
109113

110114
private <T extends Model> Completable save(PendingMutation<T> pendingMutation) {
111-
return Completable.defer(() -> Completable.create(subscriber -> {
112-
semaphore.acquire();
113-
storage.save(
114-
converter.toRecord(pendingMutation),
115-
StorageItemChange.Initiator.SYNC_ENGINE,
116-
QueryPredicates.all(),
117-
saved -> {
118-
// The return value is StorageItemChange, referring to a PersistentRecord
119-
// that was saved. We could "unwrap" a PendingMutation from that PersistentRecord,
120-
// to get identically the thing that was saved. But we know the save succeeded.
121-
// So, let's skip the unwrapping, and use the thing that was enqueued,
122-
// the pendingMutation, directly.
123-
mutationQueue.updateExistingQueueItemOrAppendNew(pendingMutation.getMutationId(), pendingMutation);
124-
LOG.info("Successfully enqueued " + pendingMutation);
125-
announceEventEnqueued(pendingMutation);
126-
publishCurrentOutboxStatus();
127-
semaphore.release();
128-
subscriber.onComplete();
129-
},
130-
failure -> {
131-
semaphore.release();
132-
subscriber.onError(failure);
133-
}
134-
);
135-
}));
115+
return Completable.create(emitter -> storage.save(
116+
converter.toRecord(pendingMutation),
117+
StorageItemChange.Initiator.SYNC_ENGINE,
118+
QueryPredicates.all(),
119+
saved -> {
120+
// The return value is StorageItemChange, referring to a PersistentRecord
121+
// that was saved. We could "unwrap" a PendingMutation from that PersistentRecord,
122+
// to get identically the thing that was saved. But we know the save succeeded.
123+
// So, let's skip the unwrapping, and use the thing that was enqueued,
124+
// the pendingMutation, directly.
125+
mutationQueue.updateExistingQueueItemOrAppendNew(pendingMutation.getMutationId(), pendingMutation);
126+
LOG.info("Successfully enqueued " + pendingMutation);
127+
announceEventEnqueued(pendingMutation);
128+
publishCurrentOutboxStatus();
129+
emitter.onComplete();
130+
},
131+
emitter::onError
132+
));
136133
}
137134

138135
@NonNull
139136
@Override
140137
public Completable remove(@NonNull TimeBasedUuid pendingMutationId) {
138+
return removeNotLocking(pendingMutationId)
139+
.doOnSubscribe(disposable -> semaphore.acquire())
140+
.doOnTerminate(semaphore::release);
141+
}
142+
143+
@NonNull
144+
private Completable removeNotLocking(@NonNull TimeBasedUuid pendingMutationId) {
141145
Objects.requireNonNull(pendingMutationId);
142-
PendingMutation<? extends Model> pendingMutation = mutationQueue.getMutationById(pendingMutationId);
143-
if (pendingMutation == null) {
144-
return Completable.error(new DataStoreException(
145-
"Outbox was asked to remove a mutation with ID = " + pendingMutationId + ". " +
146-
"However, there was no mutation with that ID in the outbox, to begin with.",
147-
AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION
148-
));
149-
}
150-
return Completable.defer(() ->
151-
Maybe.<OutboxEvent>create(subscriber -> {
152-
semaphore.acquire();
146+
return Completable.defer(() -> {
147+
PendingMutation<? extends Model> pendingMutation = mutationQueue.getMutationById(pendingMutationId);
148+
if (pendingMutation == null) {
149+
throw new DataStoreException(
150+
"Outbox was asked to remove a mutation with ID = " + pendingMutationId + ". " +
151+
"However, there was no mutation with that ID in the outbox, to begin with.",
152+
AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION
153+
);
154+
}
155+
return Maybe.<OutboxEvent>create(subscriber -> {
153156
storage.delete(
154157
converter.toRecord(pendingMutation),
155158
StorageItemChange.Initiator.SYNC_ENGINE,
@@ -159,28 +162,23 @@ public Completable remove(@NonNull TimeBasedUuid pendingMutationId) {
159162
inFlightMutations.remove(pendingMutationId);
160163
LOG.info("Successfully removed from mutations outbox" + pendingMutation);
161164
final boolean contentAvailable = !mutationQueue.isEmpty();
162-
semaphore.release(); // Done accessing queue, now.
163165
if (contentAvailable) {
164166
subscriber.onSuccess(OutboxEvent.CONTENT_AVAILABLE);
165167
} else {
166168
subscriber.onComplete();
167169
}
168170
},
169-
failure -> {
170-
semaphore.release();
171-
subscriber.onError(failure);
172-
}
171+
subscriber::onError
173172
);
174173
})
175-
.flatMapCompletable(contentAvailable -> notifyContentAvailable())
176-
);
174+
.flatMapCompletable(contentAvailable -> notifyContentAvailable());
175+
});
177176
}
178177

179178
@NonNull
180179
@Override
181180
public Completable load() {
182-
return Completable.defer(() -> Completable.create(emitter -> {
183-
semaphore.acquire();
181+
return Completable.create(emitter -> {
184182
inFlightMutations.clear();
185183
mutationQueue.clear();
186184
storage.query(PendingMutation.PersistentRecord.class,
@@ -189,22 +187,19 @@ public Completable load() {
189187
try {
190188
mutationQueue.add(converter.fromRecord(results.next()));
191189
} catch (DataStoreException conversionFailure) {
192-
semaphore.release();
193190
emitter.onError(conversionFailure);
194191
return;
195192
}
196193
}
197194
// Publish outbox status upon loading
198195
publishCurrentOutboxStatus();
199-
semaphore.release();
200196
emitter.onComplete();
201197
},
202-
failure -> {
203-
semaphore.release();
204-
emitter.onError(failure);
205-
}
198+
emitter::onError
206199
);
207-
}));
200+
})
201+
.doOnSubscribe(disposable -> semaphore.acquire())
202+
.doOnTerminate(semaphore::release);
208203
}
209204

210205
@NonNull
@@ -333,7 +328,7 @@ private Completable handleIncomingUpdate() {
333328
if (QueryPredicates.all().equals(incoming.getPredicate())) {
334329
// If the incoming update does not have a condition, we want to delete any
335330
// existing mutations for the modelId before saving the incoming one.
336-
return remove(existing.getMutationId()).andThen(saveIncomingAndNotify());
331+
return removeNotLocking(existing.getMutationId()).andThen(saveIncomingAndNotify());
337332
} else {
338333
// If it has a condition, we want to just add it to the queue
339334
return saveIncomingAndNotify();
@@ -360,7 +355,7 @@ private Completable handleIncomingDelete() {
360355
} else {
361356
// The existing create mutation hasn't made it to the remote store, so we
362357
// ignore the incoming and remove the existing create mutation from outbox.
363-
return remove(existing.getMutationId());
358+
return removeNotLocking(existing.getMutationId());
364359
}
365360
case UPDATE:
366361
case DELETE:

aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/PersistentMutationOutboxTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,87 @@ public void errorWhenMarkingItemNotInQueue() throws InterruptedException {
943943
);
944944
}
945945

946+
/**
947+
* When two creations for the same model are enqueued, the second should fail. This is similar to
948+
* {@link #existingCreationIncomingCreationYieldsError}, except that the Completable's from the two enqueue calls
949+
* are concatenated into the same stream. The second enqueue should not check if an item exists in the queue
950+
* until the first enqueue is completed.
951+
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
952+
*/
953+
@Test
954+
public void enqueueIsSynchronized() throws InterruptedException {
955+
// Arrange an existing creation mutation
956+
BlogOwner modelInExistingMutation = BlogOwner.builder()
957+
.name("The Real Papa Tony")
958+
.build();
959+
PendingMutation<BlogOwner> firstCreation = PendingMutation.creation(modelInExistingMutation, schema);
960+
PendingMutation<BlogOwner> secondCreation = PendingMutation.creation(modelInExistingMutation, schema);
961+
962+
TestObserver<Void> enqueueObserver = mutationOutbox.enqueue(firstCreation)
963+
.andThen(mutationOutbox.enqueue(secondCreation))
964+
.test();
965+
966+
// Assert: caused a failure.
967+
enqueueObserver.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
968+
enqueueObserver.assertError(throwable -> throwable instanceof DataStoreException);
969+
}
970+
971+
/**
972+
* Attempting to remove an item from the queue which doesn't exist should throw an error.
973+
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
974+
*/
975+
@Test
976+
public void removeIsSynchronized() throws InterruptedException {
977+
// Enqueue and remove a mutation.
978+
BlogOwner tabby = BlogOwner.builder()
979+
.name("Tabitha Stevens of Beaver Falls, Idaho")
980+
.build();
981+
PendingMutation<BlogOwner> creation = PendingMutation.creation(tabby, schema);
982+
mutationOutbox.enqueue(creation)
983+
.blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS);
984+
985+
TestObserver<Void> observer = mutationOutbox.remove(creation.getMutationId())
986+
.andThen(mutationOutbox.remove(creation.getMutationId()))
987+
.test();
988+
989+
observer.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
990+
observer
991+
.assertError(DataStoreException.class)
992+
.assertError(error ->
993+
error.getMessage() != null &&
994+
error.getMessage().contains("there was no mutation with that ID in the outbox")
995+
);
996+
}
997+
998+
/**
999+
* Marking an item in flight should throw an error if the item is already removed from the queue. This is similar
1000+
* to {@link #errorWhenMarkingItemNotInQueue}, except that the removal and marking in flight Completables are
1001+
* concatenated into the same stream. This validates that markInFlight does not check if the item is in the queue
1002+
* until after the removal is complete.
1003+
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
1004+
*/
1005+
@Test
1006+
public void markInFlightIsSynchronized() throws InterruptedException {
1007+
// Enqueue and remove a mutation.
1008+
BlogOwner tabby = BlogOwner.builder()
1009+
.name("Tabitha Stevens of Beaver Falls, Idaho")
1010+
.build();
1011+
PendingMutation<BlogOwner> creation = PendingMutation.creation(tabby, schema);
1012+
mutationOutbox.enqueue(creation)
1013+
.blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1014+
TestObserver<Void> observer = mutationOutbox.remove(creation.getMutationId())
1015+
.andThen(mutationOutbox.markInFlight(creation.getMutationId())).test();
1016+
1017+
// Now, we should see an error since we can't mark a mutation as in-flight that has already been removed.
1018+
observer.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1019+
observer
1020+
.assertError(DataStoreException.class)
1021+
.assertError(error ->
1022+
error.getMessage() != null &&
1023+
error.getMessage().contains("there was no mutation with that ID in the outbox")
1024+
);
1025+
}
1026+
9461027
private void assertRecordCountForMutationId(String mutationId, int expectedCount) throws DataStoreException {
9471028
List<PersistentRecord> recordsForExistingMutationId = getPendingMutationRecordFromStorage(mutationId);
9481029
assertEquals(expectedCount, recordsForExistingMutationId.size());

0 commit comments

Comments
 (0)