Skip to content

Commit 2469efb

Browse files
authored
Merge pull request #35 from SpineEventEngine/async-subscriptions
Async subscriptions
2 parents 68b7e53 + ed41332 commit 2469efb

File tree

7 files changed

+159
-115
lines changed

7 files changed

+159
-115
lines changed

client/lib/client.dart

Lines changed: 98 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ class Client {
250250
void cancelAllSubscriptions() {
251251
for (Subscription subscription in _activeSubscriptions) {
252252
subscription.unsubscribe();
253-
subscription.subscription.then(_cancel);
253+
_cancel(subscription.subscription);
254254
}
255255
_activeSubscriptions.clear();
256256
}
@@ -266,19 +266,19 @@ class Client {
266266
});
267267
}
268268

269-
EventSubscription<E>
269+
Future<EventSubscription<E>>
270270
_subscribeToEvents<E extends GeneratedMessage>(pbSubscription.Topic topic) {
271271
return _subscribe(topic, (s, d) => EventSubscription.of(s, d));
272272
}
273273

274-
StateSubscription<S>
274+
Future<StateSubscription<S>>
275275
_subscribeToStateUpdates<S extends GeneratedMessage>(pbSubscription.Topic topic,
276276
BuilderInfo builderInfo) {
277277
return _subscribe(topic, (s, d) => StateSubscription.of(s, builderInfo, d));
278278
}
279279

