diff --git a/.gitignore b/.gitignore index 01db8ad..3f09eb0 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ api_docs /log.txt /bin/kalm.min.js /bin/kalm.min.js.map +/stacks.out diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..7ea8582 --- /dev/null +++ b/.npmignore @@ -0,0 +1,5 @@ +tests +README.md +.travis.yml +webpack.config +.jshintrc \ No newline at end of file diff --git a/README.md b/README.md index df21b13..eee46b0 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,12 @@ - + # Kalm *The Socket Optimizer* [![Kalm](https://img.shields.io/npm/v/kalm.svg)](https://www.npmjs.com/package/kalm) [![Build Status](https://travis-ci.org/fed135/Kalm.svg?branch=master)](https://travis-ci.org/fed135/Kalm) [![Dependencies Status](https://david-dm.org/fed135/Kalm.svg)](https://www.npmjs.com/package/kalm) -[![Current Stage](https://img.shields.io/badge/stage-beta-blue.svg)](https://codeclimate.com/github/fed135/Kalm) +[![Code Climate](https://codeclimate.com/github/fed135/Kalm/badges/gpa.svg)](https://codeclimate.com/github/fed135/Kalm) +[![Current API Stability](https://img.shields.io/badge/stability-stable-blue.svg)](https://codeclimate.com/github/fed135/Kalm) --- @@ -75,13 +76,13 @@ Simplify and optimize your Socket communications with: **Requests per minute** - + *Benchmarks based on a single-thread queue test with Kalm default bundling settings AND msg-pack enabled* **Bytes transfered** - + *Number of bytes transfered per 1000 requests* @@ -139,7 +140,6 @@ By default, all Kalm logs are hidden. They can be enabled through the DEBUG envi [Milestones](https://github.com/fed135/Kalm/milestones) - ## Contributing I am looking for contributors to help improve the codebase and create adapters, encoders and middleware. diff --git a/docs/BRAINSTORM.md b/docs/BRAINSTORM.md deleted file mode 100644 index 089e685..0000000 --- a/docs/BRAINSTORM.md +++ /dev/null @@ -1,112 +0,0 @@ -# Passing K to modules and packages... - -K should be passed to components in prototype in -either a wrapper, or a 'parent' property of some sort - -# Class migrations - -- Kill manifest (not required) -- Move event into Kalm/bootstrap - never going to be needed by another module -- Config is now part of Kalm/bootstrap -- Circles move into Services.find logic - -# Prototype object brainstorm - -Kalm = new Kalm() - -#properties - .pkg # Kalm's package file - .app # The App's package file - .config # The App's config - .controllers # The app's controllers - -#methods - .onready - .onshutdown - --- Prototype - -.Net = new Net() - -#properties - .wrapper # The call wrapper - .adapters # The collection of adapters - -#methods - .loadAdapter - -.Peers = new Peers() - -#properties - .list - -#methods - .add (Peer constructor) - .remove - .all - .find - -.Peer = new Peer() - -#properties - .channels #list of channels - .label - .uid - .config - -#methods - .channel - -.Peer.Channel = new Channel() - -#methods - .send - .destroy - .ondata - -.Console = new Console() - -#properties - .colors - -#methods - .log - .warn - .error - -.System = new System() - -#properties - .hostname - .arch - .platform - -.Utils = new Utils() - -#properties - .async - .crypto - .object - -Promisify Error-prone sections: - -- Server bind, -- Server write, -- Request handling, -- Shutdown protocol - ------------------------------------------- - -Stateless & stateful socket management - -### Stateless - -- Wrapped -- Anonymous pooled sockets -- Emit, Broadcast, Receive from server - -### Stateful - -- (Un)wrapped -- Named sockets (client channel) ---> Check if channel is present to toggle modes \ No newline at end of file diff --git a/docs/PONG_TEST.md b/docs/PONG_TEST.md deleted file mode 100644 index ea4dcd6..0000000 --- a/docs/PONG_TEST.md +++ /dev/null @@ -1,22 +0,0 @@ -# The Ping-Pong test - -^ [Back to home](../README.md) - -This is the benchmark test for adapter performance. -It has been (crudely) designed for the Kalm framework. - -## Settings - -- Must be run between two instances of a Kalm micro-service. -- Instances must not be deamonized or clustered. -- Machine configuration must be vanilla. - -## Parameters - -- Timer must run for a whole minute. -- Unnamed sockets are used. -- One process sends a string 'ping' to the other, which replies 'pong'. -- Service has default socket pool size and a socket timeout of 1000 ms. -- 1000 parallel sockets are sent at the beginning. -- A point is awarded for every complete back and forth trip (ping-pong). - diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md deleted file mode 100644 index 37984bf..0000000 --- a/docs/ROADMAP.md +++ /dev/null @@ -1,62 +0,0 @@ -# Roadmap - -^ [Back to home](../README.md) - - -## v.1.0.2 - -- [x] Environments - -- [x] IPC adapter binds to a specified and static location (and remove 'i') - -- [x] Create services from config on load - -- [x] Add reply interface (uses same socket) to handlers - -- [x] Better handling of service/socket update on request - -- [x] Manage unwrapped requests - -- [x] Handle uncaughtExceptions to prevent process from crashing - -- [x] Make sure socket clients all implement closing behaviour (disconnect) - -- [x] Allow 'unlimited' pool size - -- [x] Implement optionnal socket timeout - -- [x] Resolve parallel ipc connections - -- [x] Fix udp reply problem - -- [x] Optimize tcp receiver - -- [x] Minify wrapper - -- [x] Review app shutdown to eliminate all existing connections and hanging resources - -## v.1.0.3 - -- [x] API docs - -- [x] Logging destination - -- [x] Proper load-balancing strategy for pooled sockets - -- [x] Add proper uid generation - -- [x] Bug: ipc will notify sender with onRequest - -- [x] Mock mode config - -- [x] Feature: need to be able to add a default handler + dynamic bind to handler - -## v.1.0.4 - -- [ ] Internal map - -- [ ] Unit tests - -- [ ] Smoke tests - -- [ ] Bug: Recover from fatal crash - resolve already bound addresses \ No newline at end of file diff --git a/docs/RULES.md b/docs/RULES.md deleted file mode 100644 index f25fceb..0000000 --- a/docs/RULES.md +++ /dev/null @@ -1,118 +0,0 @@ -# Rules - -^ [Back to home](../README.md) - - -## Module Structure - -1. Requires -1. Local variables -1. Methods -1. Exports - - -## Export Structure - -1. Namespace -1. Properties -1. Methods - - -## Example Module - -```javascript -/** - * InterProcessCall connector methods - */ - -/* Requires ------------------------------------------------------------------*/ - -var path = require('path'); - -/* Local variables -----------------------------------------------------------*/ - -var loc = 'local'; - -/* Methods -------------------------------------------------------------------*/ - -function merge() { - return path.join(loc, this.foo, this.bar); -} - -/* Exports -------------------------------------------------------------------*/ - -module.exports = { - pkgName: 'myPackage', - attributes: { - foo: 'foo', - bar: 'bar' - } -}; -``` - -## Definitions - -1. Module folders - - General regroupements for submodules. - -1. Submodule folders - - Specific group of classes and packages that take care of - a specific functionnality. - -1. Classes - - Interfaces and programs that take care of a specific - functionnality or task with the help of packages. - -1. Packages - - Helpers, pieces of logic that can be of use by multiple - classes. These are prefered to putting too much logic in - the classes, for decoupling, reduction of redundancies and - ease of maintenance. They can be compared to npm packages. - - -## Coding - -1. Style - -- No more than 80 characters per line. -- Use short, but meaningful variable names. -- Avoid structures where you would need to perform a require - from a higher directory. (../) -- ALWAYS run the linter before sending. - -1. Philosophy - -- Functions and procedures should be identified as such - and should not have a number of arguments greater than 4 -- Favor decoupling over closures, when possible. -- Choose clarity over optimization (unless you can get both) - -1. Technical considerations - -- Be aware of the service size once packaged with dependencies. -- Avoid packages with addons when available. -- Build your paths. -- Use ES6 constructs only if necessary. -- Avoid constructors, the 'new' keyword and prototypes. -- Avoid variable names with keywords in them. Ex: 'newThing' -- Disk operations should be kept to a minimum. - - -## Contributing - -1. Branching - -Use the industry-wide branching conventions. - -1. Merge requests - -Needs to be reviewed by a commity of contributors before integrating - -1. Using Kalm - -If you or your company use Kalm, or are planning to do so; please let -me know and I will do my best to provide support. \ No newline at end of file diff --git a/docs/WHY.md b/docs/WHY.md deleted file mode 100644 index 0d7458c..0000000 --- a/docs/WHY.md +++ /dev/null @@ -1,17 +0,0 @@ -# Why - -^ [Back to home](../README.md) - - -There seemed to be a lack of decent frameworks that dealt with -the challenges of micro-service applications. Things like low- -latency inter-service calls, vertical and horizontal scaling -solutions as well as modern client communication protocols that are not -limited to web. - -Kalm aims at being the reference in terms of micro-service frameworks and -modern back-end technology for apps and games. - -Merge requests are more than welcommed. I will try to keep the -contributor circle small, but I will make sure that every requests gets -addressed as soon as possible. \ No newline at end of file diff --git a/package.json b/package.json index 479caee..f33cde9 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,10 @@ { "name": "kalm", - "version": "0.2.0", + "version": "0.3.0", "description": "The socket optimizer", "main": "./index.js", "scripts": { - "test": "mocha tests/index.js", + "test": "mocha tests/unit.js && mocha tests/smoke.js", "build": "webpack -p -d -j --define process.env.NODE_ENV='\"browser\"' --config ./webpack.config" }, "repository": { diff --git a/src/Channel.js b/src/Channel.js index 9a2f83b..f79621a 100644 --- a/src/Channel.js +++ b/src/Channel.js @@ -26,6 +26,8 @@ class Channel { this._timer = null; this._packets = []; this._handlers = []; + + this.splitBatches = true; } /** @@ -38,16 +40,33 @@ class Channel { this._packets.push(payload); // Bundling process - if (this._packets.length >= this.options.maxPackets) { - if (this._timer !== null) { - clearTimeout(this._timer); - this._timer = null; - } - + if (this._packets.length >= this.options.maxPackets) { this._emit(); return; } + this._startBundler(); + } + + /** + * Sends the latest payload only + * @method sendOnce + * @memberof Channel + * @param {object|string} payload The payload to send + */ + sendOnce(payload) { + this._packets = [payload]; + + this._startBundler(); + } + + /** + * Initializes the bundler timer + * @private + * @method _startBundler + * @memberof Channel + */ + _startBundler() { if (this._timer === null) { this._timer = setTimeout(this._emit.bind(this), this.options.delay); } @@ -62,6 +81,17 @@ class Channel { _emit() { this._emitter(this.name, this._packets); this._packets.length = 0; + this.resetBundler(); + } + + /** + * Clears the bundler timer + * @method resetBundler + * @memberof Channel + */ + resetBundler() { + clearTimeout(this._timer); + this._timer = null; } /** @@ -70,10 +100,30 @@ class Channel { * @memberof Channel * @param {function} method The method to bind */ - addHandler(method) { + addHandler(method, bindOnce) { this._handlers.push(method); } + /** + * Removes a handler from this channel + * @method removeHandler + * @memberof Channel + * @param {function} method The method to bind + */ + removeHandler(method) { + var index = this._handlers.indexOf(method); + if (index > -1) this._handlers.splice(index, 1); + } + + /** + * Destroys the client and connection + * @method destroy + * @memberof Client + */ + destroy() { + this._client.destroy(); + } + /** * Handles channel data * @method handleData @@ -83,15 +133,23 @@ class Channel { handleData(payload) { var _reqs = payload.length; var _listeners = this._handlers.length; + var reply = this.send.bind(this); var i; var c; - for (i = 0; i<_reqs; i++) { - for (c = 0; c<_listeners; c++) { - this._handlers[c](payload[i], this._client); + if (this.splitBatches) { + for (i = 0; i < _reqs; i++) { + for (c = 0; c <_listeners; c++) { + this._handlers[c](payload[i], reply, this); + } + } + } + else { + for (c = 0; c < _listeners; c++) { + this._handlers[c](payload, reply, this); } } - }; + } } /* Exports -------------------------------------------------------------------*/ diff --git a/src/Client.js b/src/Client.js index 3ccfb80..5fab052 100644 --- a/src/Client.js +++ b/src/Client.js @@ -18,10 +18,6 @@ var encoders = require('./encoders'); var Channel = require('./Channel'); -/* Local variables -----------------------------------------------------------*/ - -const _channelBase = '/'; - /* Methods -------------------------------------------------------------------*/ class Client extends EventEmitter{ @@ -49,7 +45,9 @@ class Client extends EventEmitter{ // Encoding encoder: options.encoder || defaults.encoder, // Transformations (middleware) - bundler: options.bundler || defaults.bundler + bundler: options.bundler || defaults.bundler, + // Wether to output statistics in stdout + stats: options.stats || defaults.stats }; // List of channels @@ -58,24 +56,27 @@ class Client extends EventEmitter{ // Populate channels if (options.channels) { for (var c in options.channels) { - this.channel(c, options.channels[c]); + this.subscribe(c, options.channels[c]); } } // Socket object + this.socket = null; this.use(socket); } /** * Creates a channel for the client - * @method channel + * @method subscribe * @memberof Client * @param {string} name The name of the channel. * @param {function} handler The handler to add to the channel + * @params {object} options The options object for the channel * @returns {Client} The client, for chaining */ - channel(name, handler) { - name = name || _channelBase; + subscribe(name, handler, options) { + name = name + ''; // Stringification + options = options || {}; if (!this.channels.hasOwnProperty(name)) { debug( @@ -85,7 +86,7 @@ class Client extends EventEmitter{ ); this.channels[name] = new Channel( name, - this.options.bundler, + Object.assign(this.options.bundler, options), this ); } @@ -97,6 +98,23 @@ class Client extends EventEmitter{ return this; } + /** + * Removes a handler from a channel + * @method unsubscribe + * @memberof Client + * @param {string} name The name of the channel. + * @param {function} handler The handler to remove from the channel + * @returns {Client} The client, for chaining + */ + unsubscribe(name, handler) { + name = name + ''; // Stringification + + if (!this.channels.hasOwnProperty(name)) return this; + + this.channels[name].removeHandler(handler); + return this; + } + /** * Defines a socket to use for communication, disconnects previous connection * @method use @@ -107,13 +125,47 @@ class Client extends EventEmitter{ use(socket) { if (this.socket) { debug('log: disconnecting current socket'); - adapters.resolve(this.options.adapter).disconnect(this.socket); + adapters.resolve(this.options.adapter).disconnect(this); } - this.socket = this._createSocket(socket); + this.socket = this.createSocket(socket); return this; } + /** + * Socket error handler + * @method handleError + * @memberof Client + * @param {Error} err The socket triggered error + */ + handleError(err) { + debug('error: ' + err); + this.emit('error', err); + } + + /** + * New socket connection handler + * @method handleConnect + * @memberof Client + * @param {Socket} socket The newly connected socket + */ + handleConnect(socket) { + this.emit('connect', socket); + this.emit('connection', socket); + } + + /** + * Socket connection lost handler + * @method handleDisconnect + * @memberof Client + * @param {Socket} socket The disconnected socket + */ + handleDisconnect(socket) { + this.emit('disconnect', socket); + this.emit('disconnection', socket); + this.socket = null; + } + /** * Queues a packet for transfer on the given channel * @method send @@ -123,13 +175,27 @@ class Client extends EventEmitter{ * @returns {Client} The client, for chaining */ send(name, payload) { - if (!this.channels.hasOwnProperty(name)) { - this.channel(name); - } + this.subscribe(name); + this.channels[name].send(payload); return this; } + /** + * Trumps other packets on the given channel, will only send the latest + * @method sendOnce + * @memberof Client + * @param {string} name The channel to send to data through + * @param {string|object} payload The payload to send + * @returns {Client} The client, for chaining + */ + sendOnce(name, payload) { + this.subscribe(name); + + this.channels[name].sendOnce(payload); + return this; + } + /** * Creates or attaches a socket for the appropriate adapter * @private @@ -138,7 +204,7 @@ class Client extends EventEmitter{ * @param {Socket} socket The socket to use * @returns {Socket} The created or attached socket for the client */ - _createSocket(socket) { + createSocket(socket) { return adapters.resolve(this.options.adapter).createSocket(this, socket); } @@ -150,31 +216,53 @@ class Client extends EventEmitter{ * @param {string} channel The channel targeted for transfer */ _emit(channel, packets) { + var payload = encoders.resolve(this.options.encoder).encode([ + channel, + packets + ]); + adapters.resolve(this.options.adapter).send( this.socket, - encoders.resolve(this.options.encoder).encode({ - c: channel, - d: packets - }) + payload ); + + if (this.options.stats) { + process.stdout.write(JSON.stringify({ + packets: packets.length, + bytes: payload.length + })); + } } /** * Handler for receiving data through the listener * @private - * @method _handleRequest + * @method handleRequest * @memberof Client * @param {Buffer} evt The data received */ - _handleRequest(evt) { + handleRequest(evt) { var raw = encoders.resolve(this.options.encoder).decode(evt); - if (raw && raw.c) { - if (this.channels.hasOwnProperty(raw.c)) { - this.channels[raw.c].handleData(raw.d); + if (raw && raw.length) { + if (this.channels.hasOwnProperty(raw[0])) { + this.channels[raw[0]].handleData(raw[1]); } } } + + /** + * Destroys the client and connection + * @method destroy + * @memberof Client + */ + destroy() { + adapters.resolve(this.options.adapter).disconnect(this); + this.socket = null; + for (var channel in this.channels) { + this.channels[channel].resetBundler(); + } + } } /* Exports -------------------------------------------------------------------*/ diff --git a/src/Server.js b/src/Server.js index f01b08c..755f6f0 100644 --- a/src/Server.js +++ b/src/Server.js @@ -29,6 +29,8 @@ class Server extends EventEmitter { super(); options = options || {}; + this.listener = null; + this.options = { adapter: options.adapter || defaults.adapter, encoder: options.encoder || defaults.encoder, @@ -45,17 +47,21 @@ class Server extends EventEmitter { * Server lift method * @method listen * @memberof Server - * @param {function} callback The callback method for server lift */ - listen(callback) { + listen() { var adapter = adapters.resolve(this.options.adapter); - let _self = this; if (adapter) { - debug('log: listening ' + this.options.adapter + '://0.0.0.0:' + this.options.port); - adapter.listen(this, function _handleLift() { - process.nextTick(function _deferredLift() { - _self.emit('ready'); + debug( + 'log: listening ' + + this.options.adapter + + '://0.0.0.0:' + + this.options.port + ); + + adapter.listen(this, () => { + process.nextTick(() => { + this.emit('ready'); }); }); } @@ -66,24 +72,42 @@ class Server extends EventEmitter { /** * Adds a channel to listen for on attached clients - * @method channel + * @method subscribe * @memberof Server * @param {string} name The name of the channel to attach * @param {function} handler The handler to attach to the channel + * @params {object} options The options object for the channel * @returns {Server} Returns itself for chaining */ - channel(name, handler) { - this.channels[name] = handler; + subscribe(name, handler, options) { + this.channels[name + ''] = handler; - for (var i = this.connections.length - 1; i >= 0; i--) { - this.connections[i].channel(name, handler); - } + this.connections.forEach((client) => { + client.subscribe(name, handler, options); + }); + + return this; + } + + /** + * Removes a handler on attached clients + * @method subscribe + * @memberof Server + * @param {string} name The name of the channel + * @param {function} handler The handler to remove from the channel + * @returns {Server} Returns itself for chaining + */ + unsubscribe(name, handler) { + this.connections.forEach((client) => { + client.unsubscribe(name, handler); + }); return this; } /** * Sends data to all connected clients + * !! Creates the channel if it has to !! * @method broadcast * @memberof Server * @param {string} channel The name of the channel to send to @@ -98,6 +122,27 @@ class Server extends EventEmitter { return this; } + /** + * Sends data to all connected clients with a specific channel opened + * !! Does not create new channels !! + * @method whisper + * @memberof Server + * @param {string} channel The name of the channel to send to + * @param {string|object} payload The payload to send + * @returns {Server} Returns itself for chaining + */ + whisper(channel, payload) { + for (var i = this.connections.length - 1; i >= 0; i--) { + for (var u in this.connections[i].channels) { + if (this.connections[i].channels[u].name === channel) { + this.connections[i].channels[u].send(payload); + } + } + } + + return this; + } + /** * Closes the server * @method stop @@ -105,15 +150,46 @@ class Server extends EventEmitter { * @param {function} callback The callback method for the operation */ stop(callback) { + callback = callback || function() {}; + + var adapter = adapters.resolve(this.options.adapter); + debug('warn: stopping server'); if (this.listener) { - adapters.resolve(this.options.adapter).stop(this, callback); + this.connections.forEach(adapter.disconnect); + this.connections.length = 0; + adapter.stop(this, callback); + this.listener = null; } else { - callback(); + return callback(); } } + /** + * Creates a new client with the provided arguments + * @private + * @method _createClient + * @memberof Server + * @param {Socket} socket The received connection socket + * @param {object} options The options for the client + * @returns {Client} The newly created client + */ + createClient(socket, options) { + return new Client(socket, options); + } + + /** + * Server error handler + * @method handleError + * @memberof Server + * @param {Error} err The triggered error + */ + handleError(err) { + debug('error: ' + err); + this.emit('error', err); + } + /** * Handler for receiving a new connection * @private @@ -121,14 +197,19 @@ class Server extends EventEmitter { * @memberof Server * @param {Socket} socket The received connection socket */ - _handleRequest(socket) { - var client = new Client(socket, { + handleRequest(socket) { + var client = this.createClient(socket, { adapter: this.options.adapter, encoder: this.options.encoder, channels: this.channels }); this.connections.push(client); + client.on('disconnect', (socket) => { + this.emit('disconnect', socket); + this.emit('disconnection', socket); + }); this.emit('connection', client); + this.emit('connect', client); return client; } } diff --git a/src/adapters/ipc.adapter.js b/src/adapters/ipc.adapter.js index 08b6a72..746ae24 100644 --- a/src/adapters/ipc.adapter.js +++ b/src/adapters/ipc.adapter.js @@ -11,8 +11,6 @@ const net = require('net'); const fs = require('fs'); -const debug = require('debug')('kalm'); - /* Local variables -----------------------------------------------------------*/ const _path = '/tmp/app.socket-'; @@ -27,14 +25,11 @@ const _path = '/tmp/app.socket-'; */ function listen(server, callback) { fs.unlink(_path + server.options.port, () => { - server.listener = net.createServer(server._handleRequest.bind(server)); + server.listener = net.createServer(server.handleRequest.bind(server)); server.listener.listen(_path + server.options.port, callback); - server.listener.on('error', (err) => { - debug('error: ' + err); - server.emit('error', err); - }); + server.listener.on('error', server.handleError.bind(server)); }); -}; +} /** * Stops the server. @@ -43,14 +38,7 @@ function listen(server, callback) { * @param {function} callback The success callback for the operation */ function stop(server, callback) { - server.connections.forEach((e) => { - e.socket.destroy(); - }); - - process.nextTick(() => { - server.connections.length = 0; - server.listener.close(callback || function() {}); - }); + server.listener.close(callback); } /** @@ -61,7 +49,7 @@ function stop(server, callback) { */ function send(socket, payload) { if (socket) socket.write(payload); -}; +} /** * Creates a client and adds the data listener(s) to it @@ -74,27 +62,29 @@ function createSocket(client, socket) { if (!socket) { socket = net.connect(_path + client.options.port); } - socket.on('data', client._handleRequest.bind(client)); - socket.on('error', (err) => { - debug('error: ' + err); - client.emit('error', err); - }); + socket.on('data', client.handleRequest.bind(client)); + + // Emit on error + socket.on('error', client.handleError.bind(client)); + + // Emit on connect + socket.on('connect', client.handleConnect.bind(client)); // Will auto-reconnect - socket.on('close', () => { - client.socket = null; - }); + socket.on('close', client.handleDisconnect.bind(client)); return socket; -}; +} /** - * Attempts to disconnect the socket + * Attempts to disconnect the client's connection * @method disconnect - * @param {Socket} socket The socket to disconnect + * @param {Client} client The client to disconnect */ -function disconnect(socket) { - if (socket.disconnect) socket.disconnect(); +function disconnect(client) { + if (client.socket && client.socket.destroy) { + client.socket.destroy(); + } } /* Exports -------------------------------------------------------------------*/ diff --git a/src/adapters/tcp.adapter.js b/src/adapters/tcp.adapter.js index 1063ca1..ac2a339 100644 --- a/src/adapters/tcp.adapter.js +++ b/src/adapters/tcp.adapter.js @@ -10,8 +10,6 @@ const net = require('net'); -const debug = require('debug')('kalm'); - /* Methods -------------------------------------------------------------------*/ /** @@ -21,12 +19,9 @@ const debug = require('debug')('kalm'); * @param {function} callback The success callback for the operation */ function listen(server, callback) { - server.listener = net.createServer(server._handleRequest.bind(server)); + server.listener = net.createServer(server.handleRequest.bind(server)); server.listener.listen(server.options.port, callback); - server.listener.on('error', (err) => { - debug('error: ' + err); - server.emit('error', err); - }); + server.listener.on('error', server.handleError.bind(server)); } /** @@ -46,14 +41,7 @@ function send(socket, payload) { * @param {function} callback The success callback for the operation */ function stop(server, callback) { - server.connections.forEach((e) => { - e.socket.destroy(); - }); - - process.nextTick(() => { - server.connections.length = 0; - server.listener.close(callback || function() {}); - }); + server.listener.close(callback); } /** @@ -67,27 +55,29 @@ function createSocket(client, socket) { if (!socket) { socket = net.connect(client.options.port, client.options.hostname); } - socket.on('data', client._handleRequest.bind(client)); - socket.on('error', (err) => { - debug('error: ' + err); - client.emit('error', err); - }); + socket.on('data', client.handleRequest.bind(client)); + + // Emit on error + socket.on('error', client.handleError.bind(client)); + + // Emit on connect + socket.on('connect', client.handleConnect.bind(client)); // Will auto-reconnect - socket.on('close', () => { - client.socket = null; - }); + socket.on('close', client.handleDisconnect.bind(client)); return socket; } /** - * Attempts to disconnect the socket + * Attempts to disconnect the client's connection * @method disconnect - * @param {Socket} socket The socket to disconnect + * @param {Client} client The client to disconnect */ -function disconnect(socket) { - if (socket.disconnect) socket.disconnect(); +function disconnect(client) { + if (client.socket && client.socket.destroy) { + client.socket.destroy(); + } } /* Exports -------------------------------------------------------------------*/ diff --git a/src/adapters/udp.adapter.js b/src/adapters/udp.adapter.js index 51f17d2..0beac0f 100644 --- a/src/adapters/udp.adapter.js +++ b/src/adapters/udp.adapter.js @@ -10,8 +10,6 @@ const dgram = require('dgram'); -const debug = require('debug')('kalm'); - /* Helpers -------------------------------------------------------------------*/ /** @@ -22,15 +20,16 @@ function _handleNewSocket(data, origin) { if (!this.__clients) this.__clients = {}; if (!(key in this.__clients)) { - this.__clients[key] = this._handleRequest(createSocket({ - options: { - hostname: origin.address, - port: origin.port - } - })); + this.__clients[key] = this.createClient({}, { + hostname: origin.address, + port: origin.port, + adapter: 'udp', + encoder: this.options.encoder, + channels: this.channels + }); } - this.__clients[key]._handleRequest(data); + this.__clients[key].handleRequest(data); } /* Methods -------------------------------------------------------------------*/ @@ -44,14 +43,11 @@ function _handleNewSocket(data, origin) { function listen(server, callback) { server.listener = dgram.createSocket('udp4'); server.listener.on('message', _handleNewSocket.bind(server)); - server.listener.on('error', (err) => { - debug('error: ' + err); - server.emit('error', err); - }); + server.listener.on('error', server.handleError.bind(server)); server.listener.bind(server.options.port, '127.0.0.1'); - callback(); -}; + return callback(); +} /** * Sends a message with a socket client @@ -67,7 +63,7 @@ function send(socket, payload) { socket.__port, socket.__hostname ); -}; +} /** * Stops the server. @@ -76,11 +72,7 @@ function send(socket, payload) { * @param {function} callback The success callback for the operation */ function stop(server, callback) { - server.connections.length = 0; - if (server.listener && server.listener.close) { - server.listener.close(callback); - } - else callback(); + server.listener.close(callback); } /** @@ -96,20 +88,22 @@ function createSocket(client, soc) { var socket = dgram.createSocket('udp4'); socket.__port = client.options.port; socket.__hostname = client.options.hostname; - socket.on('error', (err) => { - debug('error: ' + err); - client.emit('error', err); - }); + + // Emit on error + socket.on('error', client.handleError.bind(client)); + + // Emit on connect + process.nextTick(client.handleConnect.bind(client)); return socket; -}; +} /** - * Attempts to disconnect the socket + * Attempts to disconnect the client's connection * @method disconnect - * @param {Socket} socket The socket to disconnect + * @param {Client} client The client to disconnect */ -function disconnect(socket) { +function disconnect() { // Nothing to do } diff --git a/src/defaults.js b/src/defaults.js index 86979ee..baad258 100644 --- a/src/defaults.js +++ b/src/defaults.js @@ -11,6 +11,7 @@ module.exports = { port: 3000, adapter: 'tcp', encoder: 'msg-pack', + stats: false, bundler: { maxPackets: 2048, delay: 16 diff --git a/tests/benchmarks/adapters/kalm.js b/tests/benchmarks/adapters/kalm.js index d80b53b..6312d21 100644 --- a/tests/benchmarks/adapters/kalm.js +++ b/tests/benchmarks/adapters/kalm.js @@ -26,7 +26,7 @@ function setup(resolve) { encoder: settings.encoder, }); - server.channel(settings.testChannel, function() { + server.subscribe(settings.testChannel, function() { count++; }); diff --git a/tests/index.js b/tests/index.js deleted file mode 100644 index 072a720..0000000 --- a/tests/index.js +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Kalm test suite - */ - -'use strict'; - -/* Requires ------------------------------------------------------------------*/ - -var assert = require('chai').assert; -var Kalm = require('../index'); - -/* Models --------------------------------------------------------------------*/ - -var adapterFormat = { - listen: function() {}, - send: function() {}, - stop: function() {}, - createSocket: function() {}, - disconnect: function() {} -}; - -var encoderFormat = { - encode: function() {}, - decode: function() {} -}; - -/* Suite ---------------------------------------------------------------------*/ - -describe('Index', function() { - it('Kalm', function() { - assert.property(Kalm, 'Client', 'Client not exposed in Kalm index'); - assert.property(Kalm, 'Server', 'Server not exposed in Kalm index'); - assert.property(Kalm, 'adapters', 'adapters not exposed in Kalm index'); - assert.property(Kalm, 'encoders', 'encoders not exposed in Kalm index'); - }); -}); - -describe('Adapters', function() { - - it('index', function() { - assert.isFunction(Kalm.adapters.register, 'register method not valid in Kalm adapters'); - assert.isFunction(Kalm.adapters.resolve, 'resolve method not valid in Kalm adapters'); - }); - - describe('bundled', function() { - it('ipc', function() { - var ipc_test = Kalm.adapters.resolve('ipc'); - assert.isObject(ipc_test, 'ipc is not a valid adapter object'); - allMembersTypeMatch(ipc_test, adapterFormat); - }); - - it('tcp', function() { - var tcp_test = Kalm.adapters.resolve('tcp'); - assert.isObject(tcp_test, 'tcp is not a valid adapter object'); - allMembersTypeMatch(tcp_test, adapterFormat); - }); - - it('udp', function() { - var udp_test = Kalm.adapters.resolve('udp'); - assert.isObject(udp_test, 'udp is not a valid adapter object'); - allMembersTypeMatch(udp_test, adapterFormat); - }); - }); - - describe('methods', function() { - it('register', function() { - Kalm.adapters.register('test', adapterFormat); - Kalm.adapters.register('test2', null); - }); - - it('resolve', function() { - assert.deepEqual(Kalm.adapters.resolve('test'), adapterFormat); - assert.equal(Kalm.adapters.resolve('test2'), null); - assert.equal(Kalm.adapters.resolve('test3'), null); - }); - }); -}); - -describe('Encoders', function() { - - it('index', function() { - assert.isFunction(Kalm.encoders.register, 'register method not valid in Kalm encoders'); - assert.isFunction(Kalm.encoders.resolve, 'resolve method not valid in Kalm encoders'); - }); - - describe('bundled', function() { - var objTest = {foo: 'bar'}; - var strTest = '{"foo":"bar}'; - - it('json', function() { - var json_test = Kalm.encoders.resolve('json'); - assert.isObject(json_test, 'json is not a valid encoder object'); - allMembersTypeMatch(json_test, encoderFormat); - - assert.instanceOf(json_test.encode(objTest), Buffer, 'json encoder does not output a buffer'); - assert.deepEqual(json_test.decode(json_test.encode(objTest)), objTest, 'Object is not the same after json decoding.'); - }); - - it('msg-pack', function() { - var msg_test = Kalm.encoders.resolve('msg-pack'); - assert.isObject(msg_test, 'msg-pack is not a valid encoder object'); - allMembersTypeMatch(msg_test, encoderFormat); - - assert.instanceOf(msg_test.encode(objTest), Buffer, 'msg-pack encoder does not output a buffer'); - assert.deepEqual(msg_test.decode(msg_test.encode(objTest)), objTest, 'Object is not the same after msg-pack decoding.'); - }); - }); - - describe('methods', function() { - it('register', function() { - Kalm.encoders.register('test', encoderFormat); - Kalm.encoders.register('test2', null); - }); - - it('resolve', function() { - assert.deepEqual(Kalm.encoders.resolve('test'), encoderFormat); - assert.equal(Kalm.encoders.resolve('test2'), null); - assert.equal(Kalm.encoders.resolve('test3'), null); - }); - }); -}); - -describe('Smoke test', function() { - var server; - var client; - - it('run ipc + json', function(done) { - server = new Kalm.Server({adapter:'ipc', encoder:'json'}); - server.channel('test', function(data) { - assert.deepEqual(data, {foo:'bar'}); - server.stop(done); - }); - - server.on('ready', function() { - client = new Kalm.Client({adapter:'ipc', encoder:'json'}); - client.send('test', {foo:'bar'}); - }); - }); - - it('run ipc + msg-pack', function(done) { - server = new Kalm.Server({adapter:'ipc', encoder: 'msg-pack'}); - server.channel('test', function(data) { - assert.deepEqual(data, {foo:'bar'}); - server.stop(done); - }); - - server.on('ready', function() { - client = new Kalm.Client({adapter:'ipc', encoder: 'msg-pack'}); - client.send('test', {foo:'bar'}); - }); - }); - - it('run tcp + json', function(done) { - server = new Kalm.Server({adapter:'tcp', encoder:'json'}); - server.channel('test', function(data) { - assert.deepEqual(data, {foo:'bar'}); - server.stop(done); - }); - - server.on('ready', function() { - client = new Kalm.Client({adapter:'tcp', encoder:'json'}); - client.send('test', {foo:'bar'}); - }); - }); - - it('run tcp + msg-pack', function(done) { - server = new Kalm.Server({encoder: 'msg-pack', adapter:'tcp'}); - server.channel('test', function(data) { - assert.deepEqual(data, {foo:'bar'}); - server.stop(done); - }); - - server.on('ready', function() { - client = new Kalm.Client({encoder: 'msg-pack', adapter:'tcp'}); - client.send('test', {foo:'bar'}); - }); - }); - - it('run udp + json', function(done) { - server = new Kalm.Server({adapter:'udp', encoder:'json'}); - server.channel('test', function(data) { - assert.deepEqual(data, {foo:'bar'}); - server.stop(done); - }); - - server.on('ready', function() { - client = new Kalm.Client({adapter:'udp', encoder:'json'}); - client.send('test', {foo:'bar'}); - }); - }); - - it('run udp + msg-pack', function(done) { - server = new Kalm.Server({encoder: 'msg-pack', adapter:'udp'}); - server.channel('test', function(data) { - assert.deepEqual(data, {foo:'bar'}); - server.stop(done); - }); - - server.on('ready', function() { - client = new Kalm.Client({encoder: 'msg-pack', adapter:'udp'}); - client.send('test', {foo:'bar'}); - }); - }); -}); - -/* Tooling -------------------------------------------------------------------*/ - -/** - * Checks that all properties are present and of the proper type - */ -function allMembersTypeMatch(set1, model) { - for (var i in model) { - var type = typeof model[i]; - assert.property(set1, i, 'property ' + i + ' is missing'); - assert.typeOf(set1[i], type, 'property ' + i + ' should be ' + type); - } - return true; -} \ No newline at end of file diff --git a/tests/smoke.js b/tests/smoke.js new file mode 100644 index 0000000..76537d4 --- /dev/null +++ b/tests/smoke.js @@ -0,0 +1,95 @@ +/** + * Kalm smoke test suite + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +var assert = require('chai').assert; +var Kalm = require('../index'); + +/* Suite --------------------------------------------------------------------*/ + +describe('Smoke test', () => { + var server; + var client; + + it('run ipc + json', (done) => { + server = new Kalm.Server({adapter:'ipc', encoder:'json'}); + server.subscribe('test', (data) => { + assert.deepEqual(data, {foo:'bar'}); + server.stop(done); + }); + + server.on('ready', () => { + client = new Kalm.Client({adapter:'ipc', encoder:'json'}); + client.send('test', {foo:'bar'}); + }); + }); + + it('run ipc + msg-pack', (done) => { + server = new Kalm.Server({adapter:'ipc', encoder: 'msg-pack'}); + server.subscribe('test', (data) => { + assert.deepEqual(data, {foo:'bar'}); + server.stop(done); + }); + + server.on('ready', () => { + client = new Kalm.Client({adapter:'ipc', encoder: 'msg-pack'}); + client.send('test', {foo:'bar'}); + }); + }); + + it('run tcp + json', (done) => { + server = new Kalm.Server({adapter:'tcp', encoder:'json'}); + server.subscribe('test', (data) => { + assert.deepEqual(data, {foo:'bar'}); + server.stop(done); + }); + + server.on('ready', () => { + client = new Kalm.Client({adapter:'tcp', encoder:'json'}); + client.send('test', {foo:'bar'}); + }); + }); + + it('run tcp + msg-pack', (done) => { + server = new Kalm.Server({encoder: 'msg-pack', adapter:'tcp'}); + server.subscribe('test', (data) => { + assert.deepEqual(data, {foo:'bar'}); + server.stop(done); + }); + + server.on('ready', () => { + client = new Kalm.Client({encoder: 'msg-pack', adapter:'tcp'}); + client.send('test', {foo:'bar'}); + }); + }); + + it('run udp + json', (done) => { + server = new Kalm.Server({adapter:'udp', encoder:'json'}); + server.subscribe('test', (data) => { + assert.deepEqual(data, {foo:'bar'}); + server.stop(done); + }); + + server.on('ready', () => { + client = new Kalm.Client({adapter:'udp', encoder:'json'}); + client.send('test', {foo:'bar'}); + }); + }); + + it('run udp + msg-pack', (done) => { + server = new Kalm.Server({encoder: 'msg-pack', adapter:'udp'}); + server.subscribe('test', (data) => { + assert.deepEqual(data, {foo:'bar'}); + server.stop(done); + }); + + server.on('ready', () => { + client = new Kalm.Client({encoder: 'msg-pack', adapter:'udp'}); + client.send('test', {foo:'bar'}); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit.js b/tests/unit.js new file mode 100644 index 0000000..5e6d641 --- /dev/null +++ b/tests/unit.js @@ -0,0 +1,404 @@ +/** + * Kalm unit test suite + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +var assert = require('chai').assert; +var Kalm = require('../index'); +var Channel = require('../src/Channel'); +var EventEmitter = require('events').EventEmitter; + +/* Suite --------------------------------------------------------------------*/ + +var adapterFormat = { + listen: function() {}, + send: function() {}, + stop: function() {}, + createSocket: function() {}, + disconnect: function() {} +}; + +var encoderFormat = { + encode: function() {}, + decode: function() {} +}; + +/* Suite ---------------------------------------------------------------------*/ + +describe('Index', () => { + it('Kalm', () => { + assert.property(Kalm, 'Client', 'Client not exposed in Kalm index'); + assert.property(Kalm, 'Server', 'Server not exposed in Kalm index'); + assert.property(Kalm, 'adapters', 'adapters not exposed in Kalm index'); + assert.property(Kalm, 'encoders', 'encoders not exposed in Kalm index'); + assert.property(Kalm, 'defaults', 'defaults not exposed in Kalm index'); + }); +}); + +describe('Adapters', () => { + + it('index', () => { + assert.isFunction(Kalm.adapters.register, 'register method not valid in Kalm adapters'); + assert.isFunction(Kalm.adapters.resolve, 'resolve method not valid in Kalm adapters'); + }); + + describe('bundled', () => { + it('ipc', () => { + var ipc_test = Kalm.adapters.resolve('ipc'); + assert.isObject(ipc_test, 'ipc is not a valid adapter object'); + allMembersTypeMatch(ipc_test, adapterFormat); + }); + + it('tcp', () => { + var tcp_test = Kalm.adapters.resolve('tcp'); + assert.isObject(tcp_test, 'tcp is not a valid adapter object'); + allMembersTypeMatch(tcp_test, adapterFormat); + }); + + it('udp', () => { + var udp_test = Kalm.adapters.resolve('udp'); + assert.isObject(udp_test, 'udp is not a valid adapter object'); + allMembersTypeMatch(udp_test, adapterFormat); + }); + }); + + describe('methods', () => { + it('register', () => { + Kalm.adapters.register('test', adapterFormat); + Kalm.adapters.register('test2', null); + }); + + it('resolve', () => { + assert.deepEqual(Kalm.adapters.resolve('test'), adapterFormat); + assert.equal(Kalm.adapters.resolve('test2'), null); + assert.equal(Kalm.adapters.resolve('test3'), null); + }); + }); +}); + +describe('Encoders', () => { + + it('index', () => { + assert.isFunction(Kalm.encoders.register, 'register method not valid in Kalm encoders'); + assert.isFunction(Kalm.encoders.resolve, 'resolve method not valid in Kalm encoders'); + }); + + describe('bundled', () => { + var objTest = {foo: 'bar'}; + var strTest = '{"foo":"bar}'; + + it('json', () => { + var json_test = Kalm.encoders.resolve('json'); + assert.isObject(json_test, 'json is not a valid encoder object'); + allMembersTypeMatch(json_test, encoderFormat); + + assert.instanceOf(json_test.encode(objTest), Buffer, 'json encoder does not output a buffer'); + assert.deepEqual(json_test.decode(json_test.encode(objTest)), objTest, 'Object is not the same after json decoding.'); + }); + + it('msg-pack', () => { + var msg_test = Kalm.encoders.resolve('msg-pack'); + assert.isObject(msg_test, 'msg-pack is not a valid encoder object'); + allMembersTypeMatch(msg_test, encoderFormat); + + assert.instanceOf(msg_test.encode(objTest), Buffer, 'msg-pack encoder does not output a buffer'); + assert.deepEqual(msg_test.decode(msg_test.encode(objTest)), objTest, 'Object is not the same after msg-pack decoding.'); + }); + }); + + describe('methods', () => { + it('register', () => { + Kalm.encoders.register('test', encoderFormat); + Kalm.encoders.register('test2', null); + }); + + it('resolve', () => { + assert.deepEqual(Kalm.encoders.resolve('test'), encoderFormat); + assert.equal(Kalm.encoders.resolve('test2'), null); + assert.equal(Kalm.encoders.resolve('test3'), null); + }); + }); +}); + +describe('Channel', () => { + + var channel = new Channel('test', Kalm.defaults.bundler, { + _emit: function() {}, + destroy: function() {} + }); + + it('constructor', () => { + assert.equal(channel.name, 'test'); + assert.deepEqual(channel.options, Kalm.defaults.bundler); + assert.equal(channel.splitBatches, true); + }); + + it('send', (done) => { + channel.send('foo'); + assert.isNotNull(channel._timer); + channel.send('foo2'); + assert.include(channel._packets, 'foo'); + assert.include(channel._packets, 'foo2'); + + setTimeout(() => { + assert.equal(channel._packets.length, 0); + assert.isNull(channel._timer); + done(); + }, Kalm.defaults.bundler.delay + 1); + }); + + it('sendOnce', (done) => { + channel.sendOnce('foo'); + assert.isNotNull(channel._timer); + assert.include(channel._packets, 'foo'); + channel.sendOnce('foo2'); + assert.notInclude(channel._packets, 'foo'); + assert.include(channel._packets, 'foo2'); + + setTimeout(() => { + assert.equal(channel._packets.length, 0); + assert.isNull(channel._timer); + done(); + }, Kalm.defaults.bundler.delay + 1); + }); + + it('addHandler', () => { + var testHandler = function foo() {}; + + channel.addHandler(testHandler); + assert.include(channel._handlers, testHandler); + }); + + it('removeHandler', () => { + var testHandler = function foobar() {}; + + channel.addHandler(testHandler); + channel.removeHandler(testHandler); + assert.notInclude(channel._handlers, testHandler); + }); + + it('handleData', (done) => { + var testHandler = function(data) { + done(); + }; + + channel.addHandler(testHandler); + channel.handleData(['callDone']); + }); + + it('destroy', () => { + channel.destroy(); + }); +}); + +describe('Client', () => { + var testSocket = new EventEmitter(); + var testHandler = function() {}; + var client = new Kalm.Client(testSocket, { + adapter: 'ipc', + port: 9000, + channels: { + test: testHandler + } + }); + + it('constructor', () => { + assert.deepEqual(client.options, { + hostname: Kalm.defaults.hostname, + port: 9000, + adapter: 'ipc', + bundler: Kalm.defaults.bundler, + encoder: Kalm.defaults.encoder, + stats: Kalm.defaults.stats + }); + + assert.property(client.channels, 'test'); + assert.instanceOf(client.channels.test, Channel); + + assert.isNotNull(client.socket); + }); + + it('subscribe', () => { + var testHandler = function bar() {}; + client.subscribe('test-subscribe', testHandler); + assert.instanceOf(client.channels['test-subscribe'], Channel); + assert.include(client.channels['test-subscribe']._handlers, testHandler); + client.subscribe('test-subscribe-delay', testHandler, {delay:1}); + assert.equal(client.channels['test-subscribe-delay'].options.delay, 1); + }); + + it('unsubscribe', () => { + var testHandler = function unbar() {}; + client.subscribe('test-unsubscribe', testHandler); + client.unsubscribe('test-unsubscribe', testHandler); + assert.notInclude(client.channels['test-unsubscribe']._handlers, testHandler); + }); + + it('use', () => { + var socketReplacement = new EventEmitter(); + client.use(socketReplacement); + assert.isNotNull(client.socket); + }); + + it('handleError', (done) => { + client.once('error', () => { + done(); + }); + + client.handleError('test'); + }); + + it('handleConnect', (done) => { + client.once('connect', () => { + client.once('connection', () => { + done(); + }); + + client.handleConnect(); + }); + + client.handleConnect(); + }); + + it('handleDisconnect', (done) => { + client.once('disconnect', () => { + client.once('disconnection', () => { + done(); + }); + + client.handleDisconnect(); + }); + + client.handleDisconnect(); + }); + + it('send', () => { + client.send('test-send', 'test1'); + client.send('test-send', 'test2'); + assert.instanceOf(client.channels['test-send'], Channel); + assert.equal(client.channels['test-send']._packets.length, 2); + }); + + it('sendOnce', () => { + client.sendOnce('test-sendOnce', 'test1'); + client.sendOnce('test-sendOnce', 'test2'); + assert.instanceOf(client.channels['test-sendOnce'], Channel); + assert.equal(client.channels['test-sendOnce']._packets.length, 1); + }); + + it('handleRequest', (done) => { + var testPayload = ['test-handleRequest', [{foo: 'bar'}]]; + + client.subscribe('test-handleRequest', (data) => { + assert.deepEqual(data, testPayload[1][0]); + done(); + }); + + client.handleRequest(Kalm.encoders.resolve('msg-pack').encode(testPayload)); + }); + + it('destroy', () => { + client.destroy(); + assert.isNull(client.socket); + }); +}); + +describe('Server', () => { + var fooHandler = function() {}; + var server; + + it('constructor', (done) => { + server = new Kalm.Server({ + adapter: 'ipc', + port: 9000, + channels: { + 'foo': fooHandler + } + }); + + server.on('ready', () => { + assert.isNotNull(server.listener); + assert.deepEqual(server.options, { + adapter: 'ipc', + encoder: Kalm.defaults.encoder, + port: 9000 + }); + assert.isArray(server.connections); + assert.isObject(server.channels); + assert.equal(server.channels.foo, fooHandler); + done(); + }); + }); + + it('handleRequest', () => { + var testSocket = new EventEmitter(); + server.handleRequest(testSocket); + assert.equal(server.connections.length, 1); + assert.instanceOf(server.connections[0], Kalm.Client); + }); + + it('subscribe', () => { + var testHandler = function bar() {}; + server.subscribe('test-subscribe', testHandler); + assert.instanceOf(server.connections[0].channels['test-subscribe'], Channel); + assert.include(server.connections[0].channels['test-subscribe']._handlers, testHandler); + server.subscribe('test-subscribe-delay', testHandler, {delay:1}); + assert.equal(server.connections[0].channels['test-subscribe-delay'].options.delay, 1); + }); + + it('unsubscribe', () => { + var testHandler = function unbar() {}; + server.subscribe('test-unsubscribe', testHandler); + server.unsubscribe('test-unsubscribe', testHandler); + assert.notInclude(server.connections[0].channels['test-unsubscribe']._handlers, testHandler); + }); + + it('broadcast', () => { + var testSocket = new EventEmitter(); + server.handleRequest(testSocket); + assert.equal(server.connections.length, 2); + server.broadcast('test-broadcast', 'test'); + assert.include(server.connections[0].channels['test-broadcast']._packets, 'test'); + assert.include(server.connections[1].channels['test-broadcast']._packets, 'test'); + }); + + it('whisper', () => { + server.connections[0].subscribe('test-whisper'); + server.whisper('test-whisper', 'test'); + assert.include(server.connections[0].channels['test-whisper']._packets, 'test'); + assert.isUndefined(server.connections[1].channels['test-whisper']); + }); + + it('handleError', (done) => { + server.once('error', () => { + done(); + }); + + server.handleError('test'); + }); + + it('stop', (done) => { + server.stop(() => { + assert.isNull(server.listener); + assert.equal(server.connections.length, 0); + done(); + }); + }); +}); + +/* Tooling -------------------------------------------------------------------*/ + +/** + * Checks that all properties are present and of the proper type + */ +function allMembersTypeMatch(set1, model) { + for (var i in model) { + var type = typeof model[i]; + assert.property(set1, i, 'property ' + i + ' is missing'); + assert.typeOf(set1[i], type, 'property ' + i + ' should be ' + type); + } + return true; +} \ No newline at end of file