Skip to content

Commit 0f8dcdc

Browse files
feat(datastore) selective sync (#959)
1 parent 1f73dba commit 0f8dcdc

19 files changed

+545
-95
lines changed

aws-datastore/src/androidTest/java/com/amplifyframework/datastore/AppSyncClientInstrumentationTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,15 +206,16 @@ public void testAllOperations() throws AmplifyException {
206206
// Run sync on Blogs
207207
// TODO: This is currently a pretty worthless test - mainly for setting a debug point and manually inspecting
208208
// When you call sync with a null lastSync it gives only one entry per object (the latest state)
209-
Iterable<ModelWithMetadata<Blog>> blogSyncResult = sync(api.buildSyncRequest(blogSchema, null, 1000));
209+
Iterable<ModelWithMetadata<Blog>> blogSyncResult =
210+
sync(api.buildSyncRequest(blogSchema, null, 1000, QueryPredicates.all()));
210211
assertTrue(blogSyncResult.iterator().hasNext());
211212

212213
// Run sync on Posts
213214
// TODO: This is currently a pretty worthless test - mainly for setting a debug point and manually inspecting
214215
// When you call sync with a lastSyncTime it gives you one entry per version of that object which was created
215216
// since that time.
216217
Iterable<ModelWithMetadata<Post>> postSyncResult =
217-
sync(api.buildSyncRequest(postSchema, startTimeSeconds, 1000));
218+
sync(api.buildSyncRequest(postSchema, startTimeSeconds, 1000, QueryPredicates.all()));
218219
assertTrue(postSyncResult.iterator().hasNext());
219220
}
220221

aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import androidx.annotation.VisibleForTesting;
2222
import androidx.core.util.ObjectsCompat;
2323

24+
import com.amplifyframework.core.model.Model;
25+
2426
import org.json.JSONException;
2527
import org.json.JSONObject;
2628

29+
import java.util.HashMap;
2730
import java.util.Iterator;
31+
import java.util.Map;
2832
import java.util.Objects;
2933
import java.util.concurrent.TimeUnit;
3034

@@ -44,21 +48,16 @@ public final class DataStoreConfiguration {
4448
private final DataStoreConflictHandler conflictHandler;
4549
private final Integer syncMaxRecords;
4650
private final Integer syncPageSize;
47-
private Long syncIntervalInMinutes;
48-
49-
private DataStoreConfiguration(
50-
DataStoreErrorHandler errorHandler,
51-
DataStoreConflictHandler conflictHandler,
52-
Long syncIntervalInMinutes,
53-
Integer syncMaxRecords,
54-
Integer syncPageSize) {
55-
this.errorHandler = errorHandler;
56-
this.conflictHandler = conflictHandler;
57-
this.syncMaxRecords = syncMaxRecords;
58-
this.syncPageSize = syncPageSize;
59-
if (syncIntervalInMinutes != null) {
60-
this.syncIntervalInMinutes = syncIntervalInMinutes;
61-
}
51+
private final Map<String, DataStoreSyncExpression> syncExpressions;
52+
private final Long syncIntervalInMinutes;
53+
54+
private DataStoreConfiguration(Builder builder) {
55+
this.errorHandler = builder.errorHandler;
56+
this.conflictHandler = builder.conflictHandler;
57+
this.syncMaxRecords = builder.syncMaxRecords;
58+
this.syncPageSize = builder.syncPageSize;
59+
this.syncIntervalInMinutes = builder.syncIntervalInMinutes;
60+
this.syncExpressions = builder.syncExpressions;
6261
}
6362

6463
/**
@@ -175,6 +174,16 @@ public Integer getSyncPageSize() {
175174
return this.syncPageSize;
176175
}
177176

177+
/**
178+
* Returns the Map of all {@link DataStoreSyncExpression}s used to filter data received from AppSync, either during
179+
* a sync or over the real-time subscription.
180+
* @return the Map of all {@link DataStoreSyncExpression}s.
181+
*/
182+
@NonNull
183+
public Map<String, DataStoreSyncExpression> getSyncExpressions() {
184+
return this.syncExpressions;
185+
}
186+
178187
@Override
179188
public boolean equals(@Nullable Object thatObject) {
180189
if (this == thatObject) {
@@ -196,7 +205,13 @@ public boolean equals(@Nullable Object thatObject) {
196205
if (!ObjectsCompat.equals(getSyncPageSize(), that.getSyncPageSize())) {
197206
return false;
198207
}
199-
return ObjectsCompat.equals(getSyncIntervalInMinutes(), that.getSyncIntervalInMinutes());
208+
if (!ObjectsCompat.equals(getSyncIntervalInMinutes(), that.getSyncIntervalInMinutes())) {
209+
return false;
210+
}
211+
if (!ObjectsCompat.equals(getSyncExpressions(), that.getSyncExpressions())) {
212+
return false;
213+
}
214+
return true;
200215
}
201216

202217
@Override
@@ -206,6 +221,7 @@ public int hashCode() {
206221
result = 31 * result + (getSyncMaxRecords() != null ? getSyncMaxRecords().hashCode() : 0);
207222
result = 31 * result + (getSyncPageSize() != null ? getSyncPageSize().hashCode() : 0);
208223
result = 31 * result + (getSyncIntervalInMinutes() != null ? getSyncIntervalInMinutes().hashCode() : 0);
224+
result = 31 * result + (getSyncExpressions() != null ? getSyncExpressions().hashCode() : 0);
209225
return result;
210226
}
211227

@@ -217,6 +233,7 @@ public String toString() {
217233
", syncMaxRecords=" + syncMaxRecords +
218234
", syncPageSize=" + syncPageSize +
219235
", syncIntervalInMinutes=" + syncIntervalInMinutes +
236+
", syncExpressions=" + syncExpressions +
220237
'}';
221238
}
222239

@@ -230,13 +247,15 @@ public static final class Builder {
230247
private Long syncIntervalInMinutes;
231248
private Integer syncMaxRecords;
232249
private Integer syncPageSize;
250+
private Map<String, DataStoreSyncExpression> syncExpressions;
233251
private boolean ensureDefaults;
234252
private JSONObject pluginJson;
235253
private DataStoreConfiguration userProvidedConfiguration;
236254

237255
private Builder() {
238256
this.errorHandler = DefaultDataStoreErrorHandler.instance();
239257
this.conflictHandler = DataStoreConflictHandler.alwaysApplyRemote();
258+
this.syncExpressions = new HashMap<>();
240259
this.ensureDefaults = false;
241260
}
242261

@@ -305,6 +324,23 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) {
305324
return Builder.this;
306325
}
307326

327+
/**
328+
* Sets a sync expression for a particular model to filter which data is synced locally. The expression
329+
* is evaluated each time DataStore is started. The QueryPredicate is applied on both sync and subscriptions.
330+
* @param modelClass the model class for which the filter applies
331+
* @param syncExpression DataStoreSyncExpression that should be used to filter the data that is synced.
332+
* @return Current builder
333+
*/
334+
@NonNull
335+
public Builder syncExpression(@NonNull Class<? extends Model> modelClass,
336+
@NonNull DataStoreSyncExpression syncExpression) {
337+
this.syncExpressions.put(
338+
Objects.requireNonNull(modelClass).getSimpleName(),
339+
Objects.requireNonNull(syncExpression)
340+
);
341+
return Builder.this;
342+
}
343+
308344
private void populateSettingsFromJson() throws DataStoreException {
309345
if (pluginJson == null) {
310346
return;
@@ -356,6 +392,7 @@ private void applyUserProvidedConfiguration() {
356392
syncIntervalInMinutes);
357393
syncMaxRecords = getValueOrDefault(userProvidedConfiguration.getSyncMaxRecords(), syncMaxRecords);
358394
syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize);
395+
syncExpressions = userProvidedConfiguration.getSyncExpressions();
359396
}
360397

361398
private static <T> T getValueOrDefault(T value, T defaultValue) {
@@ -383,13 +420,7 @@ public DataStoreConfiguration build() throws DataStoreException {
383420
syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS);
384421
syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE);
385422
}
386-
return new DataStoreConfiguration(
387-
errorHandler,
388-
conflictHandler,
389-
syncIntervalInMinutes,
390-
syncMaxRecords,
391-
syncPageSize
392-
);
423+
return new DataStoreConfiguration(this);
393424
}
394425
}
395426

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.datastore;
17+
18+
import com.amplifyframework.core.Action;
19+
import com.amplifyframework.core.Consumer;
20+
import com.amplifyframework.core.model.Model;
21+
import com.amplifyframework.core.model.query.predicate.QueryPredicate;
22+
23+
/**
24+
* Used to specify a QueryPredicate for a {@link Model} that will filter what is synced to the client. A DataStore
25+
* customer should provide their implementation to the {@link DataStoreConfiguration} while
26+
* constructing the DataStore plugin using {@link AWSDataStorePlugin#AWSDataStorePlugin(DataStoreConfiguration)}.
27+
*/
28+
public interface DataStoreSyncExpression {
29+
/**
30+
* This will be called each time DataStore is started. This allows the QueryPredicate to be modified at runtime,
31+
* by calling {@link DataStoreCategoryBehavior#stop(Action, Consumer)} followed by
32+
* {@link DataStoreCategoryBehavior#start(Action, Consumer)}.
33+
* @return QueryPredicate to filter what is synced.
34+
*/
35+
QueryPredicate resolvePredicate();
36+
}

aws-datastore/src/main/java/com/amplifyframework/datastore/appsync/AppSync.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,16 @@ public interface AppSync {
5454
* @param modelSchema The schema of the Model we are listening on
5555
* @param lastSync The time you last synced - all changes since this time are retrieved.
5656
* @param syncPageSize limit for number of records to return per page.
57+
* @param queryPredicate QueryPredicate to filter the records returned.
5758
* @return A {@link GraphQLRequest} for making a sync query
5859
* @throws DataStoreException on error building GraphQLRequest due to inability to obtain model schema.
5960
*/
6061
@NonNull
6162
<T extends Model> GraphQLRequest<PaginatedResult<ModelWithMetadata<T>>> buildSyncRequest(
6263
@NonNull ModelSchema modelSchema,
6364
@Nullable Long lastSync,
64-
@Nullable Integer syncPageSize
65+
@Nullable Integer syncPageSize,
66+
@NonNull QueryPredicate queryPredicate
6567
) throws DataStoreException;
6668

6769
/**

aws-datastore/src/main/java/com/amplifyframework/datastore/appsync/AppSyncClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ public static AppSyncClient via(@NonNull GraphQLBehavior api) {
7474
public <T extends Model> GraphQLRequest<PaginatedResult<ModelWithMetadata<T>>> buildSyncRequest(
7575
@NonNull ModelSchema modelSchema,
7676
@Nullable Long lastSync,
77-
@Nullable Integer syncPageSize) throws DataStoreException {
78-
return AppSyncRequestFactory.buildSyncRequest(modelSchema, lastSync, syncPageSize);
77+
@Nullable Integer syncPageSize,
78+
@NonNull QueryPredicate queryPredicate) throws DataStoreException {
79+
return AppSyncRequestFactory.buildSyncRequest(modelSchema, lastSync, syncPageSize, queryPredicate);
7980
}
8081

8182
@NonNull

aws-datastore/src/main/java/com/amplifyframework/datastore/appsync/AppSyncRequestFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ private AppSyncRequestFactory() {}
8080
static <T> AppSyncGraphQLRequest<T> buildSyncRequest(
8181
@NonNull final ModelSchema modelSchema,
8282
@Nullable final Long lastSync,
83-
@Nullable final Integer limit)
83+
@Nullable final Integer limit,
84+
@NonNull final QueryPredicate predicate)
8485
throws DataStoreException {
8586
try {
8687
AppSyncGraphQLRequest.Builder builder = AppSyncGraphQLRequest.builder()
@@ -99,7 +100,10 @@ static <T> AppSyncGraphQLRequest<T> buildSyncRequest(
99100
if (limit != null) {
100101
builder.variable("limit", "Int", limit);
101102
}
102-
103+
if (!QueryPredicates.all().equals(predicate)) {
104+
String filterType = "Model" + Casing.capitalizeFirst(modelSchema.getName()) + "FilterInput";
105+
builder.variable("filter", filterType, parsePredicate(predicate));
106+
}
103107
return builder.build();
104108
} catch (AmplifyException amplifyException) {
105109
throw new DataStoreException("Failed to get fields for model.",
@@ -170,7 +174,6 @@ static <M extends Model> AppSyncGraphQLRequest<ModelWithMetadata<M>> buildCreati
170174
throw new DataStoreException("Failed to get fields for model.",
171175
amplifyException, "Validate your model file.");
172176
}
173-
174177
}
175178

176179
static Map<String, Object> parsePredicate(QueryPredicate queryPredicate) throws DataStoreException {

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public final class Orchestrator {
6060
private final SubscriptionProcessor subscriptionProcessor;
6161
private final SyncProcessor syncProcessor;
6262
private final MutationProcessor mutationProcessor;
63+
private final QueryPredicateProvider queryPredicateProvider;
6364
private final StorageObserver storageObserver;
6465
private final Supplier<Mode> targetMode;
6566
private final AtomicReference<Mode> currentMode;
@@ -103,6 +104,7 @@ public Orchestrator(
103104
Merger merger = new Merger(mutationOutbox, versionRepository, localStorageAdapter);
104105
SyncTimeRegistry syncTimeRegistry = new SyncTimeRegistry(localStorageAdapter);
105106
ConflictResolver conflictResolver = new ConflictResolver(dataStoreConfigurationProvider, appSync);
107+
this.queryPredicateProvider = new QueryPredicateProvider(dataStoreConfigurationProvider);
106108

107109
this.mutationProcessor = MutationProcessor.builder()
108110
.merger(merger)
@@ -119,8 +121,14 @@ public Orchestrator(
119121
.appSync(appSync)
120122
.merger(merger)
121123
.dataStoreConfigurationProvider(dataStoreConfigurationProvider)
124+
.queryPredicateProvider(queryPredicateProvider)
122125
.build();
123-
this.subscriptionProcessor = new SubscriptionProcessor(appSync, modelProvider, merger);
126+
this.subscriptionProcessor = SubscriptionProcessor.builder()
127+
.appSync(appSync)
128+
.modelProvider(modelProvider)
129+
.merger(merger)
130+
.queryPredicateProvider(queryPredicateProvider)
131+
.build();
124132
this.storageObserver = new StorageObserver(localStorageAdapter, mutationOutbox);
125133
this.currentMode = new AtomicReference<>(Mode.STOPPED);
126134
this.targetMode = targetMode;
@@ -332,6 +340,11 @@ private Completable startApiSync() {
332340
return Completable.create(emitter -> {
333341
LOG.info("Starting API synchronization mode.");
334342

343+
// Resolve any client provided DataStoreSyncExpressions, before starting sync and subscriptions, once each
344+
// time DataStore starts. The QueryPredicateProvider caches the resolved QueryPredicates, which are then
345+
// used to filter data received from AppSync.
346+
queryPredicateProvider.resolvePredicates();
347+
335348
subscriptionProcessor.startSubscriptions();
336349

337350
LOG.debug("About to hydrate...");
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.datastore.syncengine;
17+
18+
import androidx.annotation.NonNull;
19+
import androidx.core.util.Pair;
20+
21+
import com.amplifyframework.core.model.Model;
22+
import com.amplifyframework.core.model.query.predicate.QueryPredicate;
23+
import com.amplifyframework.core.model.query.predicate.QueryPredicates;
24+
import com.amplifyframework.datastore.DataStoreConfiguration;
25+
import com.amplifyframework.datastore.DataStoreConfigurationProvider;
26+
import com.amplifyframework.datastore.DataStoreException;
27+
import com.amplifyframework.datastore.DataStoreSyncExpression;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
33+
import io.reactivex.rxjava3.core.Observable;
34+
35+
/**
36+
* Maintains a Map of QueryPredicates for each {@link Model} for the current DataStore session.
37+
*/
38+
final class QueryPredicateProvider {
39+
private final DataStoreConfigurationProvider dataStoreConfigurationProvider;
40+
private final Map<String, QueryPredicate> predicateMap = new HashMap<>();
41+
42+
/**
43+
* Constructs a QueryPredicateProvider.
44+
* @param dataStoreConfigurationProvider a DataStoreConfigurationProvider.
45+
*/
46+
QueryPredicateProvider(DataStoreConfigurationProvider dataStoreConfigurationProvider) {
47+
this.dataStoreConfigurationProvider = dataStoreConfigurationProvider;
48+
}
49+
50+
/**
51+
* Evaluates any client provided {@link DataStoreSyncExpression}s and caches the resolved {@link QueryPredicate}s
52+
* for use in the current DataStore session. These are used to filter data received from AppSync, either during a
53+
* sync or over the real-time subscription. This is called once each time DataStore is started.
54+
*
55+
* @throws DataStoreException on error obtaining the {@link DataStoreConfiguration}.
56+
*/
57+
public void resolvePredicates() throws DataStoreException {
58+
Map<String, DataStoreSyncExpression> expressions =
59+
dataStoreConfigurationProvider.getConfiguration().getSyncExpressions();
60+
predicateMap.clear();
61+
predicateMap.putAll(Observable.fromIterable(expressions.entrySet())
62+
.map(entry -> Pair.create(entry.getKey(), entry.getValue().resolvePredicate()))
63+
.toMap(pair -> pair.first, pair -> pair.second)
64+
.blockingGet());
65+
}
66+
67+
/**
68+
* Returns the {@link QueryPredicate} for the given modelName.
69+
* @param modelName name of the {@link Model}.
70+
* @return the {@link QueryPredicate} for the given modelName.
71+
*/
72+
@NonNull
73+
public QueryPredicate getPredicate(@NonNull String modelName) {
74+
QueryPredicate predicate = predicateMap.get(Objects.requireNonNull(modelName));
75+
if (predicate == null) {
76+
predicate = QueryPredicates.all();
77+
}
78+
return predicate;
79+
}
80+
}

0 commit comments

Comments
 (0)