@@ -164,6 +164,9 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
164
164
165
165
/// Quiescing and the last request's `.end` has been seen which means we no longer accept any input.
166
166
case quiescingLastRequestEndReceived
167
+
168
+ /// Quiescing and we have issued a channel close. Further I/O here is not expected, and won't be managed.
169
+ case quiescingCompleted
167
170
}
168
171
169
172
private var lifecycleState : LifecycleState = . acceptingEvents
@@ -174,8 +177,13 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
174
177
private var nextExpectedOutboundMessage : NextExpectedMessageType ?
175
178
176
179
public func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
177
- guard self . lifecycleState != . quiescingLastRequestEndReceived else {
180
+ switch self . lifecycleState {
181
+ case . quiescingLastRequestEndReceived, . quiescingCompleted:
182
+ // We're done, no more data for you.
178
183
return
184
+ case . acceptingEvents, . quiescingWaitingForRequestEnd:
185
+ // Still accepting I/O
186
+ ( )
179
187
}
180
188
181
189
if self . eventBuffer. count != 0 || self . state == . responseEndPending {
@@ -187,7 +195,8 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
187
195
}
188
196
189
197
private func deliverOneMessage( context: ChannelHandlerContext , data: NIOAny ) {
190
- assert ( self . lifecycleState != . quiescingLastRequestEndReceived,
198
+ assert ( self . lifecycleState != . quiescingLastRequestEndReceived &&
199
+ self . lifecycleState != . quiescingCompleted,
191
200
" deliverOneMessage called in lifecycle illegal state \( self . lifecycleState) " )
192
201
let msg = self . unwrapInboundIn ( data)
193
202
@@ -216,6 +225,7 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
216
225
self . eventBuffer. removeAll ( )
217
226
}
218
227
if self . lifecycleState == . quiescingLastRequestEndReceived && self . state == . idle {
228
+ self . lifecycleState = . quiescingCompleted
219
229
context. close ( promise: nil )
220
230
}
221
231
case . body:
@@ -248,7 +258,7 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
248
258
self . eventBuffer. removeAll ( )
249
259
case . idle:
250
260
// we're completely idle, let's just close
251
- self . lifecycleState = . quiescingLastRequestEndReceived
261
+ self . lifecycleState = . quiescingCompleted
252
262
self . eventBuffer. removeAll ( )
253
263
context. close ( promise: nil )
254
264
case . requestEndPending, . requestAndResponseEndPending:
@@ -313,11 +323,17 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
313
323
// we just received the .end that we're missing so we can fall through to closing the connection
314
324
fallthrough
315
325
case . quiescingLastRequestEndReceived:
326
+ self . lifecycleState = . quiescingCompleted
316
327
context. write ( data) . flatMap {
317
328
context. close ( )
318
329
} . cascade ( to: promise)
319
330
case . acceptingEvents, . quiescingWaitingForRequestEnd:
320
331
context. write ( data, promise: promise)
332
+ case . quiescingCompleted:
333
+ // Uh, why are we writing more data here? We'll write it, but it should be guaranteed
334
+ // to fail.
335
+ assertionFailure ( " Wrote in quiescing completed state " )
336
+ context. write ( data, promise: promise)
321
337
}
322
338
case . body, . head:
323
339
context. write ( data, promise: promise)
@@ -331,7 +347,11 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
331
347
}
332
348
333
349
public func read( context: ChannelHandlerContext ) {
334
- if self . lifecycleState != . quiescingLastRequestEndReceived {
350
+ switch self . lifecycleState {
351
+ case . quiescingLastRequestEndReceived, . quiescingCompleted:
352
+ // We swallow all reads now, as we're going to close the connection.
353
+ ( )
354
+ case . acceptingEvents, . quiescingWaitingForRequestEnd:
335
355
if case . responseEndPending = self . state {
336
356
self . readPending = true
337
357
} else {
@@ -340,13 +360,75 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
340
360
}
341
361
}
342
362
363
+ public func handlerRemoved( context: ChannelHandlerContext ) {
364
+ // We're being removed from the pipeline. We need to do a few things:
365
+ //
366
+ // 1. If we have buffered events, deliver them. While we shouldn't be
367
+ // re-entrantly called, we want to ensure that so we take a local copy.
368
+ // 2. If we are quiescing, we swallowed a quiescing event from the user: replay it,
369
+ // as the user has hopefully added a handler that will do something with this.
370
+ // 3. Finally, if we have a read pending, we need to release it.
371
+ //
372
+ // The basic theory here is that if there is anything we were going to do when we received
373
+ // either a request .end or a response .end, we do it now because there is no future for us.
374
+ // We also need to ensure we do not drop any data on the floor.
375
+ //
376
+ // At this stage we are no longer in the pipeline, so all further content should be
377
+ // blocked from reaching us. Thus we can avoid mutating our own internal state any
378
+ // longer.
379
+ let bufferedEvents = self . eventBuffer
380
+ for event in bufferedEvents {
381
+ switch event {
382
+ case . channelRead( let read) :
383
+ context. fireChannelRead ( read)
384
+ case . halfClose:
385
+ context. fireUserInboundEventTriggered ( ChannelEvent . inputClosed)
386
+ case . error( let error) :
387
+ context. fireErrorCaught ( error)
388
+ }
389
+ }
390
+
391
+
392
+ switch self . lifecycleState {
393
+ case . quiescingLastRequestEndReceived, . quiescingWaitingForRequestEnd:
394
+ context. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
395
+ case . acceptingEvents, . quiescingCompleted:
396
+ // Either we haven't quiesced, or we succeeded in doing it.
397
+ ( )
398
+ }
399
+
400
+ if self . readPending {
401
+ context. read ( )
402
+ }
403
+ }
404
+
405
+ public func channelInactive( context: ChannelHandlerContext ) {
406
+ // Welp, this channel isn't going to work anymore. We may as well drop our pending events here, as we
407
+ // cannot be expected to act on them any longer.
408
+ //
409
+ // Side note: it's important that we drop these. If we don't, handlerRemoved will deliver them all.
410
+ // While it's fair to immediately pipeline a channel where the user chose to remove the HTTPPipelineHandler,
411
+ // it's deeply unfair to do so to a user that didn't choose to do that, where it happened to them only because
412
+ // the channel closed.
413
+ //
414
+ // We set keepingCapacity to avoid this reallocating a buffer, as we'll just free it shortly anyway.
415
+ self . eventBuffer. removeAll ( keepingCapacity: true )
416
+ context. fireChannelInactive ( )
417
+ }
418
+
343
419
/// A response has been sent: we can now start passing reads through
344
420
/// again if there are no further pending requests, and send any read()
345
421
/// call we may have swallowed.
346
422
private func startReading( context: ChannelHandlerContext ) {
347
- if self . readPending && self . state != . responseEndPending && self . lifecycleState != . quiescingLastRequestEndReceived {
348
- self . readPending = false
349
- context. read ( )
423
+ if self . readPending && self . state != . responseEndPending {
424
+ switch self . lifecycleState {
425
+ case . quiescingLastRequestEndReceived, . quiescingCompleted:
426
+ // No more reads in these states.
427
+ ( )
428
+ case . acceptingEvents, . quiescingWaitingForRequestEnd:
429
+ self . readPending = false
430
+ context. read ( )
431
+ }
350
432
}
351
433
}
352
434
0 commit comments