-
Notifications
You must be signed in to change notification settings - Fork 70
/
Copy pathtransforms.js
40 lines (33 loc) · 884 Bytes
/
transforms.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
var nano = require('..')
var pub = nano.socket('pub');
var sub = nano.socket('sub');
pub.bind('inproc://transform');
sub.connect('inproc://transform');
/**
* minimal transform stream implementation
*/
require('util').inherits(thr, require('stream').Transform);
function thr(fn){
this._transform = fn;
require('stream').Transform.call(this);
}
/**
* pipe from a readable source to minimal transform streams
*/
var t = new thr(function (msg, _, cb){
process.stdout.write('transformed: ' + msg + '\n'); //write to stdout stream
this.emit('destroy');
cb();
});
sub
.pipe(new thr(function (msg, _, cb){
console.log('original: ' + msg); //original: hello from nanømsg
this.push(msg + ' and cheers!');
return cb();
}))
.pipe(t);
pub.send('hello from nanømsg');
t.on('destroy', cleanup); //do some cleanup
function cleanup(){
return sub.close();
}