Skip to content

Commit 286aa04

Browse files
committed
streams2: Abstract out onread function
1 parent f624ccb commit 286aa04

File tree

1 file changed

+61
-53
lines changed

1 file changed

+61
-53
lines changed

lib/_stream_readable.js

+61-53
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ var StringDecoder;
2828

2929
util.inherits(Readable, Stream);
3030

31-
function ReadableState(options) {
31+
function ReadableState(options, stream) {
3232
options = options || {};
3333

3434
this.bufferSize = options.bufferSize || 16 * 1024;
@@ -45,6 +45,8 @@ function ReadableState(options) {
4545
this.ended = false;
4646
this.endEmitted = false;
4747
this.reading = false;
48+
this.sync = false;
49+
this.onread = onread.bind(stream);
4850

4951
// whenever we return null, then we set a flag to say
5052
// that we're awaiting a 'readable' event emission.
@@ -144,59 +146,10 @@ Readable.prototype.read = function(n) {
144146
if (doRead) {
145147
var sync = true;
146148
state.reading = true;
149+
state.sync = true;
147150
// call internal read method
148-
this._read(state.bufferSize, function onread(er, chunk) {
149-
state.reading = false;
150-
if (er)
151-
return this.emit('error', er);
152-
153-
if (!chunk || !chunk.length) {
154-
// eof
155-
state.ended = true;
156-
if (state.decoder) {
157-
chunk = state.decoder.end();
158-
if (chunk && chunk.length) {
159-
state.buffer.push(chunk);
160-
state.length += chunk.length;
161-
}
162-
}
163-
// if we've ended and we have some data left, then emit
164-
// 'readable' now to make sure it gets picked up.
165-
if (!sync) {
166-
if (state.length > 0)
167-
this.emit('readable');
168-
else
169-
endReadable(this);
170-
}
171-
return;
172-
}
173-
174-
if (state.decoder)
175-
chunk = state.decoder.write(chunk);
176-
177-
// update the buffer info.
178-
if (chunk) {
179-
state.length += chunk.length;
180-
state.buffer.push(chunk);
181-
}
182-
183-
// if we haven't gotten enough to pass the lowWaterMark,
184-
// and we haven't ended, then don't bother telling the user
185-
// that it's time to read more data. Otherwise, that'll
186-
// probably kick off another stream.read(), which can trigger
187-
// another _read(n,cb) before this one returns!
188-
if (state.length <= state.lowWaterMark) {
189-
state.reading = true;
190-
this._read(state.bufferSize, onread.bind(this));
191-
return;
192-
}
193-
194-
if (state.needReadable && !sync) {
195-
state.needReadable = false;
196-
this.emit('readable');
197-
}
198-
}.bind(this));
199-
sync = false;
151+
this._read(state.bufferSize, state.onread);
152+
state.sync = false;
200153
}
201154

202155
// If _read called its callback synchronously, then `reading`
@@ -221,6 +174,61 @@ Readable.prototype.read = function(n) {
221174
return ret;
222175
};
223176

177+
function onread(er, chunk) {
178+
var state = this._readableState;
179+
var sync = state.sync;
180+
181+
state.reading = false;
182+
if (er)
183+
return this.emit('error', er);
184+
185+
if (!chunk || !chunk.length) {
186+
// eof
187+
state.ended = true;
188+
if (state.decoder) {
189+
chunk = state.decoder.end();
190+
if (chunk && chunk.length) {
191+
state.buffer.push(chunk);
192+
state.length += chunk.length;
193+
}
194+
}
195+
// if we've ended and we have some data left, then emit
196+
// 'readable' now to make sure it gets picked up.
197+
if (!sync) {
198+
if (state.length > 0)
199+
this.emit('readable');
200+
else
201+
endReadable(this);
202+
}
203+
return;
204+
}
205+
206+
if (state.decoder)
207+
chunk = state.decoder.write(chunk);
208+
209+
// update the buffer info.
210+
if (chunk) {
211+
state.length += chunk.length;
212+
state.buffer.push(chunk);
213+
}
214+
215+
// if we haven't gotten enough to pass the lowWaterMark,
216+
// and we haven't ended, then don't bother telling the user
217+
// that it's time to read more data. Otherwise, that'll
218+
// probably kick off another stream.read(), which can trigger
219+
// another _read(n,cb) before this one returns!
220+
if (state.length <= state.lowWaterMark) {
221+
state.reading = true;
222+
this._read(state.bufferSize, state.onread);
223+
return;
224+
}
225+
226+
if (state.needReadable && !sync) {
227+
state.needReadable = false;
228+
this.emit('readable');
229+
}
230+
}
231+
224232
// abstract method. to be overridden in specific implementation classes.
225233
// call cb(er, data) where data is <= n in length.
226234
// for virtual (non-string, non-buffer) streams, "length" is somewhat

0 commit comments

Comments
 (0)