Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ValueStream.isReplayValueStream #784

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/flutter/github_search/lib/bloc/search_bloc.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import 'dart:async';

import 'package:rxdart_ext/state_stream.dart';
import 'package:rxdart/rxdart.dart';

import '../api/github_api.dart';
import 'search_state.dart';

class SearchBloc {
final Sink<String> onTextChanged;
final StateStream<SearchState> state;
final ValueStream<SearchState> state;

final StreamSubscription<void> _subscription;

Expand All @@ -25,7 +25,7 @@ class SearchBloc {
// to the View.
.switchMap<SearchState>((String term) => _search(term, api))
// The initial state to deliver to the screen.
.publishState(const SearchNoTerm());
.publishValueSeeded(const SearchNoTerm());

final subscription = state.connect();

Expand Down
5 changes: 2 additions & 3 deletions examples/flutter/github_search/lib/search_screen.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ class SearchScreen extends StatefulWidget {
const SearchScreen({super.key, required this.api});

@override
SearchScreenState createState() {
return SearchScreenState();
}
SearchScreenState createState() => SearchScreenState();
}

class SearchScreenState extends State<SearchScreen> {
Expand All @@ -47,6 +45,7 @@ class SearchScreenState extends State<SearchScreen> {
Widget build(BuildContext context) {
return ValueStreamBuilder<SearchState>(
stream: bloc.state,
buildWhen: (previous, current) => previous != current,
builder: (context, state, child) {
return Scaffold(
appBar: AppBar(
Expand Down
16 changes: 0 additions & 16 deletions examples/flutter/github_search/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,6 @@ packages:
url: "https://pub.dev"
source: hosted
version: "3.0.6"
dart_either:
dependency: transitive
description:
name: dart_either
sha256: "928895b8266ac5906eb4e2993fead563a73b17fc86eec6b40172100d56ca2507"
url: "https://pub.dev"
source: hosted
version: "1.0.0"
dart_style:
dependency: transitive
description:
Expand Down Expand Up @@ -463,14 +455,6 @@ packages:
relative: true
source: path
version: "0.28.0"
rxdart_ext:
dependency: "direct main"
description:
name: rxdart_ext
sha256: "95df7e8b13140e2c3fdb3b943569a51f18090e82aaaf6ca6e8e6437e434a6fb0"
url: "https://pub.dev"
source: hosted
version: "0.3.0"
rxdart_flutter:
dependency: "direct main"
description:
Expand Down
1 change: 0 additions & 1 deletion examples/flutter/github_search/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ dependencies:
path: ../../../packages/rxdart_flutter
http: ^0.13.3
flutter_spinkit: ^5.1.0
rxdart_ext: ^0.3.0
equatable: ^2.0.5

dev_dependencies:
Expand Down
3 changes: 3 additions & 0 deletions packages/rxdart/lib/src/streams/connectable_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ class ValueConnectableStream<T>

@override
StreamNotification<T>? get lastEventOrNull => _subject.lastEventOrNull;

@override
bool get isReplayValueStream => _subject.isReplayValueStream;
}

/// A [ConnectableStream] that converts a single-subscription Stream into
Expand Down
4 changes: 3 additions & 1 deletion packages/rxdart/lib/src/streams/replay_stream.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';

/// An [Stream] that provides synchronous access to the emitted values
/// A [Stream] that provides synchronous access to the emitted values,
/// and replays its emitted events (data and error events)
/// to new listeners when they subscribe.
abstract class ReplayStream<T> implements Stream<T> {
/// Synchronously get the values stored in Subject. May be empty.
List<T> get values;
Expand Down
6 changes: 6 additions & 0 deletions packages/rxdart/lib/src/streams/value_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ abstract class ValueStream<T> implements Stream<T> {
/// Returns the last emitted event (either data/value or error event).
/// `null` if no value or error events have been emitted yet.
StreamNotification<T>? get lastEventOrNull;

/// Returns `true` if this [ValueStream] replays its last emitted event
/// (either a value/data event or an error event) to new listeners when they subscribe.
///
/// See also [lastEventOrNull].
bool get isReplayValueStream;
}

/// Extension methods on [ValueStream] related to [lastEventOrNull].
Expand Down
6 changes: 6 additions & 0 deletions packages/rxdart/lib/src/subjects/behavior_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
// no event
return null;
}

@override
bool get isReplayValueStream => true;
}

class _Wrapper<T> {
Expand Down Expand Up @@ -272,4 +275,7 @@ class _BehaviorSubjectStream<T> extends Stream<T> implements ValueStream<T> {

@override
StreamNotification<T>? get lastEventOrNull => _subject.lastEventOrNull;

@override
bool get isReplayValueStream => _subject.isReplayValueStream;
}
9 changes: 9 additions & 0 deletions packages/rxdart/test/subject/behavior_subject_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1471,5 +1471,14 @@ void main() {
);
});
});

test('isReplayValueStream', () {
expect(BehaviorSubject<int>().isReplayValueStream, isTrue);
expect(BehaviorSubject<int>.seeded(42).isReplayValueStream, isTrue);

expect(BehaviorSubject<int>().stream.isReplayValueStream, isTrue);
expect(
BehaviorSubject<int>.seeded(42).stream.isReplayValueStream, isTrue);
});
});
}
15 changes: 9 additions & 6 deletions packages/rxdart_flutter/lib/src/value_stream_builder.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'package:flutter/foundation.dart';
import 'package:rxdart/rxdart.dart';
import 'package:flutter/widgets.dart';
import 'package:rxdart/rxdart.dart';

import 'errors.dart';
import 'value_stream_listener.dart';
Expand Down Expand Up @@ -94,7 +94,7 @@ class ValueStreamBuilder<T> extends StatefulWidget {
required this.builder,
this.buildWhen,
this.child,
this.isReplayValueStream = true,
this.isReplayValueStream,
}) : super(key: key);

