@@ -100,9 +100,10 @@ public function handleClient(
100
100
$ this ->insertTimeout ();
101
101
102
102
$ headerSizeLimit = $ this ->headerSizeLimit ;
103
+ $ cancellation = $ this ->deferredCancellation ->getCancellation ();
103
104
104
105
try {
105
- $ buffer = $ readableStream ->read ();
106
+ $ buffer = $ readableStream ->read ($ cancellation );
106
107
if ($ buffer === null ) {
107
108
$ this ->removeTimeout ();
108
109
return ;
@@ -141,7 +142,7 @@ public function handleClient(
141
142
);
142
143
}
143
144
144
- $ chunk = $ readableStream ->read ();
145
+ $ chunk = $ readableStream ->read ($ cancellation );
145
146
if ($ chunk === null ) {
146
147
return ;
147
148
}
@@ -413,7 +414,8 @@ public function handleClient(
413
414
$ this ->suspendTimeout ();
414
415
415
416
$ this ->currentBuffer = $ buffer ;
416
- $ this ->handleRequest ($ request );
417
+ $ this ->pendingResponse = async ($ this ->handleRequest (...), $ request );
418
+ $ this ->pendingResponse ->await ();
417
419
$ this ->pendingResponseCount --;
418
420
419
421
continue ;
@@ -486,7 +488,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
486
488
);
487
489
}
488
490
489
- $ chunk = $ this ->readableStream ->read ();
491
+ $ chunk = $ this ->readableStream ->read ($ cancellation );
490
492
if ($ chunk === null ) {
491
493
return ;
492
494
}
@@ -514,7 +516,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
514
516
515
517
if ($ chunkLengthRemaining === 0 ) {
516
518
while (!isset ($ buffer [1 ])) {
517
- $ chunk = $ readableStream ->read ();
519
+ $ chunk = $ readableStream ->read ($ cancellation );
518
520
if ($ chunk === null ) {
519
521
return ;
520
522
}
@@ -546,7 +548,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
546
548
);
547
549
}
548
550
549
- $ chunk = $ this ->readableStream ->read ();
551
+ $ chunk = $ this ->readableStream ->read ($ cancellation );
550
552
if ($ chunk === null ) {
551
553
return ;
552
554
}
@@ -599,7 +601,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
599
601
$ remaining -= $ bodyBufferSize ;
600
602
}
601
603
602
- $ body = $ readableStream ->read ();
604
+ $ body = $ readableStream ->read ($ cancellation );
603
605
if ($ body === null ) {
604
606
return ;
605
607
}
@@ -635,7 +637,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
635
637
$ bufferLength = \strlen ($ buffer );
636
638
637
639
if (!$ bufferLength ) {
638
- $ chunk = $ readableStream ->read ();
640
+ $ chunk = $ readableStream ->read ($ cancellation );
639
641
if ($ chunk === null ) {
640
642
return ;
641
643
}
@@ -647,7 +649,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
647
649
// These first two (extreme) edge cases prevent errors where the packet boundary ends after
648
650
// the \r and before the \n at the end of a chunk.
649
651
if ($ bufferLength === $ chunkLengthRemaining || $ bufferLength === $ chunkLengthRemaining + 1 ) {
650
- $ chunk = $ readableStream ->read ();
652
+ $ chunk = $ readableStream ->read ($ cancellation );
651
653
if ($ chunk === null ) {
652
654
return ;
653
655
}
@@ -704,7 +706,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
704
706
$ bodySize += $ bodyBufferSize ;
705
707
}
706
708
707
- $ chunk = $ readableStream ->read ();
709
+ $ chunk = $ readableStream ->read ($ cancellation );
708
710
if ($ chunk === null ) {
709
711
return ;
710
712
}
@@ -756,6 +758,12 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
756
758
}
757
759
} catch (StreamException ) {
758
760
// Client disconnected, finally block will clean up.
761
+ } catch (CancelledException ) {
762
+ // Server shutting down.
763
+ if ($ this ->bodyQueue === null || !$ this ->pendingResponseCount ) {
764
+ // Send a service unavailable response only if another response has not already been sent.
765
+ $ this ->sendServiceUnavailableResponse ($ request ?? null )->await ();
766
+ }
759
767
} finally {
760
768
$ this ->pendingResponse ->finally (function (): void {
761
769
$ this ->removeTimeout ();
@@ -1023,6 +1031,19 @@ private function upgrade(Request $request, Response $response): void
1023
1031
}
1024
1032
}
1025
1033
1034
+ /**
1035
+ * Creates a service unavailable response from the error handler and sends that response to the client.
1036
+ *
1037
+ * @return Future<void>
1038
+ */
1039
+ private function sendServiceUnavailableResponse (?Request $ request ): Future
1040
+ {
1041
+ $ response = $ this ->errorHandler ->handleError (HttpStatus::SERVICE_UNAVAILABLE , request: $ request );
1042
+ $ response ->setHeader ("connection " , "close " );
1043
+
1044
+ return $ this ->lastWrite = async ($ this ->send (...), $ this ->lastWrite , $ response );
1045
+ }
1046
+
1026
1047
/**
1027
1048
* Creates an error response from the error handler and sends that response to the client.
1028
1049
*
@@ -1062,6 +1083,7 @@ public function stop(): void
1062
1083
1063
1084
$ this ->pendingResponse ->await ();
1064
1085
$ this ->lastWrite ?->await();
1086
+ $ this ->deferredCancellation ->cancel ();
1065
1087
}
1066
1088
1067
1089
public function getApplicationLayerProtocols (): array
0 commit comments