Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion packages/proto/src/generated/sds_message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { Uint8ArrayList } from 'uint8arraylist'
export interface HistoryEntry {
messageId: string
retrievalHint?: Uint8Array
senderId?: string
}

export namespace HistoryEntry {
Expand All @@ -35,6 +36,11 @@ export namespace HistoryEntry {
w.bytes(obj.retrievalHint)
}

if (obj.senderId != null) {
w.uint32(26)
w.string(obj.senderId)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
Expand All @@ -57,6 +63,10 @@ export namespace HistoryEntry {
obj.retrievalHint = reader.bytes()
break
}
case 3: {
obj.senderId = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
Expand Down Expand Up @@ -87,6 +97,7 @@ export interface SdsMessage {
lamportTimestamp?: bigint
causalHistory: HistoryEntry[]
bloomFilter?: Uint8Array
repairRequest: HistoryEntry[]
content?: Uint8Array
}

Expand Down Expand Up @@ -132,6 +143,13 @@ export namespace SdsMessage {
w.bytes(obj.bloomFilter)
}

if (obj.repairRequest != null) {
for (const value of obj.repairRequest) {
w.uint32(106)
HistoryEntry.codec().encode(value, w)
}
}

if (obj.content != null) {
w.uint32(162)
w.bytes(obj.content)
Expand All @@ -145,7 +163,8 @@ export namespace SdsMessage {
senderId: '',
messageId: '',
channelId: '',
causalHistory: []
causalHistory: [],
repairRequest: []
}

const end = length == null ? reader.len : reader.pos + length
Expand Down Expand Up @@ -184,6 +203,16 @@ export namespace SdsMessage {
obj.bloomFilter = reader.bytes()
break
}
case 13: {
if (opts.limits?.repairRequest != null && obj.repairRequest.length === opts.limits.repairRequest) {
throw new MaxLengthError('Decode error - map field "repairRequest" had too many elements')
}

obj.repairRequest.push(HistoryEntry.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.repairRequest$
}))
break
}
case 20: {
obj.content = reader.bytes()
break
Expand Down
5 changes: 5 additions & 0 deletions packages/proto/src/lib/sds_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ syntax = "proto3";
message HistoryEntry {
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash

optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension
}

message SdsMessage {
Expand All @@ -12,5 +14,8 @@ message SdsMessage {
optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel

repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension.

optional bytes content = 20; // Actual content of the message
}
8 changes: 7 additions & 1 deletion packages/sds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ export {
type HistoryEntry,
type ChannelId,
type MessageChannelEvents,
type SenderId,
type ParticipantId,
type MessageId
} from "./message_channel/index.js";

/**
* @deprecated Use ParticipantId instead. SenderId has been renamed to ParticipantId
* to better reflect that it represents a channel participant, not just a message sender.
*/
export type { ParticipantId as SenderId } from "./message_channel/index.js";

export { BloomFilter };
31 changes: 28 additions & 3 deletions packages/sds/src/message_channel/events.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { HistoryEntry, Message, MessageId } from "./message.js";
import { HistoryEntry, Message, MessageId, ParticipantId } from "./message.js";

export enum MessageChannelEvent {
OutMessageSent = "sds:out:message-sent",
Expand All @@ -10,7 +10,13 @@ export enum MessageChannelEvent {
OutSyncSent = "sds:out:sync-sent",
InSyncReceived = "sds:in:sync-received",
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task"
ErrorTask = "sds:error-task",
// SDS-R Repair Events
RepairRequestQueued = "sds:repair:request-queued",
RepairRequestSent = "sds:repair:request-sent",
RepairRequestReceived = "sds:repair:request-received",
RepairResponseQueued = "sds:repair:response-queued",
RepairResponseSent = "sds:repair:response-sent"
}

export type MessageChannelEvents = {
Expand All @@ -26,5 +32,24 @@ export type MessageChannelEvents = {
[MessageChannelEvent.InMessageLost]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<any>;
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
messageId: MessageId;
tReq: number;
}>;
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
messageIds: MessageId[];
carrierMessageId: MessageId;
}>;
[MessageChannelEvent.RepairRequestReceived]: CustomEvent<{
messageIds: MessageId[];
fromSenderId?: ParticipantId;
}>;
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
messageId: MessageId;
tResp: number;
}>;
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
messageId: MessageId;
}>;
};
2 changes: 1 addition & 1 deletion packages/sds/src/message_channel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export {
HistoryEntry,
Message,
MessageId,
SenderId,
ParticipantId,
SyncMessage,
isContentMessage,
isEphemeralMessage,
Expand Down
34 changes: 34 additions & 0 deletions packages/sds/src/message_channel/message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe("Message serialization", () => {
[{ messageId: depMessageId, retrievalHint: depRetrievalHint }],
0n,
undefined,
undefined,
undefined
);

Expand All @@ -54,6 +55,39 @@ describe("Message serialization", () => {
{ messageId: depMessageId, retrievalHint: depRetrievalHint }
]);
});

it("Repair Request", () => {
const repairMessageId = "missing-message";
const repairRetrievalHint = utf8ToBytes("missing-retrieval");
const repairSenderId = "original-sender";
const message = new Message(
"123",
"my-channel",
"me",
[],
0n,
undefined,
undefined,
[
{
messageId: repairMessageId,
retrievalHint: repairRetrievalHint,
senderId: repairSenderId
}
]
);

const bytes = message.encode();
const decMessage = Message.decode(bytes);

expect(decMessage!.repairRequest).to.deep.equal([
{
messageId: repairMessageId,
retrievalHint: repairRetrievalHint,
senderId: repairSenderId
}
]);
});
});

