Skip to content

Commit 6e441f6

Browse files
authored
@fix(data): Fix reachability debouncer causing delay on local inserts (#2464)
1 parent db5a501 commit 6e441f6

File tree

3 files changed

+249
-40
lines changed

3 files changed

+249
-40
lines changed

aws-datastore/src/androidTest/java/com/amplifyframework/datastore/BasicCloudSyncInstrumentationTest.java

Lines changed: 146 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,16 @@ public void syncDownFromCloudIsWorking() throws AmplifyException {
216216
}
217217

218218
/**
219-
* Verify that updating an item shortly after creating it succeeds. This can be tricky because the _version
220-
* returned in the response from the create request must be included in the input for the subsequent update request.
219+
* Verify that updating an item shortly after creating it succeeds locally.
220+
*
221+
* Note: If this test periodically fails, consider the immediate update save may still be in process since
222+
* we are only waiting on 1 hub event. Because we call back to back, I haven't seen this happen yet.
223+
*
221224
* @throws DataStoreException On failure to save or query items from DataStore.
222225
* @throws ApiException On failure to query the API.
223226
*/
224227
@Test
225-
public void updateAfterCreate() throws DataStoreException, ApiException {
228+
public void createThenUpdate() throws DataStoreException, ApiException {
226229
// Setup
227230
BlogOwner richard = BlogOwner.builder()
228231
.name("Richard")
@@ -232,16 +235,16 @@ public void updateAfterCreate() throws DataStoreException, ApiException {
232235
.build();
233236
String modelName = BlogOwner.class.getSimpleName();
234237

235-
// Expect two mutations to be published to AppSync.
238+
// Expect at least 1 mutation to be published to AppSync.
236239
HubAccumulator richardAccumulator =
237-
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, richard.getId()), 2)
240+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, richard.getId()), 1)
238241
.start();
239242

240243
// Create an item, then update it and save it again.
241244
dataStore.save(richard);
242245
dataStore.save(updatedRichard);
243246

244-
// Verify that 2 mutations were published.
247+
// Verify that at least 1 mutations was published.
245248
richardAccumulator.await(60, TimeUnit.SECONDS);
246249

247250
// Verify that the updatedRichard is saved in the DataStore.
@@ -254,7 +257,56 @@ public void updateAfterCreate() throws DataStoreException, ApiException {
254257
}
255258

256259
/**
257-
* Verify that updating a different field of an item shortly after creating it succeeds.
260+
* Verify that updating an item shortly after creating it succeeds. This can be tricky because the _version
261+
* returned in the response from the create request must be included in the input for the subsequent update request.
262+
* @throws DataStoreException On failure to save or query items from DataStore.
263+
* @throws ApiException On failure to query the API.
264+
*/
265+
@Test
266+
public void createWaitThenUpdate() throws DataStoreException, ApiException {
267+
// Setup
268+
BlogOwner richard = BlogOwner.builder()
269+
.name("Richard")
270+
.build();
271+
BlogOwner updatedRichard = richard.copyOfBuilder()
272+
.name("Richard McClellan")
273+
.build();
274+
String modelName = BlogOwner.class.getSimpleName();
275+
276+
HubAccumulator accumulator1 =
277+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, richard.getId()), 1);
278+
HubAccumulator accumulator2 =
279+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, richard.getId()), 1);
280+
281+
// Create an item, then update it and save it again.
282+
accumulator1.start();
283+
dataStore.save(richard);
284+
285+
// Verify first save published
286+
accumulator1.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
287+
288+
// Update item and save
289+
accumulator2.start();
290+
dataStore.save(updatedRichard);
291+
292+
// Verify update published
293+
accumulator2.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
294+
295+
// Verify that the updatedRichard is saved in the DataStore.
296+
BlogOwner localRichard = dataStore.get(BlogOwner.class, richard.getId());
297+
ModelAssert.assertEqualsIgnoringTimestamps(updatedRichard, localRichard);
298+
299+
// Verify that the updatedRichard is saved on the backend.
300+
BlogOwner remoteRichard = api.get(BlogOwner.class, richard.getId());
301+
ModelAssert.assertEqualsIgnoringTimestamps(updatedRichard, remoteRichard);
302+
}
303+
304+
/**
305+
* Verify that updating a different field of an item immediately after creating it succeeds.
306+
*
307+
* Note: If this test periodically fails, consider the immediate update save may still be in process since
308+
* we are only waiting on 1 hub event. Because we call back to back, I haven't seen this happen yet.
309+
*
258310
* @throws DataStoreException On failure to save or query items from DataStore.
259311
* @throws ApiException On failure to query the API.
260312
*/
@@ -269,17 +321,61 @@ public void createThenUpdateDifferentField() throws DataStoreException, ApiExcep
269321
.build();
270322
String modelName = BlogOwner.class.getSimpleName();
271323

