From fac33e2fb6497f3868fbaab74ef36ae9b2a0bbdf Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Sat, 30 Aug 2025 22:03:37 +1000 Subject: [PATCH 01/10] wip --- .claude/settings.local.json | 12 ++ SDS-R-Implementation-Guide.md | 243 +++++++++++++++++++++++ sds.md | 363 ++++++++++++++++++++++++++++++++++ 3 files changed, 618 insertions(+) create mode 100644 .claude/settings.local.json create mode 100644 SDS-R-Implementation-Guide.md create mode 100644 sds.md diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000000..5aedfd8c6f --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,12 @@ +{ + "permissions": { + "allow": [ + "Bash(npm run build:*)", + "Bash(npm run check:*)", + "Bash(npm test)", + "Bash(npm run test:node:*)" + ], + "deny": [], + "ask": [] + } +} \ No newline at end of file diff --git a/SDS-R-Implementation-Guide.md b/SDS-R-Implementation-Guide.md new file mode 100644 index 0000000000..4456822946 --- /dev/null +++ b/SDS-R-Implementation-Guide.md @@ -0,0 +1,243 @@ +# SDS-Repair (SDS-R) Implementation Guide + +## Overview +SDS-R is an optional extension to the Scalable Data Sync (SDS) protocol that enables collaborative repair of missing messages within a limited time window. It's designed to work over Waku and assumes participants are already in a secure channel. + +## Core Concept +When a participant detects missing messages (via causal dependencies), it waits a random backoff period before requesting repairs. Other participants who have the missing message wait their own random backoff before responding. The protocol uses clever timing and grouping to ensure typically only one request and one response per missing message. + +--- + +## Data Structures + +### Protobuf Schema Modifications + +```protobuf +message HistoryEntry { + string message_id = 1; + optional bytes retrieval_hint = 2; + optional string sender_id = 3; // NEW: Original sender's ID (only for SDS-R) +} + +message Message { + string sender_id = 1; + string message_id = 2; + string channel_id = 3; + optional int32 lamport_timestamp = 10; + repeated HistoryEntry causal_history = 11; + optional bytes bloom_filter = 12; + repeated HistoryEntry repair_request = 13; // NEW: List of missing messages + optional bytes content = 20; +} +``` + +### Additional Participant State + +Each participant must maintain: + +1. **Outgoing Repair Request Buffer** + - Map: `HistoryEntry -> T_req (timestamp)` + - Sorted by ascending T_req + - Contains missing messages waiting to be requested + +2. **Incoming Repair Request Buffer** + - Map: `HistoryEntry -> T_resp (timestamp)` + - Contains repair requests from others that this participant can fulfill + - Only includes requests where participant is in the response group + +3. **Augmented Local History** + - Change from base SDS: Store full `Message` objects, not just message IDs + - Only for messages where participant could be a responder + - Needed to rebroadcast messages when responding to repairs + +### Global Configuration (per channel) + +``` +T_min = 30 seconds // Minimum wait before requesting repair +T_max = 120 seconds // Maximum wait for repair window (recommend 120-600) +num_response_groups = max(1, num_participants / 128) // Response group count +``` + +--- + +## Critical Algorithms + +### 1. Calculate T_req (When to Request Repair) + +**IMPORTANT BUG FIX**: The spec has an off-by-one error. 
Use this corrected formula: + +``` +T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min +``` + +- `participant_id`: Your OWN participant ID (not the sender's) +- `message_id`: The missing message's ID +- Result: Timestamp between `current_time + T_min` and `current_time + T_max` + +### 2. Calculate T_resp (When to Respond to Repair) + +``` +distance = participant_id XOR sender_id +T_resp = current_time + (distance * hash(message_id)) % T_max +``` + +- `participant_id`: Your OWN participant ID +- `sender_id`: Original sender's ID from the HistoryEntry +- `message_id`: The requested message's ID +- Note: Original sender has distance=0, responds immediately + +### 3. Determine Response Group Membership + +``` +is_in_group = (hash(participant_id, message_id) % num_response_groups) == + (hash(sender_id, message_id) % num_response_groups) +``` + +- Only respond to repairs if `is_in_group` is true +- Original sender is always in their own response group + +--- + +## Protocol Implementation Steps + +### When SENDING a Message + +1. Check outgoing repair request buffer for eligible entries (where `T_req <= current_time`) +2. Take up to 3 eligible entries with lowest T_req values +3. Populate `repair_request` field with these HistoryEntries: + - Include `message_id` + - Include `retrieval_hint` if available + - Include `sender_id` (original sender's ID) +4. If no eligible entries, leave `repair_request` field unset +5. Continue with normal SDS send procedure + +### When RECEIVING a Message + +1. **Clean up buffers:** + - Remove received message_id from outgoing repair request buffer + - Remove received message_id from incoming repair request buffer + +2. **Process causal dependencies:** + - For each missing dependency in causal_history: + - Add to outgoing repair request buffer + - Calculate T_req using formula above + - Include sender_id from the causal history entry + +3. **Process repair_request field:** + - For each repair request entry: + a. Remove from your own outgoing buffer (someone else is requesting it) + b. Check if you have this message in local history + c. Check if you're in the response group (use formula above) + d. If both b and c are true: + - Add to incoming repair request buffer + - Calculate T_resp using formula above + +4. Continue with normal SDS receive procedure + +### Periodic Sweeps + +#### Outgoing Repair Request Buffer Sweep (every ~5 seconds) +```python +for entry, t_req in outgoing_buffer: + if current_time >= t_req: + # This entry will be included in next message's repair_request + # No action needed here, just wait for next send + pass +``` + +#### Incoming Repair Request Buffer Sweep (every ~5 seconds) +```python +for entry, t_resp in incoming_buffer: + if current_time >= t_resp: + message = get_from_local_history(entry.message_id) + if message: + broadcast(message) # Rebroadcast the full original message + remove_from_incoming_buffer(entry) +``` + +### Periodic Sync Messages with SDS-R + +When sending periodic sync messages: +1. Check if there are eligible entries in outgoing repair request buffer +2. If yes, send the sync message WITH repair_request field populated +3. Unlike base SDS, don't suppress sync message even if others recently sent one + +--- + +## Implementation Notes and Edge Cases + +### Hash Function +**CRITICAL**: The spec doesn't specify which hash function to use. 
Recommend: +- Use SHA256 for cryptographic properties +- Convert to integer for modulo operations: `int(hash_bytes[:8], byteorder='big')` +- Must be consistent across all participants + +### Participant ID Format +- Must support XOR operation for distance calculation +- Recommend using numeric IDs or convert string IDs to integers +- Must be globally unique within the channel + +### Memory Management +1. **Buffer limits**: Implement max size for repair buffers (suggest 1000 entries) +2. **Eviction policy**: Remove oldest T_req/T_resp when at capacity +3. **History retention**: Only keep messages for T_max duration +4. **Response group optimization**: Only cache full messages if you're likely to be in response group + +### Edge Cases to Handle + +1. **Duplicate repair requests**: Use Set semantics, only track once +2. **Expired repairs**: If T_req > current_time + T_max, remove from buffer +3. **Non-numeric participant IDs**: Hash to integer for XOR operations +4. **Missing sender_id**: Cannot participate in repair for that message +5. **Circular dependencies**: Set maximum recursion depth for dependency resolution + +### Typo to Fix +The spec has "Perdiodic" on line 461 - should be "Periodic" + +--- + +## Testing Scenarios + +1. **Single missing message**: Verify only one repair request and response +2. **Cascade recovery**: Missing message A depends on missing message B +3. **Original sender offline**: Verify next closest participant responds +4. **Response group isolation**: Verify only in-group participants respond +5. **Buffer overflow**: Test eviction policies +6. **Network partition**: Test behavior when repair window expires + +--- + +## Integration with Base SDS + +### Modified State from Base SDS +- Local history stores full Messages, not just IDs +- Additional buffers for repair tracking +- Sender_id must be preserved in HistoryEntry + +### Unchanged from Base SDS +- Lamport timestamp management +- Bloom filter operations +- Causal dependency checking +- Message delivery and conflict resolution + +--- + +## Performance Recommendations + +1. Use priority queues for T_req/T_resp ordered buffers +2. Index local history by message_id for O(1) lookup +3. Batch repair requests in single message (up to 3) +4. Cache response group calculation results +5. Implement exponential backoff in future version (noted as TODO in spec) + +--- + +## Security Assumptions + +- Operating within secure channel (via Waku) +- All participants are authenticated +- Rate limiting via Waku RLN-RELAY +- No additional authentication needed for repairs +- Trust all repair requests from channel members + +This implementation guide should be sufficient to implement SDS-R without access to the original specification. The key insight is that SDS-R elegantly uses timing and randomization to coordinate distributed repair without central coordination or excessive network traffic. \ No newline at end of file diff --git a/sds.md b/sds.md new file mode 100644 index 0000000000..1b1a902c03 --- /dev/null +++ b/sds.md @@ -0,0 +1,363 @@ +--- +title: SDS +name: Scalable Data Sync protocol for distributed logs +status: raw +editor: Hanno Cornelius +contributors: + - Akhil Peddireddy +--- + +## Abstract + +This specification introduces the Scalable Data Sync (SDS) protocol +to achieve end-to-end reliability +when consolidating distributed logs in a decentralized manner. 
+The protocol is designed for a peer-to-peer (p2p) topology +where an append-only log is maintained by each member of a group of nodes +who may individually append new entries to their local log at any time and +is interested in merging new entries from other nodes in real-time or close to real-time +while maintaining a consistent order. +The outcome of the log consolidation procedure is +that all nodes in the group eventually reflect in their own logs +the same entries in the same order. +The protocol aims to scale to very large groups. + +## Motivation + +A common application that fits this model is a p2p group chat (or group communication), +where the participants act as log nodes +and the group conversation is modelled as the consolidated logs +maintained on each node. +The problem of end-to-end reliability can then be stated as +ensuring that all participants eventually see the same sequence of messages +in the same causal order, +despite the challenges of network latency, message loss, +and scalability present in any communications transport layer. +The rest of this document will assume the terminology of a group communication: +log nodes being the _participants_ in the group chat +and the logged entries being the _messages_ exchanged between participants. + +## Design Assumptions + +We make the following simplifying assumptions for a proposed reliability protocol: + +* **Broadcast routing:** +Messages are broadcast disseminated by the underlying transport. +The selected transport takes care of routing messages +to all participants of the communication. +* **Store nodes:** +There are high-availability caches (a.k.a. Store nodes) +from which missed messages can be retrieved. +These caches maintain the full history of all messages that have been broadcast. +This is an optional element in the protocol design, +but improves scalability by reducing direct interactions between participants. +* **Message ID:** +Each message has a globally unique, immutable ID (or hash). +Messages can be requested from the high-availability caches or +other participants using the corresponding message ID. +* **Participant ID:** +Each participant has a globally unique, immutable ID +visible to other participants in the communication. + +## Wire protocol + +The keywords “MUST”, “MUST NOT”, “REQUIRED”, “SHALL”, “SHALL NOT”, “SHOULD”, +“SHOULD NOT”, “RECOMMENDED”, “MAY”, and + “OPTIONAL” in this document are to be interpreted as described in [2119](https://www.ietf.org/rfc/rfc2119.txt). + +### Message + +Messages MUST adhere to the following meta structure: + +```protobuf +syntax = "proto3"; + +message HistoryEntry { + string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` + optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash +} + +message Message { + string sender_id = 1; // Participant ID of the message sender + string message_id = 2; // Unique identifier of the message + string channel_id = 3; // Identifier of the channel to which the message belongs + optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. 
+ optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel + optional bytes content = 20; // Actual content of the message +} +``` + +The sending participant MUST include its own globally unique identifier in the `sender_id` field. +In addition, it MUST include a globally unique identifier for the message in the `message_id` field, +likely based on a message hash. +The `channel_id` field MUST be set to the identifier of the channel of group communication +that is being synchronized. +For simple group communications without individual channels, +the `channel_id` SHOULD be set to `0`. +The `lamport_timestamp`, `causal_history` and +`bloom_filter` fields MUST be set according to the [protocol steps](#protocol-steps) +set out below. +These fields MAY be left unset in the case of [ephemeral messages](#ephemeral-messages). +The message `content` MAY be left empty for [periodic sync messages](#periodic-sync-message), +otherwise it MUST contain the application-level content + +> **_Note:_** Close readers may notice that, outside of filtering messages originating from the sender itself, +the `sender_id` field is not used for much. +Its importance is expected to increase once a p2p retrieval mechanism is added to SDS, as is planned for the protocol. + +### Participant state + +Each participant MUST maintain: + +* A Lamport timestamp for each channel of communication, +initialized to current epoch time in nanosecond resolution. +* A bloom filter for received message IDs per channel. +The bloom filter SHOULD be rolled over and +recomputed once it reaches a predefined capacity of message IDs. +Furthermore, +it SHOULD be designed to minimize false positives through an optimal selection of +size and hash functions. +* A buffer for unacknowledged outgoing messages +* A buffer for incoming messages with unmet causal dependencies +* A local log (or history) for each channel, +containing all message IDs in the communication channel, +ordered by Lamport timestamp. + +Messages in the unacknowledged outgoing buffer can be in one of three states: + +1. **Unacknowledged** - there has been no acknowledgement of message receipt +by any participant in the channel +2. **Possibly acknowledged** - there has been ambiguous indication that the message +has been _possibly_ received by at least one participant in the channel +3. **Acknowledged** - there has been sufficient indication that the message +has been received by at least some of the participants in the channel. +This state will also remove the message from the outgoing buffer. + +### Protocol Steps + +For each channel of communication, +participants MUST follow these protocol steps to populate and interpret +the `lamport_timestamp`, `causal_history` and `bloom_filter` fields. + +#### Send Message + +Before broadcasting a message: + +* the participant MUST increase its local Lamport timestamp by `1` and +include this in the `lamport_timestamp` field. +* the participant MUST determine the preceding few message IDs in the local history +and include these in an ordered list in the `causal_history` field. +The number of message IDs to include in the `causal_history` depends on the application. +We recommend a causal history of two message IDs. +* the participant MAY include a `retrieval_hint` in the `HistoryEntry` +for each message ID in the `causal_history` field. +This is an application-specific field to facilitate retrieval of messages, +e.g. from high-availability caches. 
+* the participant MUST include the current `bloom_filter` +state in the broadcast message. + +After broadcasting a message, +the message MUST be added to the participant’s buffer +of unacknowledged outgoing messages. + +#### Receive Message + +Upon receiving a message, + +* the participant SHOULD ignore the message if it has a `sender_id` matching its own. +* the participant MAY deduplicate the message by comparing its `message_id` to previously received message IDs. +* the participant MUST [review the ACK status](#review-ack-status) of messages +in its unacknowledged outgoing buffer +using the received message's causal history and bloom filter. +* if the message has a populated `content` field, +the participant MUST include the received message ID in its local bloom filter. +* the participant MUST verify that all causal dependencies are met +for the received message. +Dependencies are met if the message IDs in the `causal_history` of the received message +appear in the local history of the receiving participant. + +If all dependencies are met and the message has a populated `content` field, +the participant MUST [deliver the message](#deliver-message). +If dependencies are unmet, +the participant MUST add the message to the incoming buffer of messages +with unmet causal dependencies. + +#### Deliver Message + +Triggered by the [Receive Message](#receive-message) procedure. + +If the received message’s Lamport timestamp is greater than the participant's +local Lamport timestamp, +the participant MUST update its local Lamport timestamp to match the received message. +The participant MUST insert the message ID into its local log, +based on Lamport timestamp. +If one or more message IDs with the same Lamport timestamp already exists, +the participant MUST follow the [Resolve Conflicts](#resolve-conflicts) procedure. + +#### Resolve Conflicts + +Triggered by the [Deliver Message](#deliver-message) procedure. + +The participant MUST order messages with the same Lamport timestamp +in ascending order of message ID. +If the message ID is implemented as a hash of the message, +this means the message with the lowest hash would precede +other messages with the same Lamport timestamp in the local log. + +#### Review ACK Status + +Triggered by the [Receive Message](#receive-message) procedure. + +For each message in the unacknowledged outgoing buffer, +based on the received `bloom_filter` and `causal_history`: + +* the participant MUST mark all messages in the received `causal_history` as **acknowledged**. +* the participant MUST mark all messages included in the `bloom_filter` +as **possibly acknowledged**. +If a message appears as **possibly acknowledged** in multiple received bloom filters, +the participant MAY mark it as acknowledged based on probabilistic grounds, +taking into account the bloom filter size and hash number. + +#### Periodic Incoming Buffer Sweep + +The participant MUST periodically check causal dependencies for each message +in the incoming buffer. +For each message in the incoming buffer: + +* the participant MAY attempt to retrieve missing dependencies from the Store node +(high-availability cache) or other peers. +It MAY use the application-specific `retrieval_hint` in the `HistoryEntry` to facilitate retrieval. +* if all dependencies of a message are met, +the participant MUST proceed to [deliver the message](#deliver-message). 
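To make the receive-side handling concrete, here is a minimal Python sketch of the incoming buffer sweep and the delivery step described above. The `state` container and its field names are illustrative assumptions, not part of the protocol.

```python
import bisect

def sweep_incoming_buffer(state):
    # Deliver buffered messages whose causal dependencies are now met.
    still_waiting = []
    for msg in state.incoming_buffer:
        deps_met = all(dep.message_id in state.history_ids
                       for dep in msg.causal_history)
        if deps_met:
            deliver(state, msg)
        else:
            still_waiting.append(msg)  # keep buffering; retrieval may be attempted meanwhile
    state.incoming_buffer = still_waiting

def deliver(state, msg):
    # Advance the local Lamport timestamp if the received one is greater.
    state.lamport_timestamp = max(state.lamport_timestamp, msg.lamport_timestamp)
    # Insert into the local log ordered by Lamport timestamp,
    # breaking ties by ascending message ID (the Resolve Conflicts rule).
    bisect.insort(state.local_log, (msg.lamport_timestamp, msg.message_id))
    state.history_ids.add(msg.message_id)
```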
+ +If a message's causal dependencies have failed to be met +after a predetermined amount of time, +the participant MAY mark them as **irretrievably lost**. + +#### Periodic Outgoing Buffer Sweep + +The participant MUST rebroadcast **unacknowledged** outgoing messages +after a set period. +The participant SHOULD use distinct resend periods for **unacknowledged** and +**possibly acknowledged** messages, +prioritizing **unacknowledged** messages. + +#### Periodic Sync Message + +For each channel of communication, +participants SHOULD periodically send sync messages to maintain state. +These sync messages: + +* MUST be sent with empty content +* MUST include an incremented Lamport timestamp +* MUST include causal history and bloom filter according to regular message rules +* MUST NOT be added to the unacknowledged outgoing buffer +* MUST NOT be included in causal histories of subsequent messages +* MUST NOT be included in bloom filters +* MUST NOT be added to the local log + +Since sync messages are not persisted, +they MAY have non-unique message IDs without impacting the protocol. +To avoid network activity bursts in large groups, +a participant MAY choose to only send periodic sync messages +if no other messages have been broadcast in the channel after a random backoff period. + +Participants MUST process the causal history and bloom filter of these sync messages +following the same steps as regular messages, +but MUST NOT persist the sync messages themselves. + +#### Ephemeral Messages + +Participants MAY choose to send short-lived messages for which no synchronization +or reliability is required. +These messages are termed _ephemeral_. + +Ephemeral messages SHOULD be sent with `lamport_timestamp`, `causal_history`, and +`bloom_filter` unset. +Ephemeral messages SHOULD NOT be added to the unacknowledged outgoing buffer +after broadcast. +Upon reception, +ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies +or including in the local log. + +## Implementation Suggestions + +This section provides practical guidance based on the js-waku implementation of SDS. + +### Default Configuration Values + +The js-waku implementation uses the following defaults: +- **Bloom filter capacity**: 10,000 messages +- **Bloom filter error rate**: 0.001 (0.1% false positive rate) +- **Causal history size**: 200 message IDs +- **Possible ACKs threshold**: 2 bloom filter hits before considering a message acknowledged + +With 200 messages in causal history, assuming 32-byte message IDs and 32-byte retrieval hints (e.g., Waku message hashes), +each message carries 200 × 64 bytes = 12.8 KB of causal history overhead. + +### External Task Scheduling + +The js-waku implementation delegates periodic task scheduling to the library consumer by providing methods: + +- `processTasks()`: Process queued send/receive operations +- `sweepIncomingBuffer()`: Check and deliver messages with met dependencies, returns missing dependencies +- `sweepOutgoingBuffer()`: Return unacknowledged and possibly acknowledged messages for retry +- `pushOutgoingSyncMessage(callback)`: Send a sync message + +The implementation does not include internal timers, +allowing applications to integrate SDS with their existing scheduling infrastructure. 
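As a rough illustration of this consumer-driven scheduling, the sketch below shows one possible driver loop. It is written in Python-style pseudocode for brevity (the actual js-waku API is TypeScript); the `channel` object, the callbacks and the intervals are assumptions rather than the library's exact interface.

```python
import time

def run_sds_tasks(channel, retrieve_missing, rebroadcast, should_stop,
                  sweep_interval=5.0, sync_interval=30.0):
    # Hypothetical driver loop supplied by the application, since the
    # library itself schedules no periodic work.
    last_sync = time.monotonic()
    while not should_stop():
        channel.processTasks()                       # flush queued send/receive operations
        for entry in channel.sweepIncomingBuffer():  # history entries still missing
            retrieve_missing(entry)                  # e.g. Store query using the retrieval hint
        for msg in channel.sweepOutgoingBuffer():    # unacknowledged messages due for retry
            rebroadcast(msg)
        if time.monotonic() - last_sync >= sync_interval:
            channel.pushOutgoingSyncMessage(rebroadcast)  # send callback is an assumption
            last_sync = time.monotonic()
        time.sleep(sweep_interval)
```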
+ +### Message Processing + +#### Handling Missing Dependencies + +When `sweepIncomingBuffer()` returns missing dependencies, +the implementation emits an `InMessageMissing` event with `HistoryEntry[]` containing: +- `messageId`: The missing message identifier +- `retrievalHint`: Optional bytes to help retrieve the message (e.g., transport-specific hash) + +#### Timeout for Lost Messages + +The `timeoutForLostMessagesMs` option allows marking messages as irretrievably lost after a timeout. +When configured, the implementation emits an `InMessageLost` event after the timeout expires. + +### Events Emitted + +The js-waku implementation uses a `TypedEventEmitter` pattern to emit events for: +- **Incoming messages**: received, delivered, missing dependencies, lost (after timeout) +- **Outgoing messages**: sent, acknowledged, possibly acknowledged +- **Sync messages**: sent, received +- **Errors**: task execution failures + +### SDK Usage: ReliableChannel + +The SDK provides a high-level `ReliableChannel` abstraction that wraps the core SDS `MessageChannel` with automatic task scheduling and Waku protocol integration: + +#### Configuration + +The ReliableChannel uses these default intervals: +- **Sync message interval**: 30 seconds minimum between sync messages (randomized backoff) +- **Retry interval**: 30 seconds for unacknowledged messages +- **Max retry attempts**: 10 attempts before giving up +- **Store query interval**: 10 seconds for missing message retrieval + +#### Task Scheduling Implementation + +The SDK automatically schedules SDS periodic tasks: +- **Sync messages**: Uses exponential backoff with randomization; sent faster (0.5x multiplier) after receiving content to acknowledge others +- **Outgoing buffer sweeps**: Triggered after each retry interval for unacknowledged messages +- **Incoming buffer sweeps**: Performed after each incoming message and during missing message retrieval +- **Process tasks**: Called immediately after sending/receiving messages and during sync + +#### Integration with Waku Protocols + +ReliableChannel integrates SDS with Waku: +- **Sending**: Uses LightPush or Relay protocols; includes Waku message hash as retrieval hint (32 bytes) +- **Receiving**: Subscribes via Filter protocol; unwraps SDS messages before passing to application +- **Missing message retrieval**: Queries Store nodes using retrieval hints from causal history +- **Query on connect**: Automatically queries Store when connecting to new peers (enabled by default) + +## Copyright + +Copyright and related rights waived via [CC0](https://creativecommons.org/publicdomain/zero/1.0/). From 211a4da63feee1c451bd8b3be569c440a4372cc8 Mon Sep 17 00:00:00 2001 From: jm-clius Date: Fri, 3 Oct 2025 16:44:08 +0100 Subject: [PATCH 02/10] feat: integrate sds-r with message channels --- sds.md | 241 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 178 insertions(+), 63 deletions(-) diff --git a/sds.md b/sds.md index 1b1a902c03..acb683ccf2 100644 --- a/sds.md +++ b/sds.md @@ -58,6 +58,9 @@ other participants using the corresponding message ID. * **Participant ID:** Each participant has a globally unique, immutable ID visible to other participants in the communication. +* **Sender ID:** +The *Participant ID* of the original sender of a message, +often coupled with a *Message ID*. 
## Wire protocol @@ -75,15 +78,20 @@ syntax = "proto3"; message HistoryEntry { string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash + + optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension } message Message { string sender_id = 1; // Participant ID of the message sender string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs - optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel + + repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension. + optional bytes content = 20; // Actual content of the message } ``` @@ -111,7 +119,11 @@ Its importance is expected to increase once a p2p retrieval mechanism is added t Each participant MUST maintain: * A Lamport timestamp for each channel of communication, -initialized to current epoch time in nanosecond resolution. +initialized to current epoch time in millisecond resolution. +The Lamport timestamp is increased as described in the [protocol steps](#protocol-steps) +to maintain a logical ordering of events while staying close to the current epoch time. +This allows the messages from new joiners to be correctly ordered with other recent messages, +without these new participants first having to synchronize past messages to discover the current Lamport timestamp. * A bloom filter for received message IDs per channel. The bloom filter SHOULD be rolled over and recomputed once it reaches a predefined capacity of message IDs. @@ -144,8 +156,11 @@ the `lamport_timestamp`, `causal_history` and `bloom_filter` fields. Before broadcasting a message: -* the participant MUST increase its local Lamport timestamp by `1` and -include this in the `lamport_timestamp` field. +* the participant MUST set its local Lamport timestamp +to the maximum between the current value + `1` +and the current epoch time in milliseconds. +In other words the local Lamport timestamp is set to `max(timeNowInMs, current_lamport_timestamp + 1)`. +* the participant MUST include the increased Lamport timestamp in the message's `lamport_timestamp` field. * the participant MUST determine the preceding few message IDs in the local history and include these in an ordered list in the `causal_history` field. The number of message IDs to include in the `causal_history` depends on the application. @@ -250,7 +265,8 @@ participants SHOULD periodically send sync messages to maintain state. These sync messages: * MUST be sent with empty content -* MUST include an incremented Lamport timestamp +* MUST include a Lamport timestamp increased to `max(timeNowInMs, current_lamport_timestamp + 1)`, +where `timeNowInMs` is the current epoch time in milliseconds. 
* MUST include causal history and bloom filter according to regular message rules * MUST NOT be added to the unacknowledged outgoing buffer * MUST NOT be included in causal histories of subsequent messages @@ -281,82 +297,181 @@ Upon reception, ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies or including in the local log. -## Implementation Suggestions - -This section provides practical guidance based on the js-waku implementation of SDS. - -### Default Configuration Values - -The js-waku implementation uses the following defaults: -- **Bloom filter capacity**: 10,000 messages -- **Bloom filter error rate**: 0.001 (0.1% false positive rate) -- **Causal history size**: 200 message IDs -- **Possible ACKs threshold**: 2 bloom filter hits before considering a message acknowledged - -With 200 messages in causal history, assuming 32-byte message IDs and 32-byte retrieval hints (e.g., Waku message hashes), -each message carries 200 × 64 bytes = 12.8 KB of causal history overhead. +### SDS Repair (SDS-R) + +SDS Repair (SDS-R) is an optional extension module for SDS, +allowing participants in a communication to collectively repair any gaps in causal history (missing messages) +preferably over a limited time window. +Since SDS-R acts as coordinated rebroadcasting of missing messages, +which involves all participants of the communication, +it is most appropriate in a limited use case for repairing relatively recent missed dependencies. +It is not meant to replace mechanisms for long-term consistency, +such as peer-to-peer syncing or the use of a high-availability centralised cache (Store node). + +#### SDS-R message fields + +SDS-R adds the following fields to SDS messages: +* `sender_id` in `HistoryEntry`: +the original message sender's participant ID. +This is used to determine the group of participants who will respond to a repair request. +* `repair_request` in `Message`: +a capped list of history entries missing for the message sender +and for which it's requesting a repair. + +#### SDS-R participant state + +SDS-R adds the following to each participant state: + +* Outgoing **repair request buffer**: +a list of locally missing `HistoryEntry`s +each mapped to a future request timestamp, `T_req`, +after which this participant will request a repair if at that point the missing dependency has not been repaired yet. +`T_req` is computed as a pseudorandom backoff from the timestamp when the dependency was detected missing. +[Determining `T_req`](#determine-t_req) is described below. +We RECOMMEND that the outgoing repair request buffer be chronologically ordered in ascending order of `T_req`. +- Incoming **repair request buffer**: +a list of locally available `HistoryEntry`s +that were requested for repair by a remote participant +AND for which this participant might be an eligible responder, +each mapped to a future response timestamp, `T_resp`, +after which this participant will rebroadcast the corresponding requested `Message` if at that point no other participant had rebroadcast the `Message`. +`T_resp` is computed as a pseudorandom backoff from the timestamp when the repair was first requested. +[Determining `T_resp`](#determine-t_resp) is described below. +We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific repair request. 
+- Augmented local history log: +for each message ID kept in the local log for which the participant could be a repair responder, +the full SDS `Message` must be cached rather than just the message ID, +in case this participant is called upon to rebroadcast the message. +We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific message. + +**_Note:_** The required state can likely be significantly reduced in future by simply requiring that a responding participant should _reconstruct_ the original `Message` when rebroadcasting, rather than the simpler, but heavier, requirement of caching the entire received `Message` content in local history. + +#### SDS-R global state + +For a specific channel (that is, within a specific SDS-controlled communication) +the following SDS-R configuration state SHOULD be common for all participants in the conversation: + +* `T_min`: the _minimum_ time period to wait before a missing causal entry can be repaired. +We RECOMMEND a value of at least 30 seconds. +* `T_max`: the _maximum_ time period over which missing causal entries can be repaired. +We RECOMMEND a value of between 120 and 600 seconds. + +Furthermore, to avoid a broadcast storm with multiple participants responding to a repair request, +participants in a single channel MAY be divided into discrete response groups. +Participants will only respond to a repair request if they are in the response group for that request. +The global `num_response_groups` variable configures the number of response groups for this communication. +Its use is described below. +A reasonable default value for `num_response_groups` is one response group for every `128` participants. +In other words, if the (roughly) expected number of participants is expressed as `num_participants`, then +`num_response_groups = num_participants div 128 + 1`. +In other words, if there are fewer than 128 participants in a communication, +they will all belong to the same response group. + +We RECOMMEND that the global state variables `T_min`, `T_max` and `num_response_groups` be set _statically_ for a specific SDS-R application, +based on expected number of group participants and volume of traffic. + +**_Note:_** Future versions of this protocol will recommend dynamic global SDS-R variables, based on the current number of participants. + +#### SDS-R send message + +SDS-R adds the following steps when sending a message: + +Before broadcasting a message, +* the participant SHOULD populate the `repair_request` field in the message +with _eligible_ entries from the outgoing repair request buffer. +An entry is eligible to be included in a `repair_request` +if its corresponding request timestamp, `T_req`, has expired (in other words, `T_req <= current_time`). +The maximum number of repair request entries to include is up to the application. +We RECOMMEND that this quota be filled by the eligible entries from the outgoing repair request buffer with the lowest `T_req`. +We RECOMMEND a maximum of 3 entries. +If there are no eligible entries in the buffer, this optional field MUST be left unset. + +#### SDS-R receive message + +On receiving a message, +* the participant MUST remove entries matching the received message ID from its _outgoing_ repair request buffer. +This ensures that the participant does not request repairs for dependencies that have now been met. +* the participant MUST remove entries matching the received message ID from its _incoming_ repair request buffer. 
+This ensures that the participant does not respond to repair requests that another participant has already responded to. +* the participant SHOULD check for any unmet causal dependencies that do not yet have a corresponding entry in its outgoing repair request buffer. +For each such dependency, the participant SHOULD add a new entry against a unique `T_req` timestamp. +It MUST compute the `T_req` for each such HistoryEntry according to the steps outlined in [_Determine T_req_](#determine-t_req). +* for each item in the `repair_request` field: + - the participant MUST remove entries matching the repair message ID from its own outgoing repair request buffer. + This limits the number of participants that will request a common missing dependency. + - if the participant has the requested `Message` in its local history _and_ is an eligible responder for the repair request, + it SHOULD add the request to its incoming repair request buffer against a unique `T_resp` timestamp for that entry. + It MUST compute the `T_resp` for each such repair request according to the steps outlined in [_Determine T_resp_](#determine-t_resp). + It MUST determine if it's an eligible responder for a repair request according to the steps outlined in [_Determine response group_](#determine-response-group). + +#### Determine T_req + +A participant determines the repair request timestamp, `T_req`, +for a missing `HistoryEntry` as follows: -### External Task Scheduling - -The js-waku implementation delegates periodic task scheduling to the library consumer by providing methods: +``` +T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min +``` -- `processTasks()`: Process queued send/receive operations -- `sweepIncomingBuffer()`: Check and deliver messages with met dependencies, returns missing dependencies -- `sweepOutgoingBuffer()`: Return unacknowledged and possibly acknowledged messages for retry -- `pushOutgoingSyncMessage(callback)`: Send a sync message +where `current_time` is the current timestamp, +`participant_id` is the participant's _own_ participant ID (not the `sender_id` in the missing `HistoryEntry`), +`message_id` is the missing `HistoryEntry`'s message ID, +and `T_min` and `T_max` are as set out in [SDS-R global state](#sds-r-global-state). -The implementation does not include internal timers, -allowing applications to integrate SDS with their existing scheduling infrastructure. +This allows `T_req` to be pseudorandomly and linearly distributed as a backoff of between `T_min` and `T_max` from current time. -### Message Processing +> **_Note:_** placing `T_req` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement. -#### Handling Missing Dependencies +#### Determine T_resp -When `sweepIncomingBuffer()` returns missing dependencies, -the implementation emits an `InMessageMissing` event with `HistoryEntry[]` containing: -- `messageId`: The missing message identifier -- `retrievalHint`: Optional bytes to help retrieve the message (e.g., transport-specific hash) +A participant determines the repair response timestamp, `T_resp`, +for a `HistoryEntry` that it could repair as follows: -#### Timeout for Lost Messages +``` +distance = hash(participant_id) XOR hash(sender_id) +T_resp = current_time + distance*hash(message_id) % T_max +``` -The `timeoutForLostMessagesMs` option allows marking messages as irretrievably lost after a timeout. -When configured, the implementation emits an `InMessageLost` event after the timeout expires. 
+where `current_time` is the current timestamp, +`participant_id` is the participant's _own_ (local) participant ID, +`sender_id` is the requested `HistoryEntry` sender ID, +`message_id` is the requested `HistoryEntry` message ID, +and `T_max` is as set out in [SDS-R global state](#sds-r-global-state). -### Events Emitted +We first calculate the logical `distance` between the local `participant_id` and the original `sender_id`. +If this participant is the original sender, the `distance` will be `0`. +It should then be clear that the original participant will have a response backoff time of `0`, making it the most likely responder. +The `T_resp` values for other eligible participants will be pseudorandomly and linearly distributed as a backoff of up to `T_max` from current time. -The js-waku implementation uses a `TypedEventEmitter` pattern to emit events for: -- **Incoming messages**: received, delivered, missing dependencies, lost (after timeout) -- **Outgoing messages**: sent, acknowledged, possibly acknowledged -- **Sync messages**: sent, received -- **Errors**: task execution failures +> **_Note:_** placing `T_resp` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement. -### SDK Usage: ReliableChannel +#### Determine response group -The SDK provides a high-level `ReliableChannel` abstraction that wraps the core SDS `MessageChannel` with automatic task scheduling and Waku protocol integration: +Given a message with `sender_id` and `message_id`, +a participant with `participant_id` is in the response group for that message if -#### Configuration +``` +hash(participant_id, message_id) % num_response_groups == hash(sender_id, message_id) % num_response_groups +``` -The ReliableChannel uses these default intervals: -- **Sync message interval**: 30 seconds minimum between sync messages (randomized backoff) -- **Retry interval**: 30 seconds for unacknowledged messages -- **Max retry attempts**: 10 attempts before giving up -- **Store query interval**: 10 seconds for missing message retrieval +where `num_response_groups` is as set out in [SDS-R global state](#sds-r-global-state). +This ensures that a participant will always be in the response group for its own published messages. +It also allows participants to determine immediately on first reception of a message or a history entry +if they are in the associated response group. -#### Task Scheduling Implementation +#### SDS-R incoming repair request buffer sweep -The SDK automatically schedules SDS periodic tasks: -- **Sync messages**: Uses exponential backoff with randomization; sent faster (0.5x multiplier) after receiving content to acknowledge others -- **Outgoing buffer sweeps**: Triggered after each retry interval for unacknowledged messages -- **Incoming buffer sweeps**: Performed after each incoming message and during missing message retrieval -- **Process tasks**: Called immediately after sending/receiving messages and during sync +An SDS-R participant MUST periodically check if there are any incoming requests in the *incoming repair request buffer* that is due for a response. +For each item in the buffer, +the participant SHOULD broadcast the corresponding `Message` from local history +if its corresponding response timestamp, `T_resp`, has expired (in other words, `T_resp <= current_time`). 
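To summarise the timing rules above, the following Python sketch computes `T_req`, `T_resp` and response-group membership. The use of SHA-256 and the conversion of string IDs to integers are illustrative assumptions; the specification does not mandate a particular hash function.

```python
import hashlib
import time

def _h(*parts: str) -> int:
    # Illustrative hash: SHA-256 over the concatenated inputs, truncated to an integer.
    digest = hashlib.sha256("".join(parts).encode()).digest()
    return int.from_bytes(digest[:8], "big")

def t_req(participant_id: str, message_id: str, t_min: float, t_max: float) -> float:
    # Request backoff: pseudorandom delay between T_min and T_max from now.
    return time.time() + _h(participant_id, message_id) % (t_max - t_min) + t_min

def t_resp(participant_id: str, sender_id: str, message_id: str, t_max: float) -> float:
    # Response backoff: 0 for the original sender, up to T_max for other eligible responders.
    distance = _h(participant_id) ^ _h(sender_id)
    return time.time() + (distance * _h(message_id)) % t_max

def in_response_group(participant_id: str, sender_id: str, message_id: str,
                      num_response_groups: int) -> bool:
    # A participant responds only if it falls in the same response group
    # as the original sender for this particular message.
    return (_h(participant_id, message_id) % num_response_groups
            == _h(sender_id, message_id) % num_response_groups)
```

With helpers like these, the sweep above reduces to rebroadcasting any cached `Message` whose `T_resp` value has passed.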
-#### Integration with Waku Protocols +#### SDS-R Periodic Sync Message -ReliableChannel integrates SDS with Waku: -- **Sending**: Uses LightPush or Relay protocols; includes Waku message hash as retrieval hint (32 bytes) -- **Receiving**: Subscribes via Filter protocol; unwraps SDS messages before passing to application -- **Missing message retrieval**: Queries Store nodes using retrieval hints from causal history -- **Query on connect**: Automatically queries Store when connecting to new peers (enabled by default) +If the participant is due to send a periodic sync message, +it SHOULD send the message according to [SDS-R send message](#sds-r-send-message) +if there are any eligible items in the outgoing repair request buffer, +regardless of whether other participants have also recently broadcast a Periodic Sync message. ## Copyright From 62aae5148f7e2ef080a8ef8be16d4dd3a83bb182 Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 23 Oct 2025 15:25:54 +0100 Subject: [PATCH 03/10] fix: fix implementation guide, remove unrelated claude file --- .claude/settings.local.json | 12 - SDS-R-Implementation-Guide.md | 22 +- sds.md | 478 ---------------------------------- 3 files changed, 12 insertions(+), 500 deletions(-) delete mode 100644 .claude/settings.local.json delete mode 100644 sds.md diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index 5aedfd8c6f..0000000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(npm run build:*)", - "Bash(npm run check:*)", - "Bash(npm test)", - "Bash(npm run test:node:*)" - ], - "deny": [], - "ask": [] - } -} \ No newline at end of file diff --git a/SDS-R-Implementation-Guide.md b/SDS-R-Implementation-Guide.md index 4456822946..d47b9d25da 100644 --- a/SDS-R-Implementation-Guide.md +++ b/SDS-R-Implementation-Guide.md @@ -16,21 +16,23 @@ When a participant detects missing messages (via causal dependencies), it waits message HistoryEntry { string message_id = 1; optional bytes retrieval_hint = 2; - optional string sender_id = 3; // NEW: Original sender's ID (only for SDS-R) + optional string sender_id = 3; // NEW: Participant ID of original message sender (only for SDS-R) } -message Message { - string sender_id = 1; - string message_id = 2; - string channel_id = 3; - optional int32 lamport_timestamp = 10; - repeated HistoryEntry causal_history = 11; - optional bytes bloom_filter = 12; - repeated HistoryEntry repair_request = 13; // NEW: List of missing messages - optional bytes content = 20; +message SdsMessage { + string sender_id = 1; // Participant ID of the message sender + string message_id = 2; // Unique identifier of the message + string channel_id = 3; // Identifier of the channel to which the message belongs + optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on + optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel + repeated HistoryEntry repair_request = 13; // NEW: Capped list of missing messages (only for SDS-R) + optional bytes content = 20; // Actual content of the message } ``` +**Note**: The actual implementation uses `SdsMessage` (not `Message`) and `uint64` for lamport_timestamp (not `int32`). 
+ ### Additional Participant State Each participant must maintain: diff --git a/sds.md b/sds.md deleted file mode 100644 index acb683ccf2..0000000000 --- a/sds.md +++ /dev/null @@ -1,478 +0,0 @@ ---- -title: SDS -name: Scalable Data Sync protocol for distributed logs -status: raw -editor: Hanno Cornelius -contributors: - - Akhil Peddireddy ---- - -## Abstract - -This specification introduces the Scalable Data Sync (SDS) protocol -to achieve end-to-end reliability -when consolidating distributed logs in a decentralized manner. -The protocol is designed for a peer-to-peer (p2p) topology -where an append-only log is maintained by each member of a group of nodes -who may individually append new entries to their local log at any time and -is interested in merging new entries from other nodes in real-time or close to real-time -while maintaining a consistent order. -The outcome of the log consolidation procedure is -that all nodes in the group eventually reflect in their own logs -the same entries in the same order. -The protocol aims to scale to very large groups. - -## Motivation - -A common application that fits this model is a p2p group chat (or group communication), -where the participants act as log nodes -and the group conversation is modelled as the consolidated logs -maintained on each node. -The problem of end-to-end reliability can then be stated as -ensuring that all participants eventually see the same sequence of messages -in the same causal order, -despite the challenges of network latency, message loss, -and scalability present in any communications transport layer. -The rest of this document will assume the terminology of a group communication: -log nodes being the _participants_ in the group chat -and the logged entries being the _messages_ exchanged between participants. - -## Design Assumptions - -We make the following simplifying assumptions for a proposed reliability protocol: - -* **Broadcast routing:** -Messages are broadcast disseminated by the underlying transport. -The selected transport takes care of routing messages -to all participants of the communication. -* **Store nodes:** -There are high-availability caches (a.k.a. Store nodes) -from which missed messages can be retrieved. -These caches maintain the full history of all messages that have been broadcast. -This is an optional element in the protocol design, -but improves scalability by reducing direct interactions between participants. -* **Message ID:** -Each message has a globally unique, immutable ID (or hash). -Messages can be requested from the high-availability caches or -other participants using the corresponding message ID. -* **Participant ID:** -Each participant has a globally unique, immutable ID -visible to other participants in the communication. -* **Sender ID:** -The *Participant ID* of the original sender of a message, -often coupled with a *Message ID*. - -## Wire protocol - -The keywords “MUST”, “MUST NOT”, “REQUIRED”, “SHALL”, “SHALL NOT”, “SHOULD”, -“SHOULD NOT”, “RECOMMENDED”, “MAY”, and - “OPTIONAL” in this document are to be interpreted as described in [2119](https://www.ietf.org/rfc/rfc2119.txt). 
- -### Message - -Messages MUST adhere to the following meta structure: - -```protobuf -syntax = "proto3"; - -message HistoryEntry { - string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` - optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash - - optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension -} - -message Message { - string sender_id = 1; // Participant ID of the message sender - string message_id = 2; // Unique identifier of the message - string channel_id = 3; // Identifier of the channel to which the message belongs - optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel - repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. - optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel - - repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension. - - optional bytes content = 20; // Actual content of the message -} -``` - -The sending participant MUST include its own globally unique identifier in the `sender_id` field. -In addition, it MUST include a globally unique identifier for the message in the `message_id` field, -likely based on a message hash. -The `channel_id` field MUST be set to the identifier of the channel of group communication -that is being synchronized. -For simple group communications without individual channels, -the `channel_id` SHOULD be set to `0`. -The `lamport_timestamp`, `causal_history` and -`bloom_filter` fields MUST be set according to the [protocol steps](#protocol-steps) -set out below. -These fields MAY be left unset in the case of [ephemeral messages](#ephemeral-messages). -The message `content` MAY be left empty for [periodic sync messages](#periodic-sync-message), -otherwise it MUST contain the application-level content - -> **_Note:_** Close readers may notice that, outside of filtering messages originating from the sender itself, -the `sender_id` field is not used for much. -Its importance is expected to increase once a p2p retrieval mechanism is added to SDS, as is planned for the protocol. - -### Participant state - -Each participant MUST maintain: - -* A Lamport timestamp for each channel of communication, -initialized to current epoch time in millisecond resolution. -The Lamport timestamp is increased as described in the [protocol steps](#protocol-steps) -to maintain a logical ordering of events while staying close to the current epoch time. -This allows the messages from new joiners to be correctly ordered with other recent messages, -without these new participants first having to synchronize past messages to discover the current Lamport timestamp. -* A bloom filter for received message IDs per channel. -The bloom filter SHOULD be rolled over and -recomputed once it reaches a predefined capacity of message IDs. -Furthermore, -it SHOULD be designed to minimize false positives through an optimal selection of -size and hash functions. 
-* A buffer for unacknowledged outgoing messages -* A buffer for incoming messages with unmet causal dependencies -* A local log (or history) for each channel, -containing all message IDs in the communication channel, -ordered by Lamport timestamp. - -Messages in the unacknowledged outgoing buffer can be in one of three states: - -1. **Unacknowledged** - there has been no acknowledgement of message receipt -by any participant in the channel -2. **Possibly acknowledged** - there has been ambiguous indication that the message -has been _possibly_ received by at least one participant in the channel -3. **Acknowledged** - there has been sufficient indication that the message -has been received by at least some of the participants in the channel. -This state will also remove the message from the outgoing buffer. - -### Protocol Steps - -For each channel of communication, -participants MUST follow these protocol steps to populate and interpret -the `lamport_timestamp`, `causal_history` and `bloom_filter` fields. - -#### Send Message - -Before broadcasting a message: - -* the participant MUST set its local Lamport timestamp -to the maximum between the current value + `1` -and the current epoch time in milliseconds. -In other words the local Lamport timestamp is set to `max(timeNowInMs, current_lamport_timestamp + 1)`. -* the participant MUST include the increased Lamport timestamp in the message's `lamport_timestamp` field. -* the participant MUST determine the preceding few message IDs in the local history -and include these in an ordered list in the `causal_history` field. -The number of message IDs to include in the `causal_history` depends on the application. -We recommend a causal history of two message IDs. -* the participant MAY include a `retrieval_hint` in the `HistoryEntry` -for each message ID in the `causal_history` field. -This is an application-specific field to facilitate retrieval of messages, -e.g. from high-availability caches. -* the participant MUST include the current `bloom_filter` -state in the broadcast message. - -After broadcasting a message, -the message MUST be added to the participant’s buffer -of unacknowledged outgoing messages. - -#### Receive Message - -Upon receiving a message, - -* the participant SHOULD ignore the message if it has a `sender_id` matching its own. -* the participant MAY deduplicate the message by comparing its `message_id` to previously received message IDs. -* the participant MUST [review the ACK status](#review-ack-status) of messages -in its unacknowledged outgoing buffer -using the received message's causal history and bloom filter. -* if the message has a populated `content` field, -the participant MUST include the received message ID in its local bloom filter. -* the participant MUST verify that all causal dependencies are met -for the received message. -Dependencies are met if the message IDs in the `causal_history` of the received message -appear in the local history of the receiving participant. - -If all dependencies are met and the message has a populated `content` field, -the participant MUST [deliver the message](#deliver-message). -If dependencies are unmet, -the participant MUST add the message to the incoming buffer of messages -with unmet causal dependencies. - -#### Deliver Message - -Triggered by the [Receive Message](#receive-message) procedure. 
- -If the received message’s Lamport timestamp is greater than the participant's -local Lamport timestamp, -the participant MUST update its local Lamport timestamp to match the received message. -The participant MUST insert the message ID into its local log, -based on Lamport timestamp. -If one or more message IDs with the same Lamport timestamp already exists, -the participant MUST follow the [Resolve Conflicts](#resolve-conflicts) procedure. - -#### Resolve Conflicts - -Triggered by the [Deliver Message](#deliver-message) procedure. - -The participant MUST order messages with the same Lamport timestamp -in ascending order of message ID. -If the message ID is implemented as a hash of the message, -this means the message with the lowest hash would precede -other messages with the same Lamport timestamp in the local log. - -#### Review ACK Status - -Triggered by the [Receive Message](#receive-message) procedure. - -For each message in the unacknowledged outgoing buffer, -based on the received `bloom_filter` and `causal_history`: - -* the participant MUST mark all messages in the received `causal_history` as **acknowledged**. -* the participant MUST mark all messages included in the `bloom_filter` -as **possibly acknowledged**. -If a message appears as **possibly acknowledged** in multiple received bloom filters, -the participant MAY mark it as acknowledged based on probabilistic grounds, -taking into account the bloom filter size and hash number. - -#### Periodic Incoming Buffer Sweep - -The participant MUST periodically check causal dependencies for each message -in the incoming buffer. -For each message in the incoming buffer: - -* the participant MAY attempt to retrieve missing dependencies from the Store node -(high-availability cache) or other peers. -It MAY use the application-specific `retrieval_hint` in the `HistoryEntry` to facilitate retrieval. -* if all dependencies of a message are met, -the participant MUST proceed to [deliver the message](#deliver-message). - -If a message's causal dependencies have failed to be met -after a predetermined amount of time, -the participant MAY mark them as **irretrievably lost**. - -#### Periodic Outgoing Buffer Sweep - -The participant MUST rebroadcast **unacknowledged** outgoing messages -after a set period. -The participant SHOULD use distinct resend periods for **unacknowledged** and -**possibly acknowledged** messages, -prioritizing **unacknowledged** messages. - -#### Periodic Sync Message - -For each channel of communication, -participants SHOULD periodically send sync messages to maintain state. -These sync messages: - -* MUST be sent with empty content -* MUST include a Lamport timestamp increased to `max(timeNowInMs, current_lamport_timestamp + 1)`, -where `timeNowInMs` is the current epoch time in milliseconds. -* MUST include causal history and bloom filter according to regular message rules -* MUST NOT be added to the unacknowledged outgoing buffer -* MUST NOT be included in causal histories of subsequent messages -* MUST NOT be included in bloom filters -* MUST NOT be added to the local log - -Since sync messages are not persisted, -they MAY have non-unique message IDs without impacting the protocol. -To avoid network activity bursts in large groups, -a participant MAY choose to only send periodic sync messages -if no other messages have been broadcast in the channel after a random backoff period. 
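
As a non-normative illustration of the rules above, the following TypeScript sketch assembles a periodic sync message. The `ChannelState` shape and the camelCase field names are assumptions made for the example only; the wire format remains the protobuf `Message` defined earlier.

```typescript
// Sketch only: ChannelState and field names are illustrative assumptions.
interface LogEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
}

interface ChannelState {
  channelId: string;
  lamportTimestamp: number;
  localLog: LogEntry[]; // message IDs ordered by Lamport timestamp
  bloomFilter: Uint8Array; // filter over received message IDs
}

function buildSyncMessage(state: ChannelState, senderId: string): Record<string, unknown> {
  // Lamport timestamp is increased exactly as for a regular message.
  state.lamportTimestamp = Math.max(Date.now(), state.lamportTimestamp + 1);

  // The result is broadcast but NOT added to the unacknowledged outgoing buffer,
  // the bloom filter, or the local log.
  return {
    senderId,
    messageId: `sync-${Date.now()}`, // sync messages MAY have non-unique IDs
    channelId: state.channelId,
    lamportTimestamp: state.lamportTimestamp,
    causalHistory: state.localLog.slice(-2), // two preceding entries, as recommended
    bloomFilter: state.bloomFilter,
    content: undefined // empty content marks this as a sync message
  };
}
```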
- -Participants MUST process the causal history and bloom filter of these sync messages -following the same steps as regular messages, -but MUST NOT persist the sync messages themselves. - -#### Ephemeral Messages - -Participants MAY choose to send short-lived messages for which no synchronization -or reliability is required. -These messages are termed _ephemeral_. - -Ephemeral messages SHOULD be sent with `lamport_timestamp`, `causal_history`, and -`bloom_filter` unset. -Ephemeral messages SHOULD NOT be added to the unacknowledged outgoing buffer -after broadcast. -Upon reception, -ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies -or including in the local log. - -### SDS Repair (SDS-R) - -SDS Repair (SDS-R) is an optional extension module for SDS, -allowing participants in a communication to collectively repair any gaps in causal history (missing messages) -preferably over a limited time window. -Since SDS-R acts as coordinated rebroadcasting of missing messages, -which involves all participants of the communication, -it is most appropriate in a limited use case for repairing relatively recent missed dependencies. -It is not meant to replace mechanisms for long-term consistency, -such as peer-to-peer syncing or the use of a high-availability centralised cache (Store node). - -#### SDS-R message fields - -SDS-R adds the following fields to SDS messages: -* `sender_id` in `HistoryEntry`: -the original message sender's participant ID. -This is used to determine the group of participants who will respond to a repair request. -* `repair_request` in `Message`: -a capped list of history entries missing for the message sender -and for which it's requesting a repair. - -#### SDS-R participant state - -SDS-R adds the following to each participant state: - -* Outgoing **repair request buffer**: -a list of locally missing `HistoryEntry`s -each mapped to a future request timestamp, `T_req`, -after which this participant will request a repair if at that point the missing dependency has not been repaired yet. -`T_req` is computed as a pseudorandom backoff from the timestamp when the dependency was detected missing. -[Determining `T_req`](#determine-t_req) is described below. -We RECOMMEND that the outgoing repair request buffer be chronologically ordered in ascending order of `T_req`. -- Incoming **repair request buffer**: -a list of locally available `HistoryEntry`s -that were requested for repair by a remote participant -AND for which this participant might be an eligible responder, -each mapped to a future response timestamp, `T_resp`, -after which this participant will rebroadcast the corresponding requested `Message` if at that point no other participant had rebroadcast the `Message`. -`T_resp` is computed as a pseudorandom backoff from the timestamp when the repair was first requested. -[Determining `T_resp`](#determine-t_resp) is described below. -We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific repair request. -- Augmented local history log: -for each message ID kept in the local log for which the participant could be a repair responder, -the full SDS `Message` must be cached rather than just the message ID, -in case this participant is called upon to rebroadcast the message. -We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific message. 
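
As a non-normative sketch (type and field names are illustrative assumptions, not mandated by this specification), the additional per-participant SDS-R state could be modelled as follows:

```typescript
// Sketch only: names and shapes are assumptions for illustration.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
  senderId?: string; // original sender's participant ID, required for SDS-R
}

interface RepairState {
  // Locally missing dependencies, each with the time after which a repair is requested.
  outgoingRepairRequests: Map<string, { entry: HistoryEntry; tReq: number }>;
  // Requests from remote participants this participant may answer, each with the
  // time after which it responds if nobody else has.
  incomingRepairRequests: Map<string, { entry: HistoryEntry; tResp: number }>;
  // Full encoded messages this participant may be asked to rebroadcast,
  // keyed by message ID (the augmented local history log).
  repairableHistory: Map<string, Uint8Array>;
}
```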
- -**_Note:_** The required state can likely be significantly reduced in future by simply requiring that a responding participant should _reconstruct_ the original `Message` when rebroadcasting, rather than the simpler, but heavier, requirement of caching the entire received `Message` content in local history. - -#### SDS-R global state - -For a specific channel (that is, within a specific SDS-controlled communication) -the following SDS-R configuration state SHOULD be common for all participants in the conversation: - -* `T_min`: the _minimum_ time period to wait before a missing causal entry can be repaired. -We RECOMMEND a value of at least 30 seconds. -* `T_max`: the _maximum_ time period over which missing causal entries can be repaired. -We RECOMMEND a value of between 120 and 600 seconds. - -Furthermore, to avoid a broadcast storm with multiple participants responding to a repair request, -participants in a single channel MAY be divided into discrete response groups. -Participants will only respond to a repair request if they are in the response group for that request. -The global `num_response_groups` variable configures the number of response groups for this communication. -Its use is described below. -A reasonable default value for `num_response_groups` is one response group for every `128` participants. -In other words, if the (roughly) expected number of participants is expressed as `num_participants`, then -`num_response_groups = num_participants div 128 + 1`. -In other words, if there are fewer than 128 participants in a communication, -they will all belong to the same response group. - -We RECOMMEND that the global state variables `T_min`, `T_max` and `num_response_groups` be set _statically_ for a specific SDS-R application, -based on expected number of group participants and volume of traffic. - -**_Note:_** Future versions of this protocol will recommend dynamic global SDS-R variables, based on the current number of participants. - -#### SDS-R send message - -SDS-R adds the following steps when sending a message: - -Before broadcasting a message, -* the participant SHOULD populate the `repair_request` field in the message -with _eligible_ entries from the outgoing repair request buffer. -An entry is eligible to be included in a `repair_request` -if its corresponding request timestamp, `T_req`, has expired (in other words, `T_req <= current_time`). -The maximum number of repair request entries to include is up to the application. -We RECOMMEND that this quota be filled by the eligible entries from the outgoing repair request buffer with the lowest `T_req`. -We RECOMMEND a maximum of 3 entries. -If there are no eligible entries in the buffer, this optional field MUST be left unset. - -#### SDS-R receive message - -On receiving a message, -* the participant MUST remove entries matching the received message ID from its _outgoing_ repair request buffer. -This ensures that the participant does not request repairs for dependencies that have now been met. -* the participant MUST remove entries matching the received message ID from its _incoming_ repair request buffer. -This ensures that the participant does not respond to repair requests that another participant has already responded to. -* the participant SHOULD check for any unmet causal dependencies that do not yet have a corresponding entry in its outgoing repair request buffer. -For each such dependency, the participant SHOULD add a new entry against a unique `T_req` timestamp. 
-It MUST compute the `T_req` for each such HistoryEntry according to the steps outlined in [_Determine T_req_](#determine-t_req). -* for each item in the `repair_request` field: - - the participant MUST remove entries matching the repair message ID from its own outgoing repair request buffer. - This limits the number of participants that will request a common missing dependency. - - if the participant has the requested `Message` in its local history _and_ is an eligible responder for the repair request, - it SHOULD add the request to its incoming repair request buffer against a unique `T_resp` timestamp for that entry. - It MUST compute the `T_resp` for each such repair request according to the steps outlined in [_Determine T_resp_](#determine-t_resp). - It MUST determine if it's an eligible responder for a repair request according to the steps outlined in [_Determine response group_](#determine-response-group). - -#### Determine T_req - -A participant determines the repair request timestamp, `T_req`, -for a missing `HistoryEntry` as follows: - -``` -T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min -``` - -where `current_time` is the current timestamp, -`participant_id` is the participant's _own_ participant ID (not the `sender_id` in the missing `HistoryEntry`), -`message_id` is the missing `HistoryEntry`'s message ID, -and `T_min` and `T_max` are as set out in [SDS-R global state](#sds-r-global-state). - -This allows `T_req` to be pseudorandomly and linearly distributed as a backoff of between `T_min` and `T_max` from current time. - -> **_Note:_** placing `T_req` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement. - -#### Determine T_resp - -A participant determines the repair response timestamp, `T_resp`, -for a `HistoryEntry` that it could repair as follows: - -``` -distance = hash(participant_id) XOR hash(sender_id) -T_resp = current_time + distance*hash(message_id) % T_max -``` - -where `current_time` is the current timestamp, -`participant_id` is the participant's _own_ (local) participant ID, -`sender_id` is the requested `HistoryEntry` sender ID, -`message_id` is the requested `HistoryEntry` message ID, -and `T_max` is as set out in [SDS-R global state](#sds-r-global-state). - -We first calculate the logical `distance` between the local `participant_id` and the original `sender_id`. -If this participant is the original sender, the `distance` will be `0`. -It should then be clear that the original participant will have a response backoff time of `0`, making it the most likely responder. -The `T_resp` values for other eligible participants will be pseudorandomly and linearly distributed as a backoff of up to `T_max` from current time. - -> **_Note:_** placing `T_resp` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement. - -#### Determine response group - -Given a message with `sender_id` and `message_id`, -a participant with `participant_id` is in the response group for that message if - -``` -hash(participant_id, message_id) % num_response_groups == hash(sender_id, message_id) % num_response_groups -``` - -where `num_response_groups` is as set out in [SDS-R global state](#sds-r-global-state). -This ensures that a participant will always be in the response group for its own published messages. 
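
The three calculations above can be sketched in TypeScript as follows. This is non-normative: the specification does not fix a hash function or how IDs are mapped to integers, so the FNV-1a helper below is purely an illustrative stand-in that participants in a real deployment would replace with an agreed function.

```typescript
// Illustrative only: a real channel must agree on a common hash (e.g. a truncated
// cryptographic hash). FNV-1a (64-bit) keeps this sketch self-contained.
function hashToInt(...parts: string[]): bigint {
  let h = 0xcbf29ce484222325n;
  for (const ch of parts.join("|")) {
    h ^= BigInt(ch.codePointAt(0)!);
    h = (h * 0x100000001b3n) & 0xffffffffffffffffn;
  }
  return h;
}

// All times below are in the same unit (e.g. milliseconds since epoch).
function tReq(participantId: string, messageId: string, now: number, tMin: number, tMax: number): number {
  return now + Number(hashToInt(participantId, messageId) % BigInt(tMax - tMin)) + tMin;
}

function tResp(participantId: string, senderId: string, messageId: string, now: number, tMax: number): number {
  const distance = hashToInt(participantId) ^ hashToInt(senderId); // 0 for the original sender
  return now + Number((distance * hashToInt(messageId)) % BigInt(tMax));
}

function inResponseGroup(participantId: string, senderId: string, messageId: string, numResponseGroups: bigint): boolean {
  return (
    hashToInt(participantId, messageId) % numResponseGroups ===
    hashToInt(senderId, messageId) % numResponseGroups
  );
}
```

Because the backoffs are deterministic functions of participant and message IDs, participants spread their request and response times without any explicit coordination.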
-It also allows participants to determine immediately on first reception of a message or a history entry -if they are in the associated response group. - -#### SDS-R incoming repair request buffer sweep - -An SDS-R participant MUST periodically check if there are any incoming requests in the *incoming repair request buffer* that is due for a response. -For each item in the buffer, -the participant SHOULD broadcast the corresponding `Message` from local history -if its corresponding response timestamp, `T_resp`, has expired (in other words, `T_resp <= current_time`). - -#### SDS-R Periodic Sync Message - -If the participant is due to send a periodic sync message, -it SHOULD send the message according to [SDS-R send message](#sds-r-send-message) -if there are any eligible items in the outgoing repair request buffer, -regardless of whether other participants have also recently broadcast a Periodic Sync message. - -## Copyright - -Copyright and related rights waived via [CC0](https://creativecommons.org/publicdomain/zero/1.0/). From fd357f914240e0ff3f4df9718047a2a1846736b9 Mon Sep 17 00:00:00 2001 From: jm-clius Date: Fri, 10 Oct 2025 19:41:28 +0100 Subject: [PATCH 04/10] feat: integrate sds-r within reliable channels SDK --- .../src/reliable_channel/reliable_channel.ts | 145 ++++++++++++++++-- .../src/message_channel/message_channel.ts | 25 +++ .../sds/src/message_channel/repair/repair.ts | 68 ++++++++ 3 files changed, 229 insertions(+), 9 deletions(-) diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..03418a72ea 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -36,10 +36,13 @@ import { RetryManager } from "./retry_manager.js"; const log = new Logger("sdk:reliable-channel"); const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds +const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000; // 10 seconds when repairs pending const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds const DEFAULT_MAX_RETRY_ATTEMPTS = 10; const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; +const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; +const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000; // 2 minutes const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ LightPushError.ENCODE_FAILED, @@ -78,6 +81,7 @@ export type ReliableChannelOptions = MessageChannelOptions & { /** * How often store queries are done to retrieve missing messages. + * Only applies when retrievalStrategy includes Store ('both' or 'store-only'). * * @default 10,000 (10 seconds) */ @@ -111,6 +115,25 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @default 1000 (1 second) */ processTaskMinElapseMs?: number; + + /** + * Strategy for retrieving missing messages. + * - 'both': Use SDS-R peer repair and Store queries (default) + * - 'sds-r-only': Only use SDS-R peer repair + * - 'store-only': Only use Store queries (legacy behavior) + * - 'none': No automatic retrieval + * + * @default 'both' + */ + retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none"; + + /** + * How long to wait for SDS-R repair before falling back to Store. + * Only applies when retrievalStrategy is 'both'. 
+ * + * @default 120,000 (2 minutes - matches SDS-R T_max) + */ + sdsrFallbackTimeoutMs?: number; }; /** @@ -145,11 +168,22 @@ export class ReliableChannel< private syncTimeout: ReturnType | undefined; private sweepInBufInterval: ReturnType | undefined; private readonly sweepInBufIntervalMs: number; + private sweepRepairInterval: ReturnType | undefined; private processTaskTimeout: ReturnType | undefined; private readonly retryManager: RetryManager | undefined; private readonly missingMessageRetriever?: MissingMessageRetriever; private readonly queryOnConnect?: QueryOnConnect; private readonly processTaskMinElapseMs: number; + private readonly retrievalStrategy: + | "both" + | "sds-r-only" + | "store-only" + | "none"; + private readonly sdsrFallbackTimeoutMs: number; + private readonly missingMessageTimeouts: Map< + string, + ReturnType + >; private _started: boolean; private constructor( @@ -214,7 +248,13 @@ export class ReliableChannel< this.processTaskMinElapseMs = options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; - if (this._retrieve) { + this.retrievalStrategy = options?.retrievalStrategy ?? "both"; + this.sdsrFallbackTimeoutMs = + options?.sdsrFallbackTimeoutMs ?? DEFAULT_SDSR_FALLBACK_TIMEOUT_MS; + this.missingMessageTimeouts = new Map(); + + // Only enable Store retrieval based on strategy + if (this._retrieve && this.shouldUseStore()) { this.missingMessageRetriever = new MissingMessageRetriever( this.decoder, options?.retrieveFrequencyMs, @@ -418,6 +458,13 @@ export class ReliableChannel< // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); + // Cancel Store fallback timeout if message was retrieved + const timeout = this.missingMessageTimeouts.get(sdsMessage.messageId); + if (timeout) { + clearTimeout(timeout); + this.missingMessageTimeouts.delete(sdsMessage.messageId); + } + this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { @@ -478,6 +525,12 @@ export class ReliableChannel< this.setupEventListeners(); this.restartSync(); this.startSweepIncomingBufferLoop(); + + // Only start repair sweep if SDS-R is enabled + if (this.shouldUseSdsR()) { + this.startRepairSweepLoop(); + } + if (this._retrieve) { this.missingMessageRetriever?.start(); this.queryOnConnect?.start(); @@ -490,6 +543,8 @@ export class ReliableChannel< this._started = false; this.stopSync(); this.stopSweepIncomingBufferLoop(); + this.stopRepairSweepLoop(); + this.clearMissingMessageTimeouts(); this.missingMessageRetriever?.stop(); this.queryOnConnect?.stop(); // TODO unsubscribe @@ -512,18 +567,67 @@ export class ReliableChannel< if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval); } + private startRepairSweepLoop(): void { + this.stopRepairSweepLoop(); + this.sweepRepairInterval = setInterval(() => { + void this.messageChannel + .sweepRepairIncomingBuffer(async (message) => { + // Rebroadcast the repair message + const wakuMessage = { payload: message.encode() }; + const result = await this._send(this.encoder, wakuMessage); + return result.failures.length === 0; + }) + .catch((err) => { + log.error("error encountered when sweeping repair buffer", err); + }); + }, DEFAULT_SWEEP_REPAIR_INTERVAL_MS); + } + + private stopRepairSweepLoop(): void { + if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval); + } + + private clearMissingMessageTimeouts(): void { + for (const timeout of 
this.missingMessageTimeouts.values()) { + clearTimeout(timeout); + } + this.missingMessageTimeouts.clear(); + } + + private shouldUseStore(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "store-only" + ); + } + + private shouldUseSdsR(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "sds-r-only" + ); + } + private restartSync(multiplier: number = 1): void { if (this.syncTimeout) { clearTimeout(this.syncTimeout); } if (this.syncMinIntervalMs) { - const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier; + // Adaptive sync: use shorter interval when repairs are pending + const hasPendingRepairs = + this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests(); + const baseInterval = hasPendingRepairs + ? DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS + : this.syncMinIntervalMs; + + const timeoutMs = this.random() * baseInterval * multiplier; this.syncTimeout = setTimeout(() => { void this.sendSyncMessage(); // Always restart a sync, no matter whether the message was sent. - // Set a multiplier so we wait a bit longer to not hog the conversation - void this.restartSync(2); + // Use smaller multiplier when repairs pending to send more frequently + const nextMultiplier = hasPendingRepairs ? 1.2 : 2; + void this.restartSync(nextMultiplier); }, timeoutMs); } } @@ -669,12 +773,35 @@ export class ReliableChannel< MessageChannelEvent.InMessageMissing, (event) => { for (const { messageId, retrievalHint } of event.detail) { - if (retrievalHint && this.missingMessageRetriever) { - this.missingMessageRetriever.addMissingMessage( - messageId, - retrievalHint - ); + // Coordinated retrieval strategy + if (this.retrievalStrategy === "both") { + // SDS-R will attempt first, schedule Store fallback + // Note: missingMessageRetriever only exists if Store protocol is available + if (retrievalHint && this.missingMessageRetriever) { + const timeout = setTimeout(() => { + // Still missing after SDS-R timeout, try Store + log.info( + `Message ${messageId} not retrieved via SDS-R, falling back to Store` + ); + this.missingMessageRetriever?.addMissingMessage( + messageId, + retrievalHint + ); + this.missingMessageTimeouts.delete(messageId); + }, this.sdsrFallbackTimeoutMs); + + this.missingMessageTimeouts.set(messageId, timeout); + } + } else if (this.retrievalStrategy === "store-only") { + // Immediate Store retrieval + if (retrievalHint && this.missingMessageRetriever) { + this.missingMessageRetriever.addMissingMessage( + messageId, + retrievalHint + ); + } } + // For 'sds-r-only' and 'none', SDS-R handles it or nothing happens } } ); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index f72ba52579..0ee8159e0c 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -142,6 +142,31 @@ export class MessageChannel extends TypedEventEmitter { return bytesToHex(sha256(payload)); } + /** + * Check if there are pending repair requests that need to be sent. + * Useful for adaptive sync intervals - increase frequency when repairs pending. + */ + public hasPendingRepairRequests(currentTime = Date.now()): boolean { + if (!this.repairManager.isEnabled) { + return false; + } + + const nextRequestTime = this.repairManager.getNextRequestTime(); + return nextRequestTime !== undefined && nextRequestTime <= currentTime; + } + + /** + * Get repair statistics for monitoring/debugging. 
+ */ + public getRepairStats(): { + pendingRequests: number; + pendingResponses: number; + nextRequestTime?: number; + nextResponseTime?: number; + } { + return this.repairManager.getStats(); + } + /** * Processes all queued tasks sequentially to ensure proper message ordering. * diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 4207483165..5065e0323e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -328,4 +328,72 @@ export class RepairManager { `Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants` ); } + + /** + * Check if there are any pending outgoing repair requests + */ + public hasPendingRequests(): boolean { + return this.outgoingBuffer.size > 0; + } + + /** + * Get count of pending repair requests + */ + public getPendingRequestCount(): number { + return this.outgoingBuffer.size; + } + + /** + * Get count of pending repair responses + */ + public getPendingResponseCount(): number { + return this.incomingBuffer.size; + } + + /** + * Get next scheduled repair request time (earliest T_req) + */ + public getNextRequestTime(): number | undefined { + const items = this.outgoingBuffer.getItems(); + return items.length > 0 ? items[0].tReq : undefined; + } + + /** + * Get next scheduled repair response time (earliest T_resp) + */ + public getNextResponseTime(): number | undefined { + const items = this.incomingBuffer.getItems(); + return items.length > 0 ? items[0].tResp : undefined; + } + + /** + * Check if a specific message has a pending repair request + */ + public isPendingRequest(messageId: string): boolean { + return this.outgoingBuffer.has(messageId); + } + + /** + * Check if we have a pending response for a message + */ + public isPendingResponse(messageId: string): boolean { + return this.incomingBuffer.has(messageId); + } + + /** + * Get stats for monitoring/debugging + */ + public getStats(): { + pendingRequests: number; + pendingResponses: number; + nextRequestTime?: number; + nextResponseTime?: number; + } { + return { + pendingRequests: this.getPendingRequestCount(), + pendingResponses: this.getPendingResponseCount(), + nextRequestTime: this.getNextRequestTime(), + nextResponseTime: this.getNextResponseTime() + }; + } } From 2cc49e1349f5d743d2fdbc62388d854edd22a3ce Mon Sep 17 00:00:00 2001 From: jm-clius Date: Wed, 15 Oct 2025 17:17:00 +0100 Subject: [PATCH 05/10] fix: fix import, export --- packages/sdk/src/reliable_channel/reliable_channel.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 03418a72ea..616b14bf14 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -18,8 +18,8 @@ import { MessageChannel, MessageChannelEvent, type MessageChannelOptions, + type ParticipantId, Message as SdsMessage, - type SenderId, SyncMessage } from "@waku/sds"; import { Logger } from "@waku/utils"; @@ -304,7 +304,7 @@ export class ReliableChannel< public static async create( node: IWaku, channelId: ChannelId, - senderId: SenderId, + senderId: ParticipantId, encoder: IEncoder, decoder: IDecoder, options?: ReliableChannelOptions From 9d82564ed7963042c5eacd422ee066be989139dc Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 30 Oct 2025 11:50:19 +0000 Subject: [PATCH 06/10] fix: fix build errors, 
simplify parallel operation --- .../src/reliable_channel/reliable_channel.ts | 64 ++++--------------- packages/sds/src/message_channel/events.ts | 10 --- .../src/message_channel/message_channel.ts | 22 +------ .../sds/src/message_channel/repair/repair.ts | 29 +-------- 4 files changed, 18 insertions(+), 107 deletions(-) diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 616b14bf14..ab66b4a3b5 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -42,7 +42,6 @@ const DEFAULT_MAX_RETRY_ATTEMPTS = 10; const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; -const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000; // 2 minutes const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ LightPushError.ENCODE_FAILED, @@ -118,7 +117,7 @@ export type ReliableChannelOptions = MessageChannelOptions & { /** * Strategy for retrieving missing messages. - * - 'both': Use SDS-R peer repair and Store queries (default) + * - 'both': Use SDS-R peer repair and Store queries in parallel (default) * - 'sds-r-only': Only use SDS-R peer repair * - 'store-only': Only use Store queries (legacy behavior) * - 'none': No automatic retrieval @@ -126,14 +125,6 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @default 'both' */ retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none"; - - /** - * How long to wait for SDS-R repair before falling back to Store. - * Only applies when retrievalStrategy is 'both'. - * - * @default 120,000 (2 minutes - matches SDS-R T_max) - */ - sdsrFallbackTimeoutMs?: number; }; /** @@ -179,11 +170,6 @@ export class ReliableChannel< | "sds-r-only" | "store-only" | "none"; - private readonly sdsrFallbackTimeoutMs: number; - private readonly missingMessageTimeouts: Map< - string, - ReturnType - >; private _started: boolean; private constructor( @@ -249,9 +235,6 @@ export class ReliableChannel< options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; this.retrievalStrategy = options?.retrievalStrategy ?? "both"; - this.sdsrFallbackTimeoutMs = - options?.sdsrFallbackTimeoutMs ?? 
DEFAULT_SDSR_FALLBACK_TIMEOUT_MS; - this.missingMessageTimeouts = new Map(); // Only enable Store retrieval based on strategy if (this._retrieve && this.shouldUseStore()) { @@ -458,13 +441,7 @@ export class ReliableChannel< // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); - // Cancel Store fallback timeout if message was retrieved - const timeout = this.missingMessageTimeouts.get(sdsMessage.messageId); - if (timeout) { - clearTimeout(timeout); - this.missingMessageTimeouts.delete(sdsMessage.messageId); - } - + // Remove from Store retriever if message was retrieved this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { @@ -544,7 +521,6 @@ export class ReliableChannel< this.stopSync(); this.stopSweepIncomingBufferLoop(); this.stopRepairSweepLoop(); - this.clearMissingMessageTimeouts(); this.missingMessageRetriever?.stop(); this.queryOnConnect?.stop(); // TODO unsubscribe @@ -587,13 +563,6 @@ export class ReliableChannel< if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval); } - private clearMissingMessageTimeouts(): void { - for (const timeout of this.missingMessageTimeouts.values()) { - clearTimeout(timeout); - } - this.missingMessageTimeouts.clear(); - } - private shouldUseStore(): boolean { return ( this.retrievalStrategy === "both" || @@ -773,27 +742,19 @@ export class ReliableChannel< MessageChannelEvent.InMessageMissing, (event) => { for (const { messageId, retrievalHint } of event.detail) { - // Coordinated retrieval strategy + // Parallel retrieval strategy if (this.retrievalStrategy === "both") { - // SDS-R will attempt first, schedule Store fallback - // Note: missingMessageRetriever only exists if Store protocol is available + // Both SDS-R and Store work in parallel + // SDS-R automatically handles repair via RepairManager + // Store retrieval starts immediately if (retrievalHint && this.missingMessageRetriever) { - const timeout = setTimeout(() => { - // Still missing after SDS-R timeout, try Store - log.info( - `Message ${messageId} not retrieved via SDS-R, falling back to Store` - ); - this.missingMessageRetriever?.addMissingMessage( - messageId, - retrievalHint - ); - this.missingMessageTimeouts.delete(messageId); - }, this.sdsrFallbackTimeoutMs); - - this.missingMessageTimeouts.set(messageId, timeout); + this.missingMessageRetriever.addMissingMessage( + messageId, + retrievalHint + ); } } else if (this.retrievalStrategy === "store-only") { - // Immediate Store retrieval + // Immediate Store retrieval only if (retrievalHint && this.missingMessageRetriever) { this.missingMessageRetriever.addMissingMessage( messageId, @@ -801,7 +762,8 @@ export class ReliableChannel< ); } } - // For 'sds-r-only' and 'none', SDS-R handles it or nothing happens + // For 'sds-r-only', only SDS-R repair manager operates + // For 'none', nothing happens } } ); diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index ecc2a55edc..aa088d720b 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -12,10 +12,8 @@ export enum MessageChannelEvent { InMessageLost = "sds:in:message-irretrievably-lost", ErrorTask = "sds:error-task", // SDS-R Repair Events - RepairRequestQueued = "sds:repair:request-queued", RepairRequestSent = "sds:repair:request-sent", RepairRequestReceived = "sds:repair:request-received", - RepairResponseQueued = 
"sds:repair:response-queued", RepairResponseSent = "sds:repair:response-sent" } @@ -33,10 +31,6 @@ export type MessageChannelEvents = { [MessageChannelEvent.OutSyncSent]: CustomEvent; [MessageChannelEvent.InSyncReceived]: CustomEvent; [MessageChannelEvent.ErrorTask]: CustomEvent; - [MessageChannelEvent.RepairRequestQueued]: CustomEvent<{ - messageId: MessageId; - tReq: number; - }>; [MessageChannelEvent.RepairRequestSent]: CustomEvent<{ messageIds: MessageId[]; carrierMessageId: MessageId; @@ -45,10 +39,6 @@ export type MessageChannelEvents = { messageIds: MessageId[]; fromSenderId?: ParticipantId; }>; - [MessageChannelEvent.RepairResponseQueued]: CustomEvent<{ - messageId: MessageId; - tResp: number; - }>; [MessageChannelEvent.RepairResponseSent]: CustomEvent<{ messageId: MessageId; }>; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 0ee8159e0c..05b8f49d80 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -128,13 +128,7 @@ export class MessageChannel extends TypedEventEmitter { // Only construct RepairManager if repair is enabled (default: true) if (options.enableRepair ?? true) { - this.repairManager = new RepairManager( - senderId, - options.repairConfig, - (event: string, detail: unknown) => { - this.safeSendEvent(event as MessageChannelEvent, { detail }); - } - ); + this.repairManager = new RepairManager(senderId, options.repairConfig); } } @@ -147,7 +141,7 @@ export class MessageChannel extends TypedEventEmitter { * Useful for adaptive sync intervals - increase frequency when repairs pending. */ public hasPendingRepairRequests(currentTime = Date.now()): boolean { - if (!this.repairManager.isEnabled) { + if (!this.repairManager) { return false; } @@ -155,18 +149,6 @@ export class MessageChannel extends TypedEventEmitter { return nextRequestTime !== undefined && nextRequestTime <= currentTime; } - /** - * Get repair statistics for monitoring/debugging. - */ - public getRepairStats(): { - pendingRequests: number; - pendingResponses: number; - nextRequestTime?: number; - nextResponseTime?: number; - } { - return this.repairManager.getStats(); - } - /** * Processes all queued tasks sequentially to ensure proper message ordering. 
* diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 5065e0323e..6a2117cd1d 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager"); */ const PARTICIPANTS_PER_RESPONSE_GROUP = 128; -/** - * Event emitter callback for repair events - */ -export type RepairEventEmitter = (event: string, detail: unknown) => void; - /** * Configuration for SDS-R repair protocol */ @@ -58,16 +53,10 @@ export class RepairManager { private readonly config: Required; private readonly outgoingBuffer: OutgoingRepairBuffer; private readonly incomingBuffer: IncomingRepairBuffer; - private readonly eventEmitter?: RepairEventEmitter; - public constructor( - participantId: ParticipantId, - config: RepairConfig = {}, - eventEmitter?: RepairEventEmitter - ) { + public constructor(participantId: ParticipantId, config: RepairConfig = {}) { this.participantId = participantId; this.config = { ...DEFAULT_REPAIR_CONFIG, ...config }; - this.eventEmitter = eventEmitter; this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize); this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize); @@ -142,19 +131,13 @@ export class RepairManager { // Calculate when to request this repair const tReq = this.calculateTReq(entry.messageId, currentTime); - // Add to outgoing buffer - only log and emit event if actually added + // Add to outgoing buffer - only log if actually added const wasAdded = this.outgoingBuffer.add(entry, tReq); if (wasAdded) { log.info( `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` ); - - // Emit event - this.eventEmitter?.("RepairRequestQueued", { - messageId: entry.messageId, - tReq - }); } } } @@ -238,19 +221,13 @@ export class RepairManager { currentTime ); - // Add to incoming buffer - only log and emit event if actually added + // Add to incoming buffer - only log if actually added const wasAdded = this.incomingBuffer.add(request, tResp); if (wasAdded) { log.info( `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` ); - - // Emit event - this.eventEmitter?.("RepairResponseQueued", { - messageId: request.messageId, - tResp - }); } } } From 81b21ca461a785cf4af215a6926bf54b6960b298 Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 30 Oct 2025 11:55:20 +0000 Subject: [PATCH 07/10] fix: sigh. this file has 9 lives --- SDS-R-Implementation-Guide.md | 245 ---------------------------------- 1 file changed, 245 deletions(-) delete mode 100644 SDS-R-Implementation-Guide.md diff --git a/SDS-R-Implementation-Guide.md b/SDS-R-Implementation-Guide.md deleted file mode 100644 index d47b9d25da..0000000000 --- a/SDS-R-Implementation-Guide.md +++ /dev/null @@ -1,245 +0,0 @@ -# SDS-Repair (SDS-R) Implementation Guide - -## Overview -SDS-R is an optional extension to the Scalable Data Sync (SDS) protocol that enables collaborative repair of missing messages within a limited time window. It's designed to work over Waku and assumes participants are already in a secure channel. - -## Core Concept -When a participant detects missing messages (via causal dependencies), it waits a random backoff period before requesting repairs. Other participants who have the missing message wait their own random backoff before responding. The protocol uses clever timing and grouping to ensure typically only one request and one response per missing message. 
- ---- - -## Data Structures - -### Protobuf Schema Modifications - -```protobuf -message HistoryEntry { - string message_id = 1; - optional bytes retrieval_hint = 2; - optional string sender_id = 3; // NEW: Participant ID of original message sender (only for SDS-R) -} - -message SdsMessage { - string sender_id = 1; // Participant ID of the message sender - string message_id = 2; // Unique identifier of the message - string channel_id = 3; // Identifier of the channel to which the message belongs - optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel - repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on - optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel - repeated HistoryEntry repair_request = 13; // NEW: Capped list of missing messages (only for SDS-R) - optional bytes content = 20; // Actual content of the message -} -``` - -**Note**: The actual implementation uses `SdsMessage` (not `Message`) and `uint64` for lamport_timestamp (not `int32`). - -### Additional Participant State - -Each participant must maintain: - -1. **Outgoing Repair Request Buffer** - - Map: `HistoryEntry -> T_req (timestamp)` - - Sorted by ascending T_req - - Contains missing messages waiting to be requested - -2. **Incoming Repair Request Buffer** - - Map: `HistoryEntry -> T_resp (timestamp)` - - Contains repair requests from others that this participant can fulfill - - Only includes requests where participant is in the response group - -3. **Augmented Local History** - - Change from base SDS: Store full `Message` objects, not just message IDs - - Only for messages where participant could be a responder - - Needed to rebroadcast messages when responding to repairs - -### Global Configuration (per channel) - -``` -T_min = 30 seconds // Minimum wait before requesting repair -T_max = 120 seconds // Maximum wait for repair window (recommend 120-600) -num_response_groups = max(1, num_participants / 128) // Response group count -``` - ---- - -## Critical Algorithms - -### 1. Calculate T_req (When to Request Repair) - -**IMPORTANT BUG FIX**: The spec has an off-by-one error. Use this corrected formula: - -``` -T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min -``` - -- `participant_id`: Your OWN participant ID (not the sender's) -- `message_id`: The missing message's ID -- Result: Timestamp between `current_time + T_min` and `current_time + T_max` - -### 2. Calculate T_resp (When to Respond to Repair) - -``` -distance = participant_id XOR sender_id -T_resp = current_time + (distance * hash(message_id)) % T_max -``` - -- `participant_id`: Your OWN participant ID -- `sender_id`: Original sender's ID from the HistoryEntry -- `message_id`: The requested message's ID -- Note: Original sender has distance=0, responds immediately - -### 3. Determine Response Group Membership - -``` -is_in_group = (hash(participant_id, message_id) % num_response_groups) == - (hash(sender_id, message_id) % num_response_groups) -``` - -- Only respond to repairs if `is_in_group` is true -- Original sender is always in their own response group - ---- - -## Protocol Implementation Steps - -### When SENDING a Message - -1. Check outgoing repair request buffer for eligible entries (where `T_req <= current_time`) -2. Take up to 3 eligible entries with lowest T_req values -3. 
Populate `repair_request` field with these HistoryEntries: - - Include `message_id` - - Include `retrieval_hint` if available - - Include `sender_id` (original sender's ID) -4. If no eligible entries, leave `repair_request` field unset -5. Continue with normal SDS send procedure - -### When RECEIVING a Message - -1. **Clean up buffers:** - - Remove received message_id from outgoing repair request buffer - - Remove received message_id from incoming repair request buffer - -2. **Process causal dependencies:** - - For each missing dependency in causal_history: - - Add to outgoing repair request buffer - - Calculate T_req using formula above - - Include sender_id from the causal history entry - -3. **Process repair_request field:** - - For each repair request entry: - a. Remove from your own outgoing buffer (someone else is requesting it) - b. Check if you have this message in local history - c. Check if you're in the response group (use formula above) - d. If both b and c are true: - - Add to incoming repair request buffer - - Calculate T_resp using formula above - -4. Continue with normal SDS receive procedure - -### Periodic Sweeps - -#### Outgoing Repair Request Buffer Sweep (every ~5 seconds) -```python -for entry, t_req in outgoing_buffer: - if current_time >= t_req: - # This entry will be included in next message's repair_request - # No action needed here, just wait for next send - pass -``` - -#### Incoming Repair Request Buffer Sweep (every ~5 seconds) -```python -for entry, t_resp in incoming_buffer: - if current_time >= t_resp: - message = get_from_local_history(entry.message_id) - if message: - broadcast(message) # Rebroadcast the full original message - remove_from_incoming_buffer(entry) -``` - -### Periodic Sync Messages with SDS-R - -When sending periodic sync messages: -1. Check if there are eligible entries in outgoing repair request buffer -2. If yes, send the sync message WITH repair_request field populated -3. Unlike base SDS, don't suppress sync message even if others recently sent one - ---- - -## Implementation Notes and Edge Cases - -### Hash Function -**CRITICAL**: The spec doesn't specify which hash function to use. Recommend: -- Use SHA256 for cryptographic properties -- Convert to integer for modulo operations: `int(hash_bytes[:8], byteorder='big')` -- Must be consistent across all participants - -### Participant ID Format -- Must support XOR operation for distance calculation -- Recommend using numeric IDs or convert string IDs to integers -- Must be globally unique within the channel - -### Memory Management -1. **Buffer limits**: Implement max size for repair buffers (suggest 1000 entries) -2. **Eviction policy**: Remove oldest T_req/T_resp when at capacity -3. **History retention**: Only keep messages for T_max duration -4. **Response group optimization**: Only cache full messages if you're likely to be in response group - -### Edge Cases to Handle - -1. **Duplicate repair requests**: Use Set semantics, only track once -2. **Expired repairs**: If T_req > current_time + T_max, remove from buffer -3. **Non-numeric participant IDs**: Hash to integer for XOR operations -4. **Missing sender_id**: Cannot participate in repair for that message -5. **Circular dependencies**: Set maximum recursion depth for dependency resolution - -### Typo to Fix -The spec has "Perdiodic" on line 461 - should be "Periodic" - ---- - -## Testing Scenarios - -1. **Single missing message**: Verify only one repair request and response -2. 
**Cascade recovery**: Missing message A depends on missing message B -3. **Original sender offline**: Verify next closest participant responds -4. **Response group isolation**: Verify only in-group participants respond -5. **Buffer overflow**: Test eviction policies -6. **Network partition**: Test behavior when repair window expires - ---- - -## Integration with Base SDS - -### Modified State from Base SDS -- Local history stores full Messages, not just IDs -- Additional buffers for repair tracking -- Sender_id must be preserved in HistoryEntry - -### Unchanged from Base SDS -- Lamport timestamp management -- Bloom filter operations -- Causal dependency checking -- Message delivery and conflict resolution - ---- - -## Performance Recommendations - -1. Use priority queues for T_req/T_resp ordered buffers -2. Index local history by message_id for O(1) lookup -3. Batch repair requests in single message (up to 3) -4. Cache response group calculation results -5. Implement exponential backoff in future version (noted as TODO in spec) - ---- - -## Security Assumptions - -- Operating within secure channel (via Waku) -- All participants are authenticated -- Rate limiting via Waku RLN-RELAY -- No additional authentication needed for repairs -- Trust all repair requests from channel members - -This implementation guide should be sufficient to implement SDS-R without access to the original specification. The key insight is that SDS-R elegantly uses timing and randomization to coordinate distributed repair without central coordination or excessive network traffic. \ No newline at end of file From 6a5eac2a85dd4a46566cec6280feac927dd3408e Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 30 Oct 2025 12:05:37 +0000 Subject: [PATCH 08/10] fix: simplify more --- .../src/reliable_channel/reliable_channel.ts | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index ab66b4a3b5..08d0c6097c 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -742,28 +742,18 @@ export class ReliableChannel< MessageChannelEvent.InMessageMissing, (event) => { for (const { messageId, retrievalHint } of event.detail) { - // Parallel retrieval strategy - if (this.retrievalStrategy === "both") { - // Both SDS-R and Store work in parallel - // SDS-R automatically handles repair via RepairManager - // Store retrieval starts immediately - if (retrievalHint && this.missingMessageRetriever) { - this.missingMessageRetriever.addMissingMessage( - messageId, - retrievalHint - ); - } - } else if (this.retrievalStrategy === "store-only") { - // Immediate Store retrieval only - if (retrievalHint && this.missingMessageRetriever) { - this.missingMessageRetriever.addMissingMessage( - messageId, - retrievalHint - ); - } + // Store retrieval (for 'both' and 'store-only' strategies) + // SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only' + if ( + this.shouldUseStore() && + retrievalHint && + this.missingMessageRetriever + ) { + this.missingMessageRetriever.addMissingMessage( + messageId, + retrievalHint + ); } - // For 'sds-r-only', only SDS-R repair manager operates - // For 'none', nothing happens } } ); From 860ecfaca7544c41068705447c038174b8df308a Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 30 Oct 2025 13:42:02 +0000 Subject: [PATCH 09/10] fix: disable repair if not part of retrieval strategy 
--- packages/sdk/src/reliable_channel/reliable_channel.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 08d0c6097c..aa975fa9af 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -292,7 +292,15 @@ export class ReliableChannel< decoder: IDecoder, options?: ReliableChannelOptions ): Promise> { - const sdsMessageChannel = new MessageChannel(channelId, senderId, options); + // Enable SDS-R repair only if retrieval strategy uses it + const retrievalStrategy = options?.retrievalStrategy ?? "both"; + const enableRepair = + retrievalStrategy === "both" || retrievalStrategy === "sds-r-only"; + + const sdsMessageChannel = new MessageChannel(channelId, senderId, { + ...options, + enableRepair + }); const messageChannel = new ReliableChannel( node, sdsMessageChannel, From 324ecbf4d0c1ee8eaa08427659a50106c48f174c Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 30 Oct 2025 17:03:00 +0000 Subject: [PATCH 10/10] fix: remove dead code, simplify --- .../src/message_channel/message_channel.ts | 7 +- .../sds/src/message_channel/repair/repair.ts | 66 +------------------ 2 files changed, 4 insertions(+), 69 deletions(-) diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 05b8f49d80..f554e31f5f 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -141,12 +141,7 @@ export class MessageChannel extends TypedEventEmitter { * Useful for adaptive sync intervals - increase frequency when repairs pending. */ public hasPendingRepairRequests(currentTime = Date.now()): boolean { - if (!this.repairManager) { - return false; - } - - const nextRequestTime = this.repairManager.getNextRequestTime(); - return nextRequestTime !== undefined && nextRequestTime <= currentTime; + return this.repairManager?.hasRequestsReady(currentTime) ?? false; } /** diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 6a2117cd1d..1a8a85f43e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -307,70 +307,10 @@ export class RepairManager { } /** - * Check if there are any pending outgoing repair requests + * Check if there are repair requests ready to be sent */ - public hasPendingRequests(): boolean { - return this.outgoingBuffer.size > 0; - } - - /** - * Get count of pending repair requests - */ - public getPendingRequestCount(): number { - return this.outgoingBuffer.size; - } - - /** - * Get count of pending repair responses - */ - public getPendingResponseCount(): number { - return this.incomingBuffer.size; - } - - /** - * Get next scheduled repair request time (earliest T_req) - */ - public getNextRequestTime(): number | undefined { + public hasRequestsReady(currentTime = Date.now()): boolean { const items = this.outgoingBuffer.getItems(); - return items.length > 0 ? items[0].tReq : undefined; - } - - /** - * Get next scheduled repair response time (earliest T_resp) - */ - public getNextResponseTime(): number | undefined { - const items = this.incomingBuffer.getItems(); - return items.length > 0 ? 
items[0].tResp : undefined; - } - - /** - * Check if a specific message has a pending repair request - */ - public isPendingRequest(messageId: string): boolean { - return this.outgoingBuffer.has(messageId); - } - - /** - * Check if we have a pending response for a message - */ - public isPendingResponse(messageId: string): boolean { - return this.incomingBuffer.has(messageId); - } - - /** - * Get stats for monitoring/debugging - */ - public getStats(): { - pendingRequests: number; - pendingResponses: number; - nextRequestTime?: number; - nextResponseTime?: number; - } { - return { - pendingRequests: this.getPendingRequestCount(), - pendingResponses: this.getPendingResponseCount(), - nextRequestTime: this.getNextRequestTime(), - nextResponseTime: this.getNextResponseTime() - }; + return items.length > 0 && items[0].tReq <= currentTime; } }