diff --git a/README.md b/README.md index 5875d33..cc110d6 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,35 @@ $client->listen('some_channel') $client->query("NOTIFY some_channel, 'Hello World'")->subscribe(); ``` +## Example - Connecting over TLS with CA certificate file +```php + +$client = new PgAsync\Client([ + "host" => "127.0.0.1", + "port" => "5432", + "user" => "matt", + "database" => "matt", + "tls" => "verify-full", + "tls_connector_flags" => [ + "cafile" => "/path/to/ca.crt", + ], +]); + +$client->query('SELECT * FROM channel')->subscribe( + function ($row) { + var_dump($row); + }, + function ($e) { + echo "Failed.\n"; + }, + function () { + echo "Complete.\n"; + } +); + + +``` + ## Install With [composer](https://getcomposer.org/) install into you project with: diff --git a/composer.json b/composer.json index 7c0172a..a4edd23 100644 --- a/composer.json +++ b/composer.json @@ -36,8 +36,9 @@ "php": ">=7.0.0", "voryx/event-loop": "^3.0 || ^2.0.2", "reactivex/rxphp": "^2.0", - "react/socket": "^1.0 || ^0.8 || ^0.7", - "evenement/evenement": "^2.0 | ^3.0" + "react/promise-stream": "^1.5", + "evenement/evenement": "^2.0 | ^3.0", + "wyrihaximus/react-opportunistic-tls": "dev-prepare-for-release" }, "require-dev": { "phpunit/phpunit": ">=8.5.23 || ^6.5.5", diff --git a/src/PgAsync/Connection.php b/src/PgAsync/Connection.php index 2a06107..463d446 100644 --- a/src/PgAsync/Connection.php +++ b/src/PgAsync/Connection.php @@ -12,6 +12,7 @@ use PgAsync\Command\PasswordMessage; use PgAsync\Command\SaslInitialResponse; use PgAsync\Command\SaslResponse; +use PgAsync\Command\SSLRequest; use PgAsync\Command\Sync; use PgAsync\Command\Terminate; use PgAsync\Message\Authentication; @@ -32,9 +33,13 @@ use PgAsync\Message\ReadyForQuery; use PgAsync\Message\RowDescription; use PgAsync\Command\StartupMessage; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; -use React\Socket\Connector; +use React\Promise\Promise; +use React\Socket\ConnectionInterface; +use WyriHaximus\React\Socket\Connector; use React\Socket\ConnectorInterface; +use WyriHaximus\React\Socket\OpportunisticTlsConnectionInterface; use React\Stream\DuplexStreamInterface; use Rx\Disposable\CallbackDisposable; use Rx\Disposable\EmptyDisposable; @@ -43,6 +48,8 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; use Rx\Subject\Subject; +use function React\Promise\resolve; +use function React\Promise\Stream\first; class Connection extends EventEmitter { @@ -73,6 +80,16 @@ class Connection extends EventEmitter const CONNECTION_NEEDED = 8; /* Internal state: connect() needed */ const CONNECTION_CLOSED = 9; + // Reference table: https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION + const TLS_MODE_DISABLE = 'disable'; + const TLS_MODE_ALLOW = 'allow'; + const TLS_MODE_PREFER = 'prefer'; + const TLS_MODE_REQUIRE = 'require'; + const TLS_MODE_VERIFY_CA = 'verify-ca'; + const TLS_MODE_VERIFY_FULL = 'verify-full'; + const TLS_MODE_LIST_FULL = [self::TLS_MODE_DISABLE, self::TLS_MODE_ALLOW, self::TLS_MODE_PREFER, self::TLS_MODE_REQUIRE, self::TLS_MODE_VERIFY_CA, self::TLS_MODE_VERIFY_FULL]; + const TLS_MODE_LIST_REQUIRED = [self::TLS_MODE_REQUIRE, self::TLS_MODE_VERIFY_CA, self::TLS_MODE_VERIFY_FULL]; + private $queryState; private $queryType; private $connStatus; @@ -134,6 +151,8 @@ class Connection extends EventEmitter /** @var bool */ private $auto_disconnect = false; + private $tls = self::TLS_MODE_PREFER; + private $tlsConnectorFlags = []; private $password; public function __construct(array $parameters, LoopInterface $loop, ConnectorInterface $connector = null) @@ -158,6 +177,19 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt unset($parameters['password']); } + if (array_key_exists('tls', $parameters)) { + if (!in_array($this->tls, self::TLS_MODE_LIST_FULL)) { + throw new \InvalidArgumentException('TLS mode must be one off "' . implode(', ', self::TLS_MODE_LIST_FULL) . ' but got "' . $parameters['tls'] . '" instead'); + } + $this->tls = $parameters['tls']; + unset($parameters['tls']); + } + + if (array_key_exists('tls_connector_flags', $parameters)) { + $this->tlsConnectorFlags = $parameters['tls_connector_flags']; + unset($parameters['tls_connector_flags']); + } + if (isset($parameters['auto_disconnect'])) { $this->auto_disconnect = $parameters['auto_disconnect']; unset($parameters['auto_disconnect']); @@ -172,8 +204,17 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt $this->queryState = static::STATE_BUSY; $this->queryType = static::QUERY_SIMPLE; $this->connStatus = static::CONNECTION_NEEDED; - $this->socket = $connector ?: new Connector($loop); - $this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port']; + $this->socket = $connector ?: new Connector($loop, [ + 'tls' => [ + 'verify_peer' => $this->tls === self::TLS_MODE_VERIFY_FULL, + 'verify_peer_name' => $this->tls === self::TLS_MODE_VERIFY_FULL, + 'allow_self_signed' => $this->tls !== self::TLS_MODE_VERIFY_FULL, + ] + $this->tlsConnectorFlags, + ]); + // We always url `opportunistic+tls` as scheme because the logic required for using `tcp` on TLS `disable` + // mode is more complex than worth it when connecting to the server. And the `SecureConnector` gives us a + // plaint text connection with all TLS flags already set and ready to use for all the other modes. + $this->uri = 'opportunistic+tls://' . $parameters['host'] . ':' . $parameters['port']; $this->notificationSubject = new Subject(); $this->cancelPending = false; $this->cancelRequested = false; @@ -191,23 +232,43 @@ private function start() $this->connStatus = static::CONNECTION_STARTED; $this->socket->connect($this->uri)->then( - function (DuplexStreamInterface $stream) { - $this->stream = $stream; - $this->connStatus = static::CONNECTION_MADE; - - $stream->on('close', [$this, 'onClose']); + function (OpportunisticTlsConnectionInterface $stream) { + (new Promise(function (callable $resolve, callable $reject) use ($stream) { + if ($this->tls !== self::TLS_MODE_DISABLE) { + first($stream)->then(function ($data) use ($resolve, $reject, $stream) { + if (trim($data) === 'S') { + $stream->enableEncryption()->then($resolve, $reject); + return; + } + + if (in_array($this->tls, self::TLS_MODE_LIST_REQUIRED)) { + $reject(new \RuntimeException('Failed to encrypt connection while required')); + return; + } + + $resolve($stream); + }, $reject); + + $ssl = new SSLRequest(); + $stream->write($ssl->encodedMessage()); + return; + } - $stream->on('data', [$this, 'onData']); + $resolve($stream); + }))->then(function (DuplexStreamInterface $stream) { + $this->stream = $stream; + $this->connStatus = static::CONNECTION_MADE; - // $ssl = new SSLRequest(); - // $stream->write($ssl->encodedMessage()); + $stream->on('close', [$this, 'onClose']); + $stream->on('data', [$this, 'onData']); - $startupParameters = $this->parameters; - unset($startupParameters['host'], $startupParameters['port']); + $startupParameters = $this->parameters; + unset($startupParameters['host'], $startupParameters['port']); - $startup = new StartupMessage(); - $startup->setParameters($startupParameters); - $stream->write($startup->encodedMessage()); + $startup = new StartupMessage(); + $startup->setParameters($startupParameters); + $stream->write($startup->encodedMessage()); + })->done(); }, function ($e) { // connection error @@ -596,11 +657,11 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use $this->processQueue(); return new CallbackDisposable(function () use ($q) { - if ($this->currentCommand === $q && $q->isActive()) { - $this->cancelRequested = true; - } - $q->cancel(); - }); + if ($this->currentCommand === $q && $q->isActive()) { + $this->cancelRequested = true; + } + $q->cancel(); + }); } );