272-
// Expect two mutations to be published to AppSync.
324+
// Expect at least 1 mutation to be published to AppSync.
273325
HubAccumulator accumulator =
274-
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 2)
326+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1)
275327
.start();
276328

277329
// Create an item, then update it with different field and save it again.
278330
dataStore.save(owner);
279331
dataStore.save(updatedOwner);
280332

281-
// Verify that 2 mutations were published.
282-
accumulator.await(60, TimeUnit.SECONDS);
333+
// Verify that mutation(s) were published.
334+
accumulator.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
335+
336+
// Verify that the updatedOwner is saved in the DataStore.
337+
BlogOwner localOwner = dataStore.get(BlogOwner.class, owner.getId());
338+
ModelAssert.assertEqualsIgnoringTimestamps(updatedOwner, localOwner);
339+
340+
// Verify that the updatedOwner is saved on the backend.
341+
BlogOwner remoteOwner = api.get(BlogOwner.class, owner.getId());
342+
ModelAssert.assertEqualsIgnoringTimestamps(updatedOwner, remoteOwner);
343+
}
344+
345+
/**
346+
* Verify that updating a different field of an item succeeds, after verifying the initial item has been published.
347+
* @throws DataStoreException On failure to save or query items from DataStore.
348+
* @throws ApiException On failure to query the API.
349+
*/
350+
@Test
351+
public void createWaitThenUpdateDifferentField() throws DataStoreException, ApiException {
352+
// Setup
353+
BlogOwner owner = BlogOwner.builder()
354+
.name("Richard")
355+
.build();
356+
BlogOwner updatedOwner = owner.copyOfBuilder()
357+
.wea("pon")
358+
.build();
359+
String modelName = BlogOwner.class.getSimpleName();
360+
361+
HubAccumulator accumulator1 =
362+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1);
363+
HubAccumulator accumulator2 =
364+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1);
365+
366+
// Create an item, then
367+
accumulator1.start();
368+
dataStore.save(owner);
369+
370+
// Verify save published
371+
accumulator1.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
372+
373+
// Update item with different field and save it again.
374+
accumulator2.start();
375+
dataStore.save(updatedOwner);
376+
377+
// Verify update save published
378+
accumulator2.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
283379

284380
// Verify that the updatedOwner is saved in the DataStore.
285381
BlogOwner localOwner = dataStore.get(BlogOwner.class, owner.getId());
@@ -333,31 +429,52 @@ public void create1ThenCreate2ThenUpdate2() throws DataStoreException, ApiExcept
333429
}
334430

