Skip to content

Commit e92f6a2

Browse files
authored
feat!: do not send sync messages with empty history (#2658)
* feat!: do not send sync messages with empty history A sync message without any history as no value. If there are no messages in the channel, then a sync messages does not help. If there are messages in the channel, but this participant is not aware of them, then it can confuse other participants to assume that the channel is empty. * fix test by adding a message to channel history * make `pushOutgoingSyncMessage` return true even if no callback passed
1 parent c0ecb6a commit e92f6a2

File tree

3 files changed

+113
-10
lines changed

3 files changed

+113
-10
lines changed

packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,19 @@ describe("Reliable Channel: Sync", () => {
5656
}
5757
);
5858

59+
// Send a message to have a history
60+
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
61+
let messageSent = false;
62+
reliableChannel.addEventListener("message-sent", (event) => {
63+
if (event.detail === sentMsgId) {
64+
messageSent = true;
65+
}
66+
});
67+
68+
while (!messageSent) {
69+
await delay(50);
70+
}
71+
5972
let syncMessageSent = false;
6073
reliableChannel.messageChannel.addEventListener(
6174
MessageChannelEvent.OutSyncSent,
@@ -131,6 +144,19 @@ describe("Reliable Channel: Sync", () => {
131144
return 1;
132145
}; // will wait a full second
133146

147+
// Send a message to have a history
148+
const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message"));
149+
let messageSent = false;
150+
reliableChannelAlice.addEventListener("message-sent", (event) => {
151+
if (event.detail === sentMsgId) {
152+
messageSent = true;
153+
}
154+
});
155+
156+
while (!messageSent) {
157+
await delay(50);
158+
}
159+
134160
let syncMessageSent = false;
135161
reliableChannelBob.messageChannel.addEventListener(
136162
MessageChannelEvent.OutSyncSent,
@@ -191,6 +217,19 @@ describe("Reliable Channel: Sync", () => {
191217
return 1;
192218
}; // will wait a full second
193219

220+
// Send a message to have a history
221+
const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message"));
222+
let messageSent = false;
223+
reliableChannelAlice.addEventListener("message-sent", (event) => {
224+
if (event.detail === sentMsgId) {
225+
messageSent = true;
226+
}
227+
});
228+
229+
while (!messageSent) {
230+
await delay(50);
231+
}
232+
194233
let syncMessageSent = false;
195234
reliableChannelBob.messageChannel.addEventListener(
196235
MessageChannelEvent.OutSyncSent,
@@ -232,6 +271,19 @@ describe("Reliable Channel: Sync", () => {
232271
return 1;
233272
}; // will wait a full second
234273

274+
// Send a message to have a history
275+
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
276+
let messageSent = false;
277+
reliableChannel.addEventListener("message-sent", (event) => {
278+
if (event.detail === sentMsgId) {
279+
messageSent = true;
280+
}
281+
});
282+
283+
while (!messageSent) {
284+
await delay(50);
285+
}
286+
235287
let syncMessageSent = false;
236288
reliableChannel.messageChannel.addEventListener(
237289
MessageChannelEvent.OutSyncSent,
@@ -273,6 +325,19 @@ describe("Reliable Channel: Sync", () => {
273325
return 1;
274326
}; // will wait a full second
275327

328+
// Send a message to have a history
329+
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
330+
let messageSent = false;
331+
reliableChannel.addEventListener("message-sent", (event) => {
332+
if (event.detail === sentMsgId) {
333+
messageSent = true;
334+
}
335+
});
336+
337+
while (!messageSent) {
338+
await delay(50);
339+
}
340+
276341
let syncMessageSent = false;
277342
reliableChannel.messageChannel.addEventListener(
278343
MessageChannelEvent.OutSyncSent,

packages/sds/src/message_channel/message_channel.spec.ts

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -647,11 +647,12 @@ describe("MessageChannel", function () {
647647
});
648648

649649
// And be sends a sync message
650-
await channelB.pushOutgoingSyncMessage(async (message) => {
650+
const res = await channelB.pushOutgoingSyncMessage(async (message) => {
651651
await receiveMessage(channelA, message);
652652
return true;
653653
});
654654

655+
expect(res).to.be.true;
655656
expect(messageAcked).to.be.true;
656657
});
657658
});
@@ -1089,17 +1090,41 @@ describe("MessageChannel", function () {
10891090
causalHistorySize: 2
10901091
});
10911092
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
1093+
const message = utf8ToBytes("first message in channel");
1094+
channelA["localHistory"].push(
1095+
new ContentMessage(
1096+
MessageChannel.getMessageId(message),
1097+
"MyChannel",
1098+
"alice",
1099+
[],
1100+
1n,
1101+
undefined,
1102+
message
1103+
)
1104+
);
10921105
});
10931106

10941107
it("should be sent with empty content", async () => {
1095-
await channelA.pushOutgoingSyncMessage(async (message) => {
1108+
const res = await channelA.pushOutgoingSyncMessage(async (message) => {
10961109
expect(message.content).to.be.undefined;
10971110
return true;
10981111
});
1112+
expect(res).to.be.true;
1113+
});
1114+
1115+
it("should not be sent when there is no history", async () => {
1116+
const channelC = new MessageChannel(channelId, "carol", {
1117+
causalHistorySize: 2
1118+
});
1119+
const res = await channelC.pushOutgoingSyncMessage(async (_msg) => {
1120+
throw "callback was called when it's not expected";
1121+
});
1122+
expect(res).to.be.false;
10991123
});
11001124

11011125
it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
1102-
await channelA.pushOutgoingSyncMessage();
1126+
const res = await channelA.pushOutgoingSyncMessage();
1127+
expect(res).to.be.true;
11031128

11041129
const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
11051130
expect(outgoingBuffer.length).to.equal(0);
@@ -1110,15 +1135,16 @@ describe("MessageChannel", function () {
11101135
).to.equal(false);
11111136

11121137
const localLog = channelA["localHistory"];
1113-
expect(localLog.length).to.equal(0);
1138+
expect(localLog.length).to.equal(1); // beforeEach adds one message
11141139
});
11151140

11161141
it("should not be delivered", async () => {
11171142
const timestampBefore = channelB["lamportTimestamp"];
1118-
await channelA.pushOutgoingSyncMessage(async (message) => {
1143+
const res = await channelA.pushOutgoingSyncMessage(async (message) => {
11191144
await receiveMessage(channelB, message);
11201145
return true;
11211146
});
1147+
expect(res).to.be.true;
11221148
const timestampAfter = channelB["lamportTimestamp"];
11231149
expect(timestampAfter).to.equal(timestampBefore);
11241150

@@ -1132,20 +1158,23 @@ describe("MessageChannel", function () {
11321158
});
11331159

11341160
it("should update ack status of messages in outgoing buffer", async () => {
1161+
const channelC = new MessageChannel(channelId, "carol", {
1162+
causalHistorySize: 2
1163+
});
11351164
for (const m of messagesA) {
1136-
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
1165+
await sendMessage(channelC, utf8ToBytes(m), async (message) => {
11371166
await receiveMessage(channelB, message);
11381167
return { success: true };
11391168
});
11401169
}
11411170

11421171
await sendSyncMessage(channelB, async (message) => {
1143-
await receiveMessage(channelA, message);
1172+
await receiveMessage(channelC, message);
11441173
return true;
11451174
});
11461175

1147-
const causalHistorySize = channelA["causalHistorySize"];
1148-
const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
1176+
const causalHistorySize = channelC["causalHistorySize"];
1177+
const outgoingBuffer = channelC["outgoingBuffer"] as Message[];
11491178
expect(outgoingBuffer.length).to.equal(
11501179
messagesA.length - causalHistorySize
11511180
);

packages/sds/src/message_channel/message_channel.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
384384
undefined
385385
);
386386

387+
if (!message.causalHistory || message.causalHistory.length === 0) {
388+
log.info(
389+
this.senderId,
390+
"no causal history in sync message, aborting sending"
391+
);
392+
return false;
393+
}
394+
387395
if (callback) {
388396
try {
389397
await callback(message);
@@ -400,7 +408,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
400408
throw error;
401409
}
402410
}
403-
return false;
411+
// No problem encountered so returning true
412+
return true;
404413
}
405414

406415
private _pushIncomingMessage(message: Message): void {

0 commit comments

Comments
 (0)