Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serialize_writes option to ensure lines are sent in-order (resolves #361) #362

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ module.exports = class IrcClient extends EventEmitter {
message_max_length: 350,
sasl_disconnect_on_fail: false,
transport: default_transport,
websocket_protocol: 'text.ircv3.net'
websocket_protocol: 'text.ircv3.net',
serialize_writes: false
};

const props = Object.keys(defaults);
Expand Down
35 changes: 32 additions & 3 deletions src/transports/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,31 @@ module.exports = class Connection extends EventEmitter {
this.socket_events = [];

this.encoding = 'utf8';

this.write_queue = null;
this.write_queue_servicer = () => {};
}

isConnected() {
return this.state === SOCK_CONNECTED;
}

_writeLineConnected(line, cb) {
if (this.encoding !== 'utf8') {
this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb);
} else {
this.socket.write(line + '\r\n', cb);
}
}

writeLine(line, cb) {
if (this.socket && this.isConnected()) {
if (this.encoding !== 'utf8') {
this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb);
if (this.options.serialize_writes && this.write_queue) {
if (this.write_queue.push({ line, cb }) === 1) {
this.write_queue_servicer();
}
} else {
this.socket.write(line + '\r\n', cb);
this._writeLineConnected(line, cb);
}
} else {
this.debugOut('writeLine() called when not connected');
Expand Down Expand Up @@ -77,6 +90,22 @@ module.exports = class Connection extends EventEmitter {
this.requested_disconnect = false;
this.incoming_buffer = Buffer.from('');

if (options.serialize_writes) {
this.write_queue = [];
this.write_queue_servicer = () => {
if (this.write_queue.length) {
this._writeLineConnected(this.write_queue[0].line, () => {
if (this.write_queue[0].cb) {
this.write_queue[0].cb();
}

this.write_queue = this.write_queue.slice(1);
process.nextTick(this.write_queue_servicer);
});
}
};
}

// Include server name (SNI) if provided host is not an IP address
if (!this.getAddressFamily(ircd_host)) {
sni = ircd_host;
Expand Down
93 changes: 93 additions & 0 deletions test/rawWriteOrdering.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
'use strict';

/* globals describe, it */

const net = require('net');
const Connection = require('../src/transports/net');

const chai = require('chai');

chai.use(require('chai-subset'));

async function runTest(serialize_writes) {
const numLines = 12;
const timeSlice = 100;

return new Promise((resolve) => {
let conn;
let server; // eslint-disable-line prefer-const
let wroteLines = [];
const bufferedLines = [];

const clientHandler = (client) => {
client.on('data', (data) => {
const dataStr = data.toString('utf8');
bufferedLines.push(dataStr);

if (wroteLines.length && wroteLines.length === bufferedLines.length) {
conn.close();
server.close();
resolve({ wroteLines, bufferedLines });
}
});
};

server = net.createServer(clientHandler);
server.listen(0, '0.0.0.0', () => {
conn = new Connection({
host: server.address().address,
port: server.address().port,
tls: false,
serialize_writes,
});

wroteLines = Array.from({ length: numLines }).map((_, i) => i).map(String);
let delay = wroteLines.length / timeSlice;
const rudeHandler = {
get(target, prop) {
if (prop === 'write') {
return (data, cb) => {
setTimeout(() => target[prop](data, cb), delay * 1000);
delay -= 1 / timeSlice;
};
} else {
return target[prop];
}
}
};

conn.on('open', () => {
conn.socket = new Proxy(conn.socket, rudeHandler);
wroteLines.forEach((line) => conn.writeLine(line));
});

conn.connect();
});
});
}

function compareLines(wroteLines, bufferedLines) {
return bufferedLines.map((l) => l.trim()).every((line, index) => line === wroteLines[index]);
}

describe('src/transports/net.js', function() {
it('should recieve messages in reverse of the order sent when serialize_writes is false', function(done) {
runTest(false).then(({ wroteLines, bufferedLines }) => {
let error = null;
if (compareLines(wroteLines, bufferedLines) === true) {
error = new Error('Line order matches when it should not!');
}
done(error);
});
});

it('should recieve messages in the order sent when serialize_writes is true', function(done) {
runTest(true).then(({ wroteLines, bufferedLines }) => {
let error = null;
if (compareLines(wroteLines, bufferedLines) === false) {
error = new Error('Line order does not match when it should!');
}
done(error);
});
});
});