describe("ContentMessage comparison with < operator", () => {
Expand Down
37 changes: 24 additions & 13 deletions packages/sds/src/message_channel/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import { Logger } from "@waku/utils";
export type MessageId = string;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export type SenderId = string;
export type ParticipantId = string;

const log = new Logger("sds:message");

export class Message implements proto_sds_message.SdsMessage {
public constructor(
public messageId: string,
public messageId: MessageId,
public channelId: string,
public senderId: string,
public senderId: ParticipantId,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp?: bigint | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined,
public repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
Expand All @@ -38,7 +39,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
lamportTimestamp,
bloomFilter,
content
content,
repairRequest
} = proto_sds_message.SdsMessage.decode(data);

if (testContentMessage({ lamportTimestamp, content })) {
Expand All @@ -49,7 +51,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
lamportTimestamp!,
bloomFilter,
content!
content!,
repairRequest
);
}

Expand All @@ -61,7 +64,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
undefined,
bloomFilter,
content!
content!,
repairRequest
);
}

Expand All @@ -73,7 +77,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
lamportTimestamp!,
bloomFilter,
undefined
undefined,
repairRequest
);
}
log.error(
Expand All @@ -90,13 +95,14 @@ export class Message implements proto_sds_message.SdsMessage {

export class SyncMessage extends Message {
public constructor(
public messageId: string,
public messageId: MessageId,
public channelId: string,
public senderId: string,
public senderId: ParticipantId,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: undefined,
public repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
Expand All @@ -110,6 +116,7 @@ export class SyncMessage extends Message {
lamportTimestamp,
bloomFilter,
content,
repairRequest,
retrievalHint
);
}
Expand All @@ -134,13 +141,14 @@ export function isSyncMessage(

export class EphemeralMessage extends Message {
public constructor(
public messageId: string,
public messageId: MessageId,
public channelId: string,
public senderId: string,
public senderId: ParticipantId,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: undefined,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
public repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
Expand All @@ -157,6 +165,7 @@ export class EphemeralMessage extends Message {
lamportTimestamp,
bloomFilter,
content,
repairRequest,
retrievalHint
);
}
Expand All @@ -182,13 +191,14 @@ function testEphemeralMessage(message: {

export class ContentMessage extends Message {
public constructor(
public messageId: string,
public messageId: MessageId,
public channelId: string,
public senderId: string,
public senderId: ParticipantId,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
public repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
Expand All @@ -205,6 +215,7 @@ export class ContentMessage extends Message {
lamportTimestamp,
bloomFilter,
content,
repairRequest,
retrievalHint
);
}
Expand Down
4 changes: 3 additions & 1 deletion packages/sds/src/message_channel/message_channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ describe("MessageChannel", function () {
.slice(-causalHistorySize - 1, -1)
.map((message) => ({
messageId: MessageChannel.getMessageId(utf8ToBytes(message)),
retrievalHint: undefined
retrievalHint: undefined,
senderId: "alice"
}));
expect(causalHistory).to.deep.equal(expectedCausalHistory);
});
Expand Down Expand Up @@ -298,6 +299,7 @@ describe("MessageChannel", function () {
1n,
undefined,
payload,
undefined,
testRetrievalHint
),
testRetrievalHint
Expand Down
Loading
Loading