280-
S _subscribe<S extends Subscription>(pbSubscription.Topic topic,
281-
_CreateSubscription<S> newSubscription) {
280+
Future<S> _subscribe<S extends Subscription>(pbSubscription.Topic topic,
281+
_CreateSubscription<S> newSubscription) {
282282
if (_firebase == null) {
283283
throw StateError('Cannot create a subscription. No Firebase client is provided.');
284284
}
@@ -287,10 +287,11 @@ class Client {
287287
if (builder == null) {
288288
throw ArgumentError.value(topic, 'topic', 'Target type `$targetTypeUrl` is unknown.');
289289
}
290-
var fbSubscription = _httpClient
290+
var subscription = _httpClient
291291
.postMessage(_endpoints.subscription.create, topic)
292-
.then(_parseFirebaseSubscription);
293-
return newSubscription(fbSubscription, _firebase!);
292+
.then(_parseFirebaseSubscription)
293+
.then((value) => newSubscription(value, _firebase!));
294+
return subscription;
294295
}
295296

296297
FirebaseSubscription _parseFirebaseSubscription(http.Response response) {
@@ -311,10 +312,10 @@ class Client {
311312
void _refreshSubscription(Subscription subscription) {
312313
var subscriptionMessage = subscription.subscription;
313314
if (subscription.closed) {
314-
subscriptionMessage.then(_cancel);
315+
_cancel(subscriptionMessage);
315316
_activeSubscriptions.remove(subscription);
316317
} else {
317-
subscriptionMessage.then(_keepUp);
318+
_keepUp(subscriptionMessage);
318319
}
319320
}
320321

@@ -330,69 +331,111 @@ class Client {
330331
/// A function which accepts a future of `FirebaseSubscription` and a firebase client and creates
331332
/// an instance of [Subscription].
332333
typedef _CreateSubscription<S extends Subscription> =
333-
S Function(Future<FirebaseSubscription>, FirebaseClient);
334+
S Function(FirebaseSubscription, FirebaseClient);
335+
336+
/// A simple or a composite field filter.
337+
///
338+
abstract class FilterOrComposite {
339+
340+
/// Obtains the Protobuf `CompositeFilter` representing this filter.
341+
CompositeFilter _toProto();
342+
}
343+
344+
/// A composite field filter.
345+
///
346+
class Composite implements FilterOrComposite {
347+
348+
final CompositeFilter filter;
349+
350+
Composite._(this.filter);
351+
352+
@override
353+
CompositeFilter _toProto() {
354+
return filter;
355+
}
356+
}
357+
358+
/// A simple field filter.
359+
///
360+
/// A single simple filter is represented as a `CompositeFilter` by wrapping it into the composite
361+
/// filter with the `AND` operator.
362+
///
363+
class SimpleFilter implements FilterOrComposite {
364+
365+
final Filter filter;
366+
367+
SimpleFilter._(this.filter);
368+
369+
@override
370+
CompositeFilter _toProto() {
371+
return CompositeFilter()
372+
..filter.add(filter)
373+
..operator = CompositeFilter_CompositeOperator.ALL
374+
..freeze();
375+
}
376+
}
334377

335378
/// Creates a composite filter which groups one or more field filters with the `ALL` operator.
336379
///
337380
/// All the field filters should pass in order for the composite filter to pass.
338381
///
339-
CompositeFilter all(Iterable<Filter> filters) {
382+
Composite all(Iterable<SimpleFilter> filters) {
340383
ArgumentError.checkNotNull(filters);
341-
return CompositeFilter()
384+
return Composite._(CompositeFilter()
342385
..operator = CompositeFilter_CompositeOperator.ALL
343-
..filter.addAll(filters)
344-
..freeze();
386+
..filter.addAll(filters.map((f) => f.filter))
387+
..freeze());
345388
}
346389

347390
/// Creates a composite filter which groups one or more field filters with the `EITHER` operator.
348391
///
349392
/// At least one field filter should pass in order for the composite filter to pass.
350393
///
351-
CompositeFilter either(Iterable<Filter> filters) {
394+
Composite either(Iterable<SimpleFilter> filters) {
352395
ArgumentError.checkNotNull(filters);
353-
return CompositeFilter()
396+
return Composite._(CompositeFilter()
354397
..operator = CompositeFilter_CompositeOperator.EITHER
355-
..filter.addAll(filters)
356-
..freeze();
398+
..filter.addAll(filters.map((f) => f.filter))
399+
..freeze());
357400
}
358401

359402
/// Creates a field filter with the `=` operator.
360-
Filter eq(String fieldPath, Object value) =>
403+
SimpleFilter eq(String fieldPath, Object value) =>
361404
_filter(fieldPath, Filter_Operator.EQUAL, value);
362405

363406
/// Creates a field filter with the `<=` operator.
364-
Filter le(String fieldPath, Object value) =>
407+
SimpleFilter le(String fieldPath, Object value) =>
365408
_filter(fieldPath, Filter_Operator.LESS_OR_EQUAL, value);
366409

367410
/// Creates a field filter with the `>=` operator.
368-
Filter ge(String fieldPath, Object value) =>
411+
SimpleFilter ge(String fieldPath, Object value) =>
369412
_filter(fieldPath, Filter_Operator.GREATER_OR_EQUAL, value);
370413

371414
/// Creates a field filter with the `<` operator.
372-
Filter lt(String fieldPath, Object value) =>
415+
SimpleFilter lt(String fieldPath, Object value) =>
373416
_filter(fieldPath, Filter_Operator.LESS_THAN, value);
374417

375418
/// Creates a field filter with the `>` operator.
376-
Filter gt(String fieldPath, Object value) =>
419+
SimpleFilter gt(String fieldPath, Object value) =>
377420
_filter(fieldPath, Filter_Operator.GREATER_THAN, value);
378421

379-
Filter _filter(String fieldPath, Filter_Operator operator, Object value) {
422+
SimpleFilter _filter(String fieldPath, Filter_Operator operator, Object value) {
380423
ArgumentError.checkNotNull(fieldPath);
381424
ArgumentError.checkNotNull(value);
382425
var pathElements = fieldPath.split('.');
383-
return Filter()
426+
return SimpleFilter._(Filter()
384427
..fieldPath = (FieldPath()..fieldName.addAll(pathElements))
385428
..operator = operator
386429
..value = packObject(value)
387-
..freeze();
430+
..freeze());
388431
}
389432

390433
/// A request to the server to post a command.
391434
class CommandRequest<M extends GeneratedMessage> {
392435

393436
final Client _client;
394437
final Command _command;
395-
final List<EventSubscription> _subscriptions = [];
438+
final List<Future<EventSubscription>> _futureSubscriptions = [];
396439

397440
CommandRequest._(this._client, M command) :
398441
_command = _client._requests.command().create(command);
@@ -402,11 +445,15 @@ class CommandRequest<M extends GeneratedMessage> {
402445
/// Events down the line, i.e. events produced as the result of other messages which where
403446
/// produced as the result of this command, do not match this subscription.
404447
///
405-
EventSubscription<E> observeEvents<E extends GeneratedMessage>() {
448+
/// When the resulting future completes, the subscription is guaranteed to be created.
449+
/// Also, when the future created in `post(..)` completes, the all subscriptions created within
450+
/// the same `CommandRequest` are guaranteed to have completed.
451+
///
452+
Future<EventSubscription<E>> observeEvents<E extends GeneratedMessage>() {
406453
var subscription = _client.subscribeToEvents<E>()
407-
.where(all([eq('context.past_message', _commandAsOrigin())]))
454+
.where(eq('context.past_message', _commandAsOrigin()))
408455
.post();
409-
_subscriptions.add(subscription);
456+
_futureSubscriptions.add(subscription);
410457
return subscription;
411458
}
412459

@@ -424,19 +471,22 @@ class CommandRequest<M extends GeneratedMessage> {
424471
/// Fails if there are no event subscriptions to monitor the command execution. If this is
425472
/// the desired behaviour, use `CommandRequest.postAndForget(..)`.
426473
///
474+
/// When the command is sent, the event subscriptions created within this request
475+
/// are guaranteed to be active.
476+
///
427477
/// Returns a future which completes when the request is sent. If there was a network problem,
428478
/// the future, completes with an error.
429479
///
430480
/// If the server rejects the command with an error and the [onError] callback is set,
431481
/// the callback will be triggered with the error. Otherwise, the error is silently ignored.
432482
///
433483
Future<void> post({CommandErrorCallback? onError}) {
434-
if (_subscriptions.isEmpty) {
484+
if (_futureSubscriptions.isEmpty) {
435485
throw StateError('Use `observeEvents(..)` or `observeEventsWithContexts(..)` to observe'
436486
' command results or call `postAndForget()` instead of `post()` if you observe'
437487
' command results elsewhere.');
438488
}
439-
return Future.wait(_subscriptions.map((s) => s.subscription))
489+
return Future.wait(_futureSubscriptions)
440490
.then((_) => _client._postCommand(_command, onError));
441491
}
442492

@@ -452,7 +502,7 @@ class CommandRequest<M extends GeneratedMessage> {
452502
/// the callback will be triggered with the error. Otherwise, the error is silently ignored.
453503
///
454504
Future<void> postAndForget({CommandErrorCallback? onError}) {
455-
if (_subscriptions.isNotEmpty) {
505+
if (_futureSubscriptions.isNotEmpty) {
456506
throw StateError('Use `post()` to add event subscriptions.');
457507
}
458508
return _client._postCommand(_command, onError);
@@ -502,9 +552,9 @@ class QueryRequest<M extends GeneratedMessage> {
502552
/// an entity state should pass all of the composite filters to be included in the query
503553
/// results.
504554
///
505-
QueryRequest<M> where(CompositeFilter filter) {
555+
QueryRequest<M> where(FilterOrComposite filter) {
506556
ArgumentError.checkNotNull(filter, 'filter');
507-
_filters.add(filter);
557+
_filters.add(filter._toProto());
508558
return this;
509559
}
510560

@@ -585,9 +635,9 @@ class StateSubscriptionRequest<M extends GeneratedMessage> {
585635
/// If called multiple times, the composite filters are composed with the `ALL` operator, i.e.
586636
/// an entity state should pass all of the composite filters to match the subscription.
587637
///
588-
StateSubscriptionRequest<M> where(CompositeFilter filter) {
638+
StateSubscriptionRequest<M> where(FilterOrComposite filter) {
589639
ArgumentError.checkNotNull(filter, 'filter');
590-
_filters.add(filter);
640+
_filters.add(filter._toProto());
591641
return this;
592642
}
593643

@@ -605,7 +655,10 @@ class StateSubscriptionRequest<M extends GeneratedMessage> {
605655

606656
/// Asynchronously sends this request to the server.
607657
///
608-
StateSubscription<M> post() {
658+
/// The subscription is guaranteed to have been created on server when the resulting future
659+
/// completes.
660+
///
661+
Future<StateSubscription<M>> post() {
609662
var topic = _client._requests.topic().withFilters(_type, ids: _ids, filters: _filters);
610663
var builderInfo = theKnownTypes.findBuilderInfo(theKnownTypes.typeUrlFrom(_type))!;
611664
return _client._subscribeToStateUpdates(topic, builderInfo);
@@ -629,15 +682,18 @@ class EventSubscriptionRequest<M extends GeneratedMessage> {
629682
/// If called multiple times, the composite filters are composed with the `ALL` operator, i.e.
630683
/// an event should pass all of the composite filters to match the subscription.
631684
///
632-
EventSubscriptionRequest<M> where(CompositeFilter filter) {
685+
EventSubscriptionRequest<M> where(FilterOrComposite filter) {
633686
ArgumentError.checkNotNull(filter, 'filter');
634-
_filers.add(filter);
687+
_filers.add(filter._toProto());
635688
return this;
636689
}
637690

638691
/// Asynchronously sends this request to the server.
639692
///
640-
EventSubscription<M> post() {
693+
/// The subscription is guaranteed to have been created on server when the resulting future
694+
/// completes.
695+
///
696+
Future<EventSubscription<M>> post() {
641697
var topic = _client._requests.topic().withFilters(_type, filters: _filers);
642698
return _client._subscribeToEvents(topic);
643699
}

client/lib/subscription.dart

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,15 @@ class Subscription<T extends GeneratedMessage> {
4141
///
4242
/// Completes when the subscription is created.
4343
///
44-
final Future<pb.Subscription> subscription;
44+
final pb.Subscription subscription;
4545

4646
final Stream<T> _itemAdded;
4747

4848
bool _closed;
4949

5050
Subscription._(this.subscription, Stream<T> itemAdded)
5151
: _itemAdded = _checkBroadcast(itemAdded),
52-
_closed = false {
53-
subscription.catchError((e) {
54-
unsubscribe();
55-
return pb.Subscription.getDefault();
56-
});
57-
}
52+
_closed = false;
5853

5954
/// Shows if this subscription is still active or already closed.
6055
bool get closed => _closed;
@@ -86,7 +81,7 @@ class StateSubscription<T extends GeneratedMessage> extends Subscription<T> {
8681

8782
bool _closed;
8883

89-
StateSubscription._(Future<pb.Subscription> subscription,
84+
StateSubscription._(pb.Subscription subscription,
9085
Stream<T> itemAdded,
9186
Stream<T> itemChanged,
9287
Stream<T> itemRemoved):
@@ -96,19 +91,18 @@ class StateSubscription<T extends GeneratedMessage> extends Subscription<T> {
9691
super._(subscription, itemAdded);
9792

9893
/// Creates a new instance which broadcasts updates from under the given Firebase node.
99-
factory StateSubscription.of(Future<FirebaseSubscription> firebaseSubscription,
94+
factory StateSubscription.of(FirebaseSubscription firebaseSubscription,
10095
BuilderInfo builderInfoForType,
10196
FirebaseClient database) {
102-
var subscription = firebaseSubscription.then((value) => value.subscription);
103-
var nodePath = _nodePath(firebaseSubscription);
104-
var itemAdded = _nodePathStream(nodePath)
105-
.asyncExpand((element) => database.childAdded(element))
97+
var subscription = firebaseSubscription.subscription;
98+
var nodePath = firebaseSubscription.nodePath.value;
99+
var itemAdded = database
100+
.childAdded(nodePath)
106101
.map((json) => parseIntoNewInstance<T>(builderInfoForType, json));
107-
var itemChanged = _nodePathStream(nodePath)
108-
.asyncExpand((element) => database.childChanged(element))
102+
var itemChanged = database
103+
.childChanged(nodePath)
109104
.map((json) => parseIntoNewInstance<T>(builderInfoForType, json));
110-
var itemRemoved = _nodePathStream(nodePath)
111-
.asyncExpand((element) => database.childRemoved(element))
105+
var itemRemoved = database.childRemoved(nodePath)
112106
.map((json) => parseIntoNewInstance<T>(builderInfoForType, json));
113107
return StateSubscription._(subscription, itemAdded, itemChanged, itemRemoved);
114108
}
@@ -127,14 +121,14 @@ class EventSubscription<T extends GeneratedMessage> extends Subscription<Event>
127121

128122
static final BuilderInfo _eventBuilderInfo = Event.getDefault().info_;
129123

130-
EventSubscription._(Future<pb.Subscription> subscription, Stream<Event> itemAdded) :
124+
EventSubscription._(pb.Subscription subscription, Stream<Event> itemAdded) :
131125
super._(subscription, itemAdded);
132126

133-
factory EventSubscription.of(Future<FirebaseSubscription> firebaseSubscription,
127+
factory EventSubscription.of(FirebaseSubscription firebaseSubscription,
134128
FirebaseClient database) {
135-
var subscription = firebaseSubscription.then((value) => value.subscription);
136-
var itemAdded = _nodePathStream(_nodePath(firebaseSubscription))
137-
.asyncExpand((path) => database.childAdded(path))
129+
var subscription = firebaseSubscription.subscription;
130+
var nodePath = firebaseSubscription.nodePath.value;
131+
var itemAdded = database.childAdded(nodePath)
138132
.map((json) => parseIntoNewInstance<Event>(_eventBuilderInfo, json));
139133
return EventSubscription._(subscription, itemAdded);
140134
}
@@ -147,13 +141,6 @@ class EventSubscription<T extends GeneratedMessage> extends Subscription<Event>
147141
.map((event) => unpack(event.message) as T);
148142
}
149143

150-
Future<String> _nodePath(Future<FirebaseSubscription> subscription) =>
151-
subscription
152-
.then((value) => value.nodePath.value);
153-
154-
Stream<String> _nodePathStream(Future<String> futureValue) =>
155-
futureValue.asStream().asBroadcastStream();
156-
157144
Stream<T> _checkBroadcast<T>(Stream<T> stream) {
158145
if (!stream.isBroadcast) {
159146
throw ArgumentError(

0 commit comments

Comments
 (0)