Skip to content

Commit 5c22fab

Browse files
authored
Merge pull request #30 from roadrunner-php/ping-pong-stream
Implement Ping-Pong strategy for worker with blocking relay
2 parents 97f966a + 075ddc6 commit 5c22fab

File tree

5 files changed

+100
-7
lines changed

5 files changed

+100
-7
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"ext-json": "*",
4242
"ext-sockets": "*",
4343
"psr/log": "^2.0|^3.0",
44-
"spiral/goridge": "^4.0",
44+
"spiral/goridge": "^4.1.0",
4545
"spiral/roadrunner": "^2023.1",
4646
"composer-runtime-api": "^2.0"
4747
},

src/Message/Command/Pong.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Spiral\RoadRunner\Message\Command;
6+
7+
use Spiral\RoadRunner\Message\SkipMessage;
8+
use Spiral\RoadRunner\Payload;
9+
10+
/**
11+
* @psalm-immutable
12+
*/
13+
final class Pong extends Payload implements SkipMessage
14+
{
15+
}

src/PayloadFactory.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Spiral\Goridge\Frame;
88
use Spiral\RoadRunner\Exception\RoadRunnerException;
99
use Spiral\RoadRunner\Message\Command\GetProcessId;
10+
use Spiral\RoadRunner\Message\Command\Pong;
1011
use Spiral\RoadRunner\Message\Command\StreamStop;
1112
use Spiral\RoadRunner\Message\Command\WorkerStop;
1213

@@ -26,6 +27,10 @@ public static function fromFrame(Frame $frame): Payload
2627
return new StreamStop($payload);
2728
}
2829

30+
if (($frame->byte10 & Frame::BYTE10_PONG) !== 0) {
31+
return new Pong($payload);
32+
}
33+
2934
return new Payload(
3035
\substr($payload, $frame->options[0]),
3136
\substr($payload, 0, $frame->options[0]),

src/StreamWorkerInterface.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Spiral\RoadRunner;
6+
7+
interface StreamWorkerInterface extends WorkerInterface
8+
{
9+
public function withStreamMode(): static;
10+
}

src/Worker.php

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Spiral\RoadRunner;
66

77
use Psr\Log\LoggerInterface;
8+
use Spiral\Goridge\BlockingRelayInterface;
89
use Spiral\Goridge\Exception\GoridgeException;
910
use Spiral\Goridge\Exception\TransportException;
1011
use Spiral\Goridge\Frame;
@@ -13,6 +14,7 @@
1314
use Spiral\RoadRunner\Exception\RoadRunnerException;
1415
use Spiral\RoadRunner\Internal\StdoutHandler;
1516
use Spiral\RoadRunner\Message\Command\GetProcessId;
17+
use Spiral\RoadRunner\Message\Command\Pong;
1618
use Spiral\RoadRunner\Message\Command\WorkerStop;
1719
use Spiral\RoadRunner\Message\SkipMessage;
1820

@@ -27,13 +29,19 @@
2729
* }
2830
* </code>
2931
*/
30-
class Worker implements WorkerInterface
32+
class Worker implements StreamWorkerInterface
3133
{
3234
private const JSON_ENCODE_FLAGS = \JSON_THROW_ON_ERROR | \JSON_PRESERVE_ZERO_FRACTION;
3335

3436
/** @var array<int, Payload> */
3537
private array $payloads = [];
3638

39+
private bool $streamMode = false;
40+
/** @var int<0, max> Count of frames sent in stream mode */
41+
private int $framesSent = 0;
42+
private bool $shouldPing = false;
43+
private bool $waitingPong = false;
44+
3745
public function __construct(
3846
private readonly RelayInterface $relay,
3947
bool $interceptSideEffects = true,
@@ -63,18 +71,33 @@ public function waitPayload(): ?Payload
6371
case $payload::class === Payload::class:
6472
return $payload;
6573
case $payload instanceof WorkerStop:
74+
$this->waitingPong = false;
6675
return null;
6776
case $payload::class === GetProcessId::class:
6877
$this->sendProcessId();
69-
// no break
78+
continue 2;
79+
case $payload instanceof Pong:
80+
$this->waitingPong = false;
81+
continue 2;
7082
case $payload instanceof SkipMessage:
7183
continue 2;
7284
}
7385
}
7486
}
7587