335431
/**
336-
* Verify that creating a new item, then immediately delete succeeds.
432+
* Verify that creating a new item, then immediately deleting succeeds.
337433
* @throws DataStoreException On failure to save or query items from DataStore.
338434
* @throws ApiException On failure to query the API.
339435
*/
340436
@Test
341437
public void createThenDelete() throws DataStoreException, ApiException {
438+
// Setup
439+
BlogOwner owner = BlogOwner.builder()
440+
.name("Jean")
441+
.build();
442+
443+
dataStore.save(owner);
444+
dataStore.delete(owner);
445+
446+
// Verify that the owner is deleted from the local data store.
447+
assertThrows(NoSuchElementException.class, () -> dataStore.get(BlogOwner.class, owner.getId()));
448+
}
449+
450+
/**
451+
* Verify that creating a new item, waiting for it to post, then immediately delete succeeds.
452+
* @throws DataStoreException On failure to save or query items from DataStore.
453+
* @throws ApiException On failure to query the API.
454+
*/
455+
@Test
456+
public void createWaitThenDelete() throws DataStoreException, ApiException {
342457
// Setup
343458
BlogOwner owner = BlogOwner.builder()
344459
.name("Jean")
345460
.build();
346461
String modelName = BlogOwner.class.getSimpleName();
347-
348-
HubAccumulator accumulator =
349-
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 2)
350-
.start();
462+
463+
HubAccumulator accumulator1 =
464+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1);
465+
HubAccumulator accumulator2 =
466+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1);
467+
468+
accumulator1.start();
351469
dataStore.save(owner);
470+
accumulator1.awaitFirst(TIMEOUT_SECONDS, TimeUnit.SECONDS);
471+
472+
accumulator2.start();
352473
dataStore.delete(owner);
353-
354-
accumulator.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
355-
474+
accumulator2.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
475+
356476
// Verify that the owner is deleted from the local data store.
357477
assertThrows(NoSuchElementException.class, () -> dataStore.get(BlogOwner.class, owner.getId()));
358-
359-
// Note: Currently, default GraphQL resolvers do not filter records that have been deleted.
360-
// Therefore, calling api to get the item at this point would still succeed.
361478
}
362479

363480
/**
@@ -457,6 +574,10 @@ public void createWaitThenUpdate10TimesWithPredicate() throws DataStoreException
457574
/**
458575
* Create new item, then immediately update a different field.
459576
* Wait for sync round trip. Then update the first field.
577+
*
578+
* Note: If this test periodically fails, consider the immediate update save may still be in process since
579+
* we are only waiting on 1 hub event. Because we call back to back, I haven't seen this happen yet.
580+
*
460581
* @throws DataStoreException On failure to save or query items from DataStore.
461582
* @throws ApiException On failure to query the API.
462583
*/
@@ -466,9 +587,10 @@ public void createItemThenUpdateThenWaitThenUpdate() throws DataStoreException,
466587
BlogOwner owner = BlogOwner.builder().name("ownerName").build();
467588
BlogOwner updatedOwner = owner.copyOfBuilder().wea("pon").build();
468589
String modelName = BlogOwner.class.getSimpleName();
469-
590+
591+
// Expect at least 1 update (2 is possible)
470592
HubAccumulator accumulator =
471-
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 2)
593+
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1)
472594
.start();
473595
// Create new and then immediately update
474596
dataStore.save(owner);

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitor.kt

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,32 +62,34 @@ private class ReachabilityMonitorImpl constructor(val schedulerProvider: Schedul
6262

6363
override fun configure(context: Context, connectivityProvider: ConnectivityProvider) {
6464
this.connectivityProvider = connectivityProvider
65-
connectivityProvider.registerDefaultNetworkCallback(
66-
context,
67-
object : NetworkCallback() {
68-
override fun onAvailable(network: Network) {
69-
subject.onNext(true)
70-
}
65+
val observable = Observable.create { emitter ->
66+
connectivityProvider.registerDefaultNetworkCallback(
67+
context,
68+
object : NetworkCallback() {
69+
override fun onAvailable(network: Network) {
70+
emitter.onNext(true)
71+
}
7172

72-
override fun onLost(network: Network) {
73-
subject.onNext(false)
73+
override fun onLost(network: Network) {
74+
emitter.onNext(false)
75+
}
7476
}
75-
}
76-
)
77+
)
78+
emitter.onNext(connectivityProvider.hasActiveNetwork)
79+
}
80+
observable.debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation())
81+
.distinctUntilChanged()
82+
.subscribe(subject)
7783
}
7884

7985
override fun getObservable(): Observable<Boolean> {
80-
connectivityProvider?.let { connectivityProvider ->
81-
return subject.subscribeOn(schedulerProvider.io())
82-
.doOnSubscribe { subject.onNext(connectivityProvider.hasActiveNetwork) }
83-
.debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation())
84-
.distinctUntilChanged()
85-
} ?: run {
86+
if (connectivityProvider == null) {
8687
throw DataStoreException(
8788
"ReachabilityMonitor has not been configured.",
8889
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
8990
)
9091
}
92+
return subject.subscribeOn(schedulerProvider.io())
9193
}
9294
}
9395

0 commit comments

Comments
 (0)