/// The [ValueStream] that the [ValueStreamBuilder] will interact with.
Expand Down Expand Up @@ -122,9 +122,12 @@ class ValueStreamBuilder<T> extends StatefulWidget {

/// Whether or not the [stream] emits the last value
/// like [BehaviorSubject] does.
/// See [ValueStream.isReplayValueStream] for more information.
///
/// If this argument is `null`, the [ValueStream.isReplayValueStream] is used instead.
///
/// Defaults to `true`.
final bool isReplayValueStream;
/// Defaults to `null`.
final bool? isReplayValueStream;

@override
State<ValueStreamBuilder<T>> createState() => _ValueStreamBuilderState();
Expand All @@ -134,8 +137,8 @@ class ValueStreamBuilder<T> extends StatefulWidget {
super.debugFillProperties(properties);
properties
..add(DiagnosticsProperty<ValueStream<T>>('stream', stream))
..add(
DiagnosticsProperty<bool>('isReplayValueStream', isReplayValueStream))
..add(DiagnosticsProperty<bool>('isReplayValueStream',
isReplayValueStream ?? stream.isReplayValueStream))
..add(ObjectFlagProperty<ValueStreamBuilderCondition<T>?>.has(
'buildWhen',
buildWhen,
Expand Down
13 changes: 8 additions & 5 deletions packages/rxdart_flutter/lib/src/value_stream_consumer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ValueStreamConsumer<T> extends StatefulWidget {
required this.builder,
this.buildWhen,
this.child,
this.isReplayValueStream = true,
this.isReplayValueStream,
}) : super(key: key);

/// The [ValueStream] that the [ValueStreamConsumer] will interact with.
Expand Down Expand Up @@ -117,9 +117,12 @@ class ValueStreamConsumer<T> extends StatefulWidget {

/// Whether or not the [stream] emits the last value
/// like [BehaviorSubject] does.
/// See [ValueStream.isReplayValueStream] for more information.
///
/// Defaults to `true`.
final bool isReplayValueStream;
/// If this argument is `null`, the [ValueStream.isReplayValueStream] is used instead.
///
/// Defaults to `null`.
final bool? isReplayValueStream;

@override
State<ValueStreamConsumer<T>> createState() => _ValueStreamConsumerState<T>();
Expand All @@ -129,8 +132,8 @@ class ValueStreamConsumer<T> extends StatefulWidget {
super.debugFillProperties(properties);
properties
..add(DiagnosticsProperty<ValueStream<T>>('stream', stream))
..add(
DiagnosticsProperty<bool>('isReplayValueStream', isReplayValueStream))
..add(DiagnosticsProperty<bool>('isReplayValueStream',
isReplayValueStream ?? stream.isReplayValueStream))
..add(ObjectFlagProperty<ValueStreamWidgetBuilder<T>>.has(
'builder',
builder,
Expand Down
35 changes: 23 additions & 12 deletions packages/rxdart_flutter/lib/src/value_stream_listener.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ValueStreamListener<T> extends StatefulWidget {
required this.stream,
required this.listener,
required this.child,
this.isReplayValueStream = true,
this.isReplayValueStream,
}) : super(key: key);

/// The [ValueStream] that the [ValueStreamConsumer] will interact with.
Expand All @@ -67,9 +67,12 @@ class ValueStreamListener<T> extends StatefulWidget {

/// Whether or not the [stream] emits the last value
/// like [BehaviorSubject] does.
/// See [ValueStream.isReplayValueStream] for more information.
///
/// Defaults to `true`.
final bool isReplayValueStream;
/// If this argument is `null`, the [ValueStream.isReplayValueStream] is used instead.
///
/// Defaults to `null`.
final bool? isReplayValueStream;

@override
State<ValueStreamListener<T>> createState() => _ValueStreamListenerState<T>();
Expand All @@ -79,8 +82,8 @@ class ValueStreamListener<T> extends StatefulWidget {
super.debugFillProperties(properties);
properties
..add(DiagnosticsProperty<ValueStream<T>>('stream', stream))
..add(
DiagnosticsProperty<bool>('isReplayValueStream', isReplayValueStream))
..add(DiagnosticsProperty<bool>('isReplayValueStream',
isReplayValueStream ?? stream.isReplayValueStream))
..add(ObjectFlagProperty<ValueStreamWidgetListener<T>>.has(
'listener', listener))
..add(ObjectFlagProperty<Widget>.has('child', child));
Expand Down Expand Up @@ -127,21 +130,28 @@ class _ValueStreamListenerState<T> extends State<ValueStreamListener<T>> {
_currentValue = stream.value;
}

final int skipCount;

if (widget.isReplayValueStream) {
skipCount = _initialized ? 0 : 1;
if (widget.isReplayValueStream ?? stream.isReplayValueStream) {
final skipCount = _initialized ? 0 : 1;
_subscribeIfNeeded(skipCount > 0 ? stream.skip(skipCount) : stream);
} else {
skipCount = 0;
if (_initialized) {
_ambiguate(WidgetsBinding.instance)!.addPostFrameCallback((_) {
if (widget.stream != stream) {
return;
}
_notifyListener(stream.value);
_subscribeIfNeeded(stream);
});
} else {
_subscribeIfNeeded(stream);
}
}
}

final streamToListen = skipCount > 0 ? stream.skip(skipCount) : stream;

void _subscribeIfNeeded(Stream<T> streamToListen) {
if (_subscription != null) {
return;
}
_subscription = streamToListen.listen(
(value) {
if (!mounted) return;
Expand All @@ -164,6 +174,7 @@ class _ValueStreamListenerState<T> extends State<ValueStreamListener<T>> {

void _unsubscribe() {
_subscription?.cancel();
_subscription = null;
}

@override
Expand Down
5 changes: 5 additions & 0 deletions packages/rxdart_flutter/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ dev_dependencies:
sdk: flutter
flutter_lints: ^1.0.4

# TODO: Remove this
dependency_overrides:
rxdart:
path: ../rxdart

topics:
- rxdart
- reactive-programming
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class ValueSubject<T> extends Subject<T> implements ValueStream<T> {
// no event
return null;
}

@override
bool get isReplayValueStream => false;
}

class _ValueSubjectStream<T> extends Stream<T> implements ValueStream<T> {
Expand Down Expand Up @@ -184,6 +187,9 @@ class _ValueSubjectStream<T> extends Stream<T> implements ValueStream<T> {

@override
T? get valueOrNull => _subject.valueOrNull;

@override
bool get isReplayValueStream => _subject.isReplayValueStream;
}

/// Class that holds latest value and lasted error emitted from Stream.
Expand Down