Skip to content

Commit 4fa0a1a

Browse files
committed
message: Consider unsubscribed channels in reconcileMessages
This fixes the "fourth buggy behavior" in #1798: #1798 (comment) Fixes-partly: #1798
1 parent e600864 commit 4fa0a1a

File tree

3 files changed

+211
-26
lines changed

3 files changed

+211
-26
lines changed

lib/model/message.dart

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -409,22 +409,59 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
409409
// from the event queue, so there's inherently a race.
410410
//
411411
// If the fetched message reflects changes we haven't yet heard from the
412-
// event queue, then it doesn't much matter which version we use: we'll
413-
// soon get the corresponding events and apply the changes anyway.
414-
// But if it lacks changes we've already heard from the event queue, then
415-
// we won't hear those events again; the only way to wind up with an
412+
// event queue, then normally (see [1]) it doesn't much matter which version
413+
// we use: we'll soon get the corresponding events and apply the changes
414+
// anyway. But if it lacks changes we've already heard from the event queue,
415+
// then we won't hear those events again; the only way to wind up with an
416416
// updated message is to use the version we have, that already reflects
417-
// those events' changes. So we always stick with the version we have.
417+
// those events' changes. So we always stick with the version we have. [1]
418+
//
419+
// [1] With one exception: if the version we have was in an unsubscribed
420+
// channel when we got it or sometime since, we take the fetched version
421+
// instead. That's because our version might be stale; we don't expect
422+
// update events for messages in unsubscribed channels.
418423
for (int i = 0; i < messages.length; i++) {
419424
final message = messages[i];
420-
messages[i] = this.messages.putIfAbsent(message.id, () {
421-
message.matchContent = null;
422-
message.matchTopic = null;
423-
return message;
424-
});
425+
426+
// Whether we had marked the version we have (if any) as potentially
427+
// stale; see [1] above.
428+
final ourVersionMightBeStale =
429+
message is StreamMessage
430+
// Side effect: update our "potentially stale" knowledge based on
431+
// whether the channel is subscribed right now.
432+
&& (subscriptions[message.streamId] != null
433+
? _maybeStaleChannelMessages.remove(message.id)
434+
: !_maybeStaleChannelMessages.add(message.id));
435+
436+
if (ourVersionMightBeStale) {
437+
this.messages[message.id] = _stripMatchFields(message);
438+
} else {
439+
messages[i] = this.messages.putIfAbsent(message.id, () {
440+
return _stripMatchFields(message);
441+
});
442+
}
425443
}
426444
}
427445

446+
/// Messages in [messages] whose data stream is or was presumably broken
447+
/// by the message being in an unsubscribed channel.
448+
///
449+
/// We don't expect update events for messages in unsubscribed channels,
450+
/// so if some of these maybe-stale messages appear in a fetch,
451+
/// we'll always clobber our stored version with the fetched version.
452+
/// See implementation comment in [reconcileMessages].
453+
///
454+
/// (We have seen a few such events, actually --
455+
/// maybe because the channel was recently subscribed? --
456+
/// but not consistently, and we're not supposed to rely on them.)
457+
final Set<int> _maybeStaleChannelMessages = {};
458+
459+
Message _stripMatchFields(Message message) {
460+
message.matchContent = null;
461+
message.matchTopic = null;
462+
return message;
463+
}
464+
428465
@override
429466
bool? getEditMessageErrorStatus(int messageId) {
430467
assert(!_disposed);
@@ -489,6 +526,19 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
489526
);
490527
}
491528

