Skip to content

Commit cb3af8c

Browse files
authored
fix(sds): initialize lamport timestamp with current time (#2610)
1 parent 4d5c152 commit cb3af8c

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

packages/sds/src/message_channel/message_channel.spec.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ describe("MessageChannel", function () {
154154
});
155155

156156
// Causal history should only contain the last N messages as defined by causalHistorySize
157-
const causalHistory = outgoingBuffer[outgoingBuffer.length - 1]
158-
.causalHistory as HistoryEntry[];
157+
const causalHistory =
158+
outgoingBuffer[outgoingBuffer.length - 1].causalHistory;
159159
expect(causalHistory.length).to.equal(causalHistorySize);
160160

161161
const expectedCausalHistory = messages
@@ -185,6 +185,8 @@ describe("MessageChannel", function () {
185185
});
186186

187187
it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
188+
const timestampBefore = channelA["lamportTimestamp"];
189+
188190
for (const m of messagesA) {
189191
await sendMessage(channelA, utf8ToBytes(m), callback);
190192
}
@@ -195,11 +197,12 @@ describe("MessageChannel", function () {
195197
});
196198
}
197199
const timestampAfter = channelA["lamportTimestamp"];
198-
expect(timestampAfter).to.equal(messagesB.length);
200+
expect(timestampAfter - timestampBefore).to.equal(messagesB.length);
199201
});
200202

201203
it("should maintain proper timestamps if all messages received", async () => {
202-
let timestamp = 0;
204+
const aTimestampBefore = channelA["lamportTimestamp"];
205+
let timestamp = channelB["lamportTimestamp"];
203206
for (const m of messagesA) {
204207
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
205208
timestamp++;
@@ -219,7 +222,9 @@ describe("MessageChannel", function () {
219222
}
220223

221224
const expectedLength = messagesA.length + messagesB.length;
222-
expect(channelA["lamportTimestamp"]).to.equal(expectedLength);
225+
expect(channelA["lamportTimestamp"]).to.equal(
226+
aTimestampBefore + expectedLength
227+
);
223228
expect(channelA["lamportTimestamp"]).to.equal(
224229
channelB["lamportTimestamp"]
225230
);
@@ -312,6 +317,8 @@ describe("MessageChannel", function () {
312317
const message2Id = MessageChannel.getMessageId(message2Payload);
313318
const message3Id = MessageChannel.getMessageId(message3Payload);
314319

320+
const startTimestamp = channelA["lamportTimestamp"];
321+
315322
// Send own message first (timestamp will be 1)
316323
await sendMessage(channelA, message1Payload, callback);
317324

@@ -323,7 +330,7 @@ describe("MessageChannel", function () {
323330
channelA.channelId,
324331
"bob",
325332
[],
326-
3, // Higher timestamp
333+
startTimestamp + 3, // Higher timestamp
327334
undefined,
328335
message3Payload
329336
)
@@ -337,7 +344,7 @@ describe("MessageChannel", function () {
337344
channelA.channelId,
338345
"carol",
339346
[],
340-
2, // Middle timestamp
347+
startTimestamp + 2, // Middle timestamp
341348
undefined,
342349
message2Payload
343350
)
@@ -350,21 +357,27 @@ describe("MessageChannel", function () {
350357

351358
const first = localHistory.findIndex(
352359
({ messageId, lamportTimestamp }) => {
353-
return messageId === message1Id && lamportTimestamp === 1;
360+
return (
361+
messageId === message1Id && lamportTimestamp === startTimestamp + 1
362+
);
354363
}
355364
);
356365
expect(first).to.eq(0);
357366

358367
const second = localHistory.findIndex(
359368
({ messageId, lamportTimestamp }) => {
360-
return messageId === message2Id && lamportTimestamp === 2;
369+
return (
370+
messageId === message2Id && lamportTimestamp === startTimestamp + 2
371+
);
361372
}
362373
);
363374
expect(second).to.eq(1);
364375

365376
const third = localHistory.findIndex(
366377
({ messageId, lamportTimestamp }) => {
367-
return messageId === message3Id && lamportTimestamp === 3;
378+
return (
379+
messageId === message3Id && lamportTimestamp === startTimestamp + 3
380+
);
368381
}
369382
);
370383
expect(third).to.eq(2);

packages/sds/src/message_channel/message_channel.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
9595
super();
9696
this.channelId = channelId;
9797
this.senderId = senderId;
98-
this.lamportTimestamp = 0;
98+
// SDS RFC says to use nanoseconds, but current time in nanosecond is > Number.MAX_SAFE_INTEGER
99+
// So instead we are using milliseconds and proposing a spec change (TODO)
100+
this.lamportTimestamp = Date.now();
99101
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
100102
this.outgoingBuffer = [];
101103
this.possibleAcks = new Map();

0 commit comments

Comments
 (0)