88+
public function withStreamMode(): static
89+
{
90+
$clone = clone $this;
91+
$clone->streamMode = true;
92+
$clone->framesSent = 0;
93+
$clone->shouldPing = false;
94+
$clone->waitingPong = false;
95+
return $clone;
96+
}
97+
7698
public function respond(Payload $payload): void
7799
{
100+
$this->streamMode and ++$this->framesSent;
78101
$this->send($payload->body, $payload->header, $payload->eos);
79102
}
80103

@@ -133,7 +156,7 @@ private function findPayload(string $class = null): ?int
133156
}
134157

135158
$payload = $this->pullPayload();
136-
if ($payload === null) {
159+
if ($payload === null || $payload instanceof Pong) {
137160
break;
138161
}
139162

@@ -151,20 +174,40 @@ private function findPayload(string $class = null): ?int
151174
*/
152175
private function pullPayload(): ?Payload
153176
{
177+
if (!$this->waitingPong && $this->relay instanceof BlockingRelayInterface) {
178+
if (!$this->streamMode) {
179+
return null;
180+
}
181+
182+
$this->haveToPing();
183+
return null;
184+
}
185+
154186
if (!$this->relay->hasFrame()) {
155187
return null;
156188
}
157189

158190
$frame = $this->relay->waitFrame();
159-
return PayloadFactory::fromFrame($frame);
191+
$payload = PayloadFactory::fromFrame($frame);
192+
193+
if ($payload instanceof Pong) {
194+
$this->waitingPong = false;
195+
return null;
196+
}
197+
198+
return $payload;
160199
}
161200

162201
private function send(string $body = '', string $header = '', bool $eos = true): void
163202
{
164203
$frame = new Frame($header . $body, [\strlen($header)]);
165204

166205
if (!$eos) {
167-
$frame->byte10 = Frame::BYTE10_STREAM;
206+
$frame->byte10 |= Frame::BYTE10_STREAM;
207+
}
208+
209+
if ($this->shouldPing) {
210+
$frame->byte10 |= Frame::BYTE10_PING;
168211
}
169212

170213
$this->sendFrame($frame);
@@ -173,6 +216,12 @@ private function send(string $body = '', string $header = '', bool $eos = true):
173216
private function sendFrame(Frame $frame): void
174217
{
175218
try {
219+
if ($this->streamMode && ($frame->byte10 & Frame::BYTE10_STREAM) && $this->shouldPing) {
220+
$frame->byte10 |= Frame::BYTE10_PING;
221+
$this->shouldPing = false;
222+
$this->waitingPong = true;
223+
}
224+
176225
$this->relay->send($frame);
177226
} catch (GoridgeException $e) {
178227
throw new TransportException($e->getMessage(), $e->getCode(), $e);
@@ -208,8 +257,11 @@ public static function createFromEnvironment(
208257
bool $interceptSideEffects = true,
209258
LoggerInterface $logger = new Logger(),
210259
): self {
260+
$address = $env->getRelayAddress();
261+
\assert($address !== '', 'Relay address must be specified in environment');
262+
211263
return new self(
212-
relay: Relay::create($env->getRelayAddress()),
264+
relay: Relay::create($address),
213265
interceptSideEffects: $interceptSideEffects,
214266
logger: $logger
215267
);
@@ -221,4 +273,15 @@ private function sendProcessId(): static
221273
$this->sendFrame($frame);
222274
return $this;
223275
}
276+
277+
private function haveToPing(): void
278+
{
279+
if ($this->waitingPong || $this->framesSent === 0) {
280+
return;
281+
}
282+
283+
if ($this->framesSent % 5 === 0) {
284+
$this->shouldPing = true;
285+
}
286+
}
224287
}

0 commit comments

Comments
 (0)