Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 67 additions & 98 deletions packages/sdk/src/reliable_channel/reliable_channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import sinon from "sinon";

import { waitForEvent } from "./test_utils.js";

import { ReliableChannel } from "./index.js";

const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
Expand Down Expand Up @@ -63,18 +65,12 @@ describe("Reliable Channel", () => {

// Setting up message tracking
const messageId = reliableChannel.send(message);
let messageSending = false;
reliableChannel.addEventListener("sending-message", (event) => {
if (event.detail === messageId) {
messageSending = true;
}
});

while (!messageSending) {
await delay(50);
}

expect(messageSending).to.be.true;
await waitForEvent<string>(
reliableChannel,
"sending-message",
(id) => id === messageId
);
});

it("Outgoing message is emitted as sent", async () => {
Expand All @@ -90,19 +86,11 @@ describe("Reliable Channel", () => {

const messageId = reliableChannel.send(message);

// Setting up message tracking
let messageSent = false;
reliableChannel.addEventListener("message-sent", (event) => {
if (event.detail === messageId) {
messageSent = true;
}
});

while (!messageSent) {
await delay(50);
}

expect(messageSent).to.be.true;
await waitForEvent<string>(
reliableChannel,
"message-sent",
(id) => id === messageId
);
});

it("Encoder error raises irrecoverable error", async () => {
Expand Down Expand Up @@ -130,22 +118,11 @@ describe("Reliable Channel", () => {
encoder.contentTopic = "...";
const messageId = reliableChannel.send(message);

// Setting up message tracking
let irrecoverableError = false;
reliableChannel.addEventListener(
await waitForEvent<{ messageId: string; error: any }>(
reliableChannel,
"sending-message-irrecoverable-error",
(event) => {
if (event.detail.messageId === messageId) {
irrecoverableError = true;
}
}
(detail) => detail.messageId === messageId
);

while (!irrecoverableError) {
await delay(50);
}

expect(irrecoverableError).to.be.true;
});

it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => {
Expand Down Expand Up @@ -205,39 +182,32 @@ describe("Reliable Channel", () => {

// Alice sets up message tracking for first message
const firstMessageId = ReliableChannel.getMessageId(messages[0]);
let firstMessagePossiblyAcknowledged = false;
reliableChannelAlice.addEventListener(
"message-possibly-acknowledged",
(event) => {
if (event.detail.messageId === firstMessageId) {
firstMessagePossiblyAcknowledged = true;
}
}

const bobReceivedThirdPromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received",
(msg) => bytesToUtf8(msg.payload) === "third"
);

let messageReceived = false;
reliableChannelBob.addEventListener("message-received", (event) => {
if (bytesToUtf8(event.detail.payload) === "third") {
messageReceived = true;
}
});
const firstMessagePossiblyAckPromise = waitForEvent<{
messageId: string;
possibleAckCount: number;
}>(
reliableChannelAlice,
"message-possibly-acknowledged",
(detail) => detail.messageId === firstMessageId
);

for (const m of messages) {
reliableChannelAlice.send(m);
}

// Wait for Bob to receive last message to ensure it is all included in filter
while (!messageReceived) {
await delay(50);
}
await bobReceivedThirdPromise;

// Bobs sends a message now, it should include first one in bloom filter
// Bob sends a message now, it should include first one in bloom filter
reliableChannelBob.send(utf8ToBytes("message back"));
while (!firstMessagePossiblyAcknowledged) {
await delay(50);
}

expect(firstMessagePossiblyAcknowledged).to.be.true;
await firstMessagePossiblyAckPromise;
});

it("Outgoing message is acknowledged", async () => {
Expand All @@ -264,31 +234,23 @@ describe("Reliable Channel", () => {

const messageId = reliableChannelAlice.send(message);

// Alice sets up message tracking
let messageAcknowledged = false;
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
if (event.detail === messageId) {
messageAcknowledged = true;
}
});
const bobReceivedPromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received"
);

let bobReceivedMessage = false;
reliableChannelBob.addEventListener("message-received", () => {
bobReceivedMessage = true;
});
const messageAcknowledgedPromise = waitForEvent<string>(
reliableChannelAlice,
"message-acknowledged",
(id) => id === messageId
);

// Wait for bob to receive the message to ensure it's included in causal history
while (!bobReceivedMessage) {
await delay(50);
}
await bobReceivedPromise;

// Bobs sends a message now, it should include first one in causal history
// Bob sends a message now, it should include first one in causal history
reliableChannelBob.send(utf8ToBytes("second message in channel"));
while (!messageAcknowledged) {
await delay(50);
}

expect(messageAcknowledged).to.be.true;
await messageAcknowledgedPromise;
});

it("Incoming message is emitted as received", async () => {
Expand All @@ -300,19 +262,17 @@ describe("Reliable Channel", () => {
decoder
);

let receivedMessage: IDecodedMessage;
reliableChannel.addEventListener("message-received", (event) => {
receivedMessage = event.detail;
});

const message = utf8ToBytes("message in channel");

const receivedPromise = waitForEvent<IDecodedMessage>(
reliableChannel,
"message-received"
);

reliableChannel.send(message);
while (!receivedMessage!) {
await delay(50);
}
const receivedMessage = await receivedPromise;

expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message));
expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message));
});

describe("Retries", () => {
Expand Down Expand Up @@ -355,20 +315,29 @@ describe("Reliable Channel", () => {
}
});

reliableChannelAlice.send(message);
// Wait for first message
const firstMessagePromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received",
(msg) => bytesToUtf8(msg.payload) === msgTxt
);

while (messageCount < 1) {
await delay(10);
}
reliableChannelAlice.send(message);
await firstMessagePromise;
expect(messageCount).to.equal(1, "Bob received Alice's message once");

// Wait for retry - Bob should receive the same message again
const retryMessagePromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received",
(msg) => bytesToUtf8(msg.payload) === msgTxt
);

// No response from Bob should trigger a retry from Alice
while (messageCount < 2) {
await delay(10);
}
await retryMessagePromise;
expect(messageCount).to.equal(2, "retried once");

// Bobs sends a message now, it should include first one in causal history
// Bob sends a message now, it should include first one in causal history
reliableChannelBob.send(utf8ToBytes("second message in channel"));

// Wait long enough to confirm no retry is executed
Expand Down
Loading
Loading