529+
void handleSubscriptionRemoveEvent(SubscriptionRemoveEvent event) {
530+
final channelIds = event.streamIds.length > 1
531+
? Set<int>.from(event.streamIds) // optimization
532+
: event.streamIds;
533+
534+
// Linear in [messages].
535+
final affectedKnownMessageIds = messages.values
536+
.where((message) => message is StreamMessage && channelIds.contains(message.streamId))
537+
.map((message) => message.id);
538+
539+
_maybeStaleChannelMessages.addAll(affectedKnownMessageIds);
540+
}
541+
492542
void handleUserTopicEvent(UserTopicEvent event) {
493543
for (final view in _messageListViews) {
494544
view.handleUserTopicEvent(event);
@@ -502,10 +552,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
502552
}
503553

504554
void handleMessageEvent(MessageEvent event) {
555+
final message = event.message;
556+
505557
// If the message is one we already know about (from a fetch),
506558
// clobber it with the one from the event system.
507559
// See [fetchedMessages] for reasoning.
508-
messages[event.message.id] = event.message;
560+
messages[event.message.id] = message;
561+
562+
if (message is StreamMessage && subscriptions[message.streamId] == null) {
563+
// We didn't expect this event, because the channel is unsubscribed. But
564+
// that doesn't mean we should expect future events about this message.
565+
_maybeStaleChannelMessages.add(message.id);
566+
}
509567

510568
_handleMessageEventOutbox(event);
511569

@@ -594,6 +652,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
594652
// See [StreamConversation.displayRecipient] on why the invalidation is
595653
// needed.
596654
message.conversation.displayRecipient = null;
655+
656+
if (subscriptions[newStreamId] == null) {
657+
// The message was moved into an unsubscribed (or unknown) channel,
658+
// which means we expect our data on it to get stale.
659+
_maybeStaleChannelMessages.add(messageId);
660+
}
597661
}
598662

599663
if (newTopic != origTopic) {
@@ -616,6 +680,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
616680
void handleDeleteMessageEvent(DeleteMessageEvent event) {
617681
for (final messageId in event.messageIds) {
618682
messages.remove(messageId);
683+
_maybeStaleChannelMessages.remove(messageId);
619684
_editMessageRequests.remove(messageId);
620685
}
621686
for (final view in _messageListViews) {

lib/model/store.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,9 @@ class PerAccountStore extends PerAccountStoreBase with
826826

827827
case SubscriptionEvent():
828828
assert(debugLog("server event: subscription/${event.op}"));
829+
if (event is SubscriptionRemoveEvent) {
830+
_messages.handleSubscriptionRemoveEvent(event);
831+
}
829832
_channels.handleSubscriptionEvent(event);
830833
notifyListeners();
831834

test/model/message_test.dart

Lines changed: 132 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void main() {
3636

3737
// These "late" variables are the common state operated on by each test.
3838
// Each test case calls [prepare] to initialize them.
39-
late Subscription subscription;
39+
late Subscription? subscription;
4040
late PerAccountStore store;
4141
late FakeApiConnection connection;
4242
// [messageList] is here only for the sake of checking when it notifies.
@@ -54,15 +54,18 @@ void main() {
5454
/// Initialize [store] and the rest of the test state.
5555
Future<void> prepare({
5656
ZulipStream? stream,
57+
bool isChannelSubscribed = true,
5758
int? zulipFeatureLevel,
5859
}) async {
5960
stream ??= eg.stream(streamId: eg.defaultStreamMessageStreamId);
60-
subscription = eg.subscription(stream);
6161
final selfAccount = eg.selfAccount.copyWith(zulipFeatureLevel: zulipFeatureLevel);
6262
store = eg.store(account: selfAccount,
6363
initialSnapshot: eg.initialSnapshot(zulipFeatureLevel: zulipFeatureLevel));
6464
await store.addStream(stream);
65-
await store.addSubscription(subscription);
65+
if (isChannelSubscribed) {
66+
subscription = eg.subscription(stream);
67+
await store.addSubscription(subscription!);
68+
}
6669
connection = store.connection as FakeApiConnection;
6770
notifiedCount = 0;
6871
messageList = MessageListView.init(store: store,
@@ -533,18 +536,132 @@ void main() {
533536
});
534537
});
535538

536-
test('on ID collision, new message does not clobber old in store.messages', () async {
537-
await prepare();
538-
final message = eg.streamMessage(id: 1, content: '<p>foo</p>');
539-
await addMessages([message]);
540-
check(store.messages).deepEquals({1: message});
541-
final newMessage = eg.streamMessage(id: 1, content: '<p>bar</p>');
542-
final messages = [newMessage];
543-
store.reconcileMessages(messages);
544-
check(messages).deepEquals(
545-
// (We'll check more messages in an upcoming commit.)
546-
[message].map(conditionIdentical));
547-
check(store.messages).deepEquals({1: message});
539+
group('fetched message with ID already in store.messages', () {
540+
late Message messageCopy;
541+
542+
/// Makes a copy of the single message in [MessageStore.messages]
543+
/// by round-tripping through [Message.fromJson] and [Message.toJson].
544+
///
545+
/// If that message's [StreamMessage.conversation.displayRecipient]
546+
/// is null, callers must provide a non-null [displayRecipient]
547+
/// to allow [StreamConversation.fromJson] to complete without throwing.
548+
Message copyStoredMessage({String? displayRecipient}) {
549+
final message = store.messages.values.single;
550+
551+
Map<String, dynamic> json = message.toJson();
552+
if (
553+
message is StreamMessage
554+
&& message.conversation.displayRecipient == null
555+
) {
556+
if (displayRecipient == null) throw ArgumentError();
557+
json['display_recipient'] = displayRecipient;
558+
}
559+
560+
return Message.fromJson(json);
561+
}
562+
563+
/// Checks if the single message in [MessageStore.messages]
564+
/// is identical to [message].
565+
void checkStoredMessageIdenticalTo(Message message) {
566+
check(store.messages)
567+
.deepEquals({message.id: conditionIdentical(message)});
568+
}
569+
570+
test('DM', () async {
571+
await prepare();
572+
final message = eg.dmMessage(id: 1, from: eg.otherUser, to: [eg.selfUser]);
573+
574+
store.reconcileMessages([message]);
575+
checkStoredMessageIdenticalTo(message);
576+
store.reconcileMessages([copyStoredMessage()]);
577+
// Not clobbering, because the first call didn't mark stale.
578+
checkStoredMessageIdenticalTo(message);
579+
});
580+
581+
group('channel message; chooses correctly whether to clobber the stored version', () {
582+
// Exercise the ways we move the message in and out of the "maybe stale"
583+
// state. These include reconcileMessage itself, so sometimes we test
584+
// repeated calls to that with nothing else happening in between.
585+
586+
test('various conditions', () async {
587+
final channel = eg.stream();
588+
await prepare(stream: channel, isChannelSubscribed: true);
589+
final message = eg.streamMessage(id: 1, stream: channel);
590+
591+
final otherChannel = eg.stream();
592+
await store.addStream(otherChannel);
593+
594+
store.reconcileMessages([message]);
595+
checkStoredMessageIdenticalTo(message);
596+
store.reconcileMessages([copyStoredMessage()]);
597+
// Not clobbering, because the first call didn't mark stale,
598+
// because the message was in a subscribed channel.
599+
checkStoredMessageIdenticalTo(message);
600+
601+
await store.removeSubscription(channel.streamId);
602+
messageCopy = copyStoredMessage();
603+
store.reconcileMessages([messageCopy]);
604+
// Clobbering because the unsubscribe event marked the message stale.
605+
checkStoredMessageIdenticalTo(messageCopy);
606+
messageCopy = copyStoredMessage();
607+
store.reconcileMessages([messageCopy]);
608+
// (Check that reconcileMessage itself didn't unmark as stale.)
609+
checkStoredMessageIdenticalTo(messageCopy);
610+
611+
await store.addSubscription(eg.subscription(channel));
612+
messageCopy = copyStoredMessage();
613+
store.reconcileMessages([messageCopy]);
614+
// The channel became subscribed,
615+
// but the message's data hasn't been refreshed, so clobber…
616+
checkStoredMessageIdenticalTo(messageCopy);
617+
618+
store.reconcileMessages([copyStoredMessage()]);
619+
// …Now it's been refreshed, by reconcileMessages, so don't clobber.
620+
checkStoredMessageIdenticalTo(messageCopy);
621+
622+
check(store.subscriptions[otherChannel.streamId]).isNull();
623+
await store.handleEvent(
624+
eg.updateMessageEventMoveFrom(origMessages: [message],
625+
newStreamId: otherChannel.streamId));
626+
messageCopy = copyStoredMessage(displayRecipient: otherChannel.name);
627+
store.reconcileMessages([messageCopy]);
628+
// Message was moved to an unsubscribed channel, so clobber.
629+
checkStoredMessageIdenticalTo(messageCopy);
630+
messageCopy = copyStoredMessage();
631+
store.reconcileMessages([messageCopy]);
632+
// (Check that reconcileMessage itself didn't unmark as stale.)
633+
checkStoredMessageIdenticalTo(messageCopy);
634+
});
635+
636+
test('in unsubscribed channel on first call', () async {
637+
await prepare(isChannelSubscribed: false);
638+
final message = eg.streamMessage(id: 1);
639+
640+
store.reconcileMessages([message]);
641+
checkStoredMessageIdenticalTo(message);
642+
643+
messageCopy = copyStoredMessage();
644+
store.reconcileMessages([messageCopy]);
645+
checkStoredMessageIdenticalTo(messageCopy);
646+
messageCopy = copyStoredMessage();
647+
store.reconcileMessages([messageCopy]);
648+
checkStoredMessageIdenticalTo(messageCopy);
649+
});
650+
651+
test('new-message event when in unsubscribed channel', () async {
652+
await prepare(isChannelSubscribed: false);
653+
final message = eg.streamMessage(id: 1);
654+
655+
await store.handleEvent(eg.messageEvent(message));
656+
657+
messageCopy = copyStoredMessage();
658+
store.reconcileMessages([messageCopy]);
659+
checkStoredMessageIdenticalTo(messageCopy);
660+
messageCopy = copyStoredMessage();
661+
store.reconcileMessages([messageCopy]);
662+
checkStoredMessageIdenticalTo(messageCopy);
663+
});
664+
});
548665
});
549666

550667
test('matchContent and matchTopic are removed', () async {

0 commit comments

Comments
 (0)