Skip to content

Commit 69132df

Browse files
fix(datastore): Add Model Class Check in getMutationById() to Prevent Cross-Model Primary Key Collisions (#2612)
Co-authored-by: Tyler Roach <[email protected]>
1 parent 740c52c commit 69132df

File tree

5 files changed

+161
-25
lines changed

5 files changed

+161
-25
lines changed

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Merger.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ <T extends Model> Completable merge(
102102
// If we should merge, then do so now, starting with the model data.
103103
.flatMapCompletable(shouldMerge -> {
104104
Completable firstStep;
105-
if (mutationOutbox.hasPendingMutation(model.getPrimaryKeyString())) {
106-
LOG.info("Mutation outbox has pending mutation for " + model.resolveIdentifier()
105+
if (mutationOutbox.hasPendingMutation(model.getPrimaryKeyString(),
106+
model.getClass().getName())) {
107+
LOG.info("Mutation outbox has pending mutation for Model: " +
108+
model.getModelName() + " with primary key: " + model.resolveIdentifier()
107109
+ ". Saving the metadata, but not model itself.");
108110
firstStep = Completable.complete();
109111
} else {

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/MutationOutbox.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@ interface MutationOutbox {
4545
Completable load();
4646

4747
/**
48-
* Checks to see if there is a pending mutation for a model with the given ID.
48+
* Checks to see if there is a pending mutation for a model with the given ID and Class.
4949
*
5050
* @param modelId ID of any model in the system
51-
* @return true if there is a pending mutation for the model id, false if not.
51+
* @param modelClass The fully qualified class name of the model for which you want to check
52+
* pending mutations. This should match the name returned by the model's
53+
* getClass().getName() method.
54+
* @return true if there is a pending mutation for the model with the given ID and class, false if not.
5255
*/
53-
boolean hasPendingMutation(@NonNull String modelId);
56+
boolean hasPendingMutation(@NonNull String modelId, @NonNull String modelClass);
5457

5558
/**
5659
* Write a new {@link PendingMutation} into the outbox.

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/PersistentMutationOutbox.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,13 @@ final class PersistentMutationOutbox implements MutationOutbox {
8282
}
8383

8484
@Override
85-
public boolean hasPendingMutation(@NonNull String modelId) {
85+
public boolean hasPendingMutation(@NonNull String modelId, @NonNull String modelClass) {
8686
Objects.requireNonNull(modelId);
87-
return getMutationForModelId(modelId) != null;
87+
return getMutationForModelId(modelId, modelClass) != null;
8888
}
8989

9090
@VisibleForTesting
91-
PendingMutation<? extends Model> getMutationForModelId(@NonNull String modelId) {
91+
PendingMutation<? extends Model> getMutationForModelId(@NonNull String modelId, @NonNull String modelClass) {
9292
Objects.requireNonNull(modelId);
9393
AtomicReference<PendingMutation<? extends Model>> mutationResult = new AtomicReference<>();
9494
Completable.create(emitter -> {
@@ -98,7 +98,10 @@ PendingMutation<? extends Model> getMutationForModelId(@NonNull String modelId)
9898
if (results.hasNext()) {
9999
try {
100100
PendingMutation.PersistentRecord persistentRecord = results.next();
101-
mutationResult.set(converter.fromRecord(persistentRecord));
101+
PendingMutation<?> pendingMutation = converter.fromRecord(persistentRecord);
102+
if (pendingMutation.getModelSchema().getModelClass().getName().equals(modelClass)) {
103+
mutationResult.set(pendingMutation);
104+
}
102105
} catch (Throwable throwable) {
103106
emitter.onError(throwable);
104107
}
@@ -150,8 +153,9 @@ public <T extends Model> Completable enqueue(@NonNull PendingMutation<T> incomin
150153
// If there is no existing mutation for the model, then just apply the incoming
151154
// mutation, and be done with this.
152155
String modelId = incomingMutation.getMutatedItem().getPrimaryKeyString();
156+
String modelClass = incomingMutation.getModelSchema().getModelClass().getName();
153157
@SuppressWarnings("unchecked")
154-
PendingMutation<T> existingMutation = (PendingMutation<T>) getMutationForModelId(modelId);
158+
PendingMutation<T> existingMutation = (PendingMutation<T>) getMutationForModelId(modelId, modelClass);
155159
if (existingMutation == null || inFlightMutations.contains(existingMutation.getMutationId())) {
156160
return save(incomingMutation, true)
157161
.andThen(notifyContentAvailable());

aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/MutationProcessorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ public void canDrainMutationOutbox() throws DataStoreException {
177177
assertEquals(1, accumulator.await().size());
178178

179179
// And that it is no longer in the outbox.
180-
assertFalse(mutationOutbox.hasPendingMutation(tony.getPrimaryKeyString()));
180+
assertFalse(mutationOutbox.hasPendingMutation(tony.getPrimaryKeyString(),
181+
tony.getClass().getName()));
181182

182183
// And that it was passed to AppSync for publication.
183184
verify(appSync).create(eq(tony), any(), any(), any());

aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/PersistentMutationOutboxTest.java

Lines changed: 140 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import com.amplifyframework.datastore.syncengine.MutationOutbox.OutboxEvent;
2828
import com.amplifyframework.datastore.syncengine.PendingMutation.PersistentRecord;
2929
import com.amplifyframework.hub.HubChannel;
30+
import com.amplifyframework.testmodels.commentsblog.Author;
3031
import com.amplifyframework.testmodels.commentsblog.BlogOwner;
32+
import com.amplifyframework.testmodels.commentsblog.Post;
33+
import com.amplifyframework.testmodels.commentsblog.PostStatus;
3134
import com.amplifyframework.testutils.HubAccumulator;
3235
import com.amplifyframework.testutils.random.RandomString;
3336

@@ -144,7 +147,7 @@ public void enqueuePersistsMutationAndNotifiesObserver() throws DataStoreExcepti
144147
Collections.singletonList(converter.toRecord(createJameson)),
145148
storage.query(PersistentRecord.class)
146149
);
147-
assertTrue(mutationOutbox.hasPendingMutation(jameson.getId()));
150+
assertTrue(mutationOutbox.hasPendingMutation(jameson.getId(), jameson.getClass().getName()));
148151
assertEquals(createJameson, mutationOutbox.peek());
149152
}
150153

@@ -205,8 +208,8 @@ public void loadPreparesOutbox() throws DataStoreException, InterruptedException
205208
loadObserver.dispose();
206209

207210
// Assert: items are in the outbox.
208-
assertTrue(mutationOutbox.hasPendingMutation(tony.getId()));
209-
assertTrue(mutationOutbox.hasPendingMutation(sam.getId()));
211+
assertTrue(mutationOutbox.hasPendingMutation(tony.getId(), tony.getClass().getName()));
212+
assertTrue(mutationOutbox.hasPendingMutation(sam.getId(), sam.getClass().getName()));
210213

211214
// Tony is first, since he is the older of the two mutations.
212215
assertEquals(updateTony, mutationOutbox.peek());
@@ -237,7 +240,7 @@ public void removeRemovesChangesFromQueue() throws DataStoreException, Interrupt
237240
assertEquals(0, storage.query(PersistentRecord.class).size());
238241

239242
assertNull(mutationOutbox.peek());
240-
assertFalse(mutationOutbox.hasPendingMutation(bill.getId()));
243+
assertFalse(mutationOutbox.hasPendingMutation(bill.getId(), bill.getClass().getName()));
241244
}
242245

243246
/**
@@ -327,8 +330,8 @@ public void hasPendingMutationReturnsTrueForExistingModelMutation() {
327330
boolean completed = mutationOutbox.enqueue(pendingMutation).blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS);
328331

329332
assertTrue(completed);
330-
assertTrue(mutationOutbox.hasPendingMutation(modelId));
331-
assertFalse(mutationOutbox.hasPendingMutation(mutationId.toString()));
333+
assertTrue(mutationOutbox.hasPendingMutation(modelId, joe.getClass().getName()));
334+
assertFalse(mutationOutbox.hasPendingMutation(mutationId.toString(), mutationId.getClass().getName()));
332335
}
333336

334337
/**
@@ -354,10 +357,128 @@ public void hasPendingMutationReturnsFalseForItemNotInStore() throws DataStoreEx
354357
mutationId, joe, schema, PendingMutation.Type.CREATE, QueryPredicates.all()
355358
);
356359

357-
assertFalse(mutationOutbox.hasPendingMutation(joeId));
358-
assertFalse(mutationOutbox.hasPendingMutation(unrelatedMutation.getMutationId().toString()));
360+
assertFalse(mutationOutbox.hasPendingMutation(joeId, joe.getClass().getName()));
361+
assertFalse(mutationOutbox.hasPendingMutation(unrelatedMutation.getMutationId().toString(),
362+
unrelatedMutation.getClass().getName()));
359363
}
360364

365+
/**
366+
* When queuing record to mutationOutbox for a model, hasPendingMutation should return false
367+
* for other existing models with same primary key values.
368+
*
369+
* @throws AmplifyException On failure to convert the modelClass item to ModelSchema
370+
*/
371+
@Test
372+
public void hasPendingMutationReturnsFalseForModelMutationWithSamePrimaryKeyForDifferentModels()
373+
throws AmplifyException {
374+
375+
String modelId = RandomString.string();
376+
BlogOwner blogOwner = BlogOwner.builder()
377+
.name("Sample BlogOwner")
378+
.id(modelId)
379+
.build();
380+
TimeBasedUuid mutationId = TimeBasedUuid.create();
381+
382+
PendingMutation<BlogOwner> pendingBlogOwnerMutation = PendingMutation.instance(
383+
mutationId, blogOwner, schema, PendingMutation.Type.CREATE, QueryPredicates.all()
384+
);
385+
386+
// Act & Assert: Enqueue and verify BlogOwner
387+
assertTrue(mutationOutbox.enqueue(pendingBlogOwnerMutation).blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS));
388+
assertTrue(mutationOutbox.hasPendingMutation(modelId, blogOwner.getClass().getName()));
389+
390+
// Act & Assert: Enqueue and verify Author
391+
Author author = Author.builder()
392+
.name("Sample Author")
393+
.id(modelId)
394+
.build();
395+
396+
// Check hasPendingMutation returns False for Author with same Primary Key (id) as BlogOwner
397+
assertFalse(mutationOutbox.hasPendingMutation(modelId, author.getClass().getName()));
398+
399+
PendingMutation<Author> pendingAuthorMutation = PendingMutation.instance(
400+
mutationId, author, ModelSchema.fromModelClass(Author.class),
401+
PendingMutation.Type.CREATE, QueryPredicates.all()
402+
);
403+
assertTrue(mutationOutbox.enqueue(pendingAuthorMutation).blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS));
404+
405+
// Make sure Author Mutation is stored
406+
assertTrue(mutationOutbox.hasPendingMutation(modelId, author.getClass().getName()));
407+
408+
// Act & Assert: Enqueue and verify Author
409+
Post post = Post.builder()
410+
.title("Sample Author")
411+
.status(PostStatus.ACTIVE)
412+
.rating(1)
413+
.id(modelId)
414+
.build();
415+
416+
// Check hasPendingMutation returns False for Post with same Primary Key (id) as BlogOwner
417+
assertFalse(mutationOutbox.hasPendingMutation(modelId, post.getClass().getName()));
418+
419+
PendingMutation<Post> pendingPostMutation = PendingMutation.instance(
420+
mutationId, post, ModelSchema.fromModelClass(Post.class),
421+
PendingMutation.Type.CREATE, QueryPredicates.all()
422+
);
423+
assertTrue(mutationOutbox.enqueue(pendingPostMutation).blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS));
424+
425+
// Make sure Post Mutation is stored
426+
assertTrue(mutationOutbox.hasPendingMutation(modelId, post.getClass().getName()));
427+
}
428+
429+
/**
430+
* Validates that attempting to enqueue a mutation for a model with a duplicate primary key results
431+
* in a DataStoreException. Also checks that the original mutation is still in the outbox.
432+
*
433+
* @throws DataStoreException On failure to query storage to assert post-action value of mutation
434+
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
435+
* @throws AmplifyException If schema cannot be found in the registry
436+
*/
437+
@Test
438+
public void mutationEnqueueForModelWithDuplicatePrimaryKeyThrowsDatastoreException() throws
439+
AmplifyException, InterruptedException {
440+
441+
// Arrange: Create and enqueue an initial BlogOwner mutation
442+
String modelId = RandomString.string();
443+
BlogOwner existingBlogOwner = BlogOwner.builder()
444+
.name("Sample BlogOwner")
445+
.id(modelId)
446+
.build();
447+
448+
PendingMutation<BlogOwner> existingCreation = PendingMutation.creation(existingBlogOwner, schema);
449+
String existingCreationId = existingCreation.getMutationId().toString();
450+
assertTrue(mutationOutbox.enqueue(existingCreation).blockingAwait(TIMEOUT_MS, TimeUnit.MILLISECONDS));
451+
452+
// Arrange: Create a new BlogOwner with the same ID as the existing one
453+
BlogOwner duplicateBlogOwner = BlogOwner.builder()
454+
.name("Sample BlogOwner")
455+
.id(modelId)
456+
.build();
457+
PendingMutation<BlogOwner> duplicateMutation =
458+
PendingMutation.creation(duplicateBlogOwner, schema);
459+
String duplicateMutationId = duplicateMutation.getMutationId().toString();
460+
461+
// Act: Attempt to enqueue the duplicate mutation
462+
TestObserver<Void> enqueueObserver = mutationOutbox.enqueue(duplicateMutation).test();
463+
464+
// Assert: Verify that a DataStoreException is thrown
465+
enqueueObserver.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
466+
enqueueObserver.assertError(throwable -> throwable instanceof DataStoreException);
467+
468+
// Assert: original mutation is present, but the new one isn't.
469+
PersistentRecord storedMutation = storage.query(PersistentRecord.class,
470+
Where.identifier(PersistentRecord.class, existingCreationId)).get(0);
471+
assertEquals(existingBlogOwner, converter.fromRecord(storedMutation).getMutatedItem());
472+
assertTrue(storage.query(PersistentRecord.class,
473+
Where.identifier(PersistentRecord.class, duplicateMutationId)).isEmpty());
474+
475+
// Additional Checks: Peek the Mutation outbox, existing mutation should be present.
476+
assertTrue(mutationOutbox.hasPendingMutation(existingBlogOwner.getPrimaryKeyString(),
477+
existingBlogOwner.getClass().getName()));
478+
assertEquals(existingCreation, mutationOutbox.peek());
479+
}
480+
481+
361482
/**
362483
* When there is an existing creation for a model, and a new creation for that
363484
* model comes in, an error should be returned. In other words, it is illegal to
@@ -399,7 +520,8 @@ public void existingCreationIncomingCreationYieldsError() throws AmplifyExceptio
399520
Where.identifier(PersistentRecord.class, incomingCreationId)).isEmpty());
400521

401522
// Existing mutation still attainable as next mutation (right now, its the ONLY mutation in outbox)
402-
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString()));
523+
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString(),
524+
modelInExistingMutation.getClass().getName()));
403525
assertEquals(existingCreation, mutationOutbox.peek());
404526
}
405527

@@ -444,7 +566,8 @@ public void existingUpdateIncomingCreationYieldsError() throws AmplifyException,
444566
incomingCreationId)).isEmpty());
445567

446568
// Existing mutation still attainable as next mutation (right now, its the ONLY mutation in outbox)
447-
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString()));
569+
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString(),
570+
modelInExistingMutation.getClass().getName()));
448571
assertEquals(existingUpdate, mutationOutbox.peek());
449572
}
450573

@@ -490,7 +613,8 @@ public void existingDeletionIncomingCreationYieldsError() throws AmplifyExceptio
490613
Where.identifier(PersistentRecord.class, incomingCreationId)).isEmpty());
491614

492615
// Existing mutation still attainable as next mutation (right now, its the ONLY mutation in outbox)
493-
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString()));
616+
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString(),
617+
modelInExistingMutation.getClass().getName()));
494618
assertEquals(existingDeletion, mutationOutbox.peek());
495619
}
496620

@@ -535,7 +659,8 @@ public void existingDeletionIncomingUpdateYieldsError() throws AmplifyException,
535659
incomingUpdateId)).isEmpty());
536660

537661
// Existing mutation still attainable as next mutation (right now, its the ONLY mutation in outbox)
538-
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString()));
662+
assertTrue(mutationOutbox.hasPendingMutation(modelInExistingMutation.getPrimaryKeyString(),
663+
modelInExistingMutation.getClass().getName()));
539664
assertEquals(existingDeletion, mutationOutbox.peek());
540665
}
541666

@@ -667,7 +792,8 @@ public void existingUpdateIncomingUpdateWithoutConditionRewritesExistingMutation
667792
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
668793
*/
669794
@Test
670-
public void existingSerializedModelUpdateIncomingUpdateWithoutConditionMergesWithExistingMutation()
795+
public void
796+
existingSerializedModelUpdateIncomingUpdateWithoutConditionMergesWithExistingMutation()
671797
throws AmplifyException, InterruptedException {
672798
// Arrange an existing update mutation
673799
BlogOwner modelInSqlLite = BlogOwner.builder()
@@ -1060,7 +1186,7 @@ public void nextItemForModelIdReturnsFirstEnqueued() throws DataStoreException {
10601186
assertTrue(completed);
10611187
assertEquals(
10621188
firstMutation,
1063-
mutationOutbox.getMutationForModelId(originalJoe.getId())
1189+
mutationOutbox.getMutationForModelId(originalJoe.getId(), originalJoe.getClass().getName())
10641190
);
10651191
}
10661192

0 commit comments

Comments
 (0)