Skip to content

Commit

Permalink
TLS Connection Support
Browse files Browse the repository at this point in the history
  • Loading branch information
WyriHaximus committed Dec 8, 2022
1 parent 696b766 commit efa59a1
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 22 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,35 @@ $client->listen('some_channel')
$client->query("NOTIFY some_channel, 'Hello World'")->subscribe();
```

## Example - Connecting over TLS with CA certificate file
```php

$client = new PgAsync\Client([
"host" => "127.0.0.1",
"port" => "5432",
"user" => "matt",
"database" => "matt",
"tls" => "verify-full",
"tls_connector_flags" => [
"cafile" => "/path/to/ca.crt",
],
]);

$client->query('SELECT * FROM channel')->subscribe(
function ($row) {
var_dump($row);
},
function ($e) {
echo "Failed.\n";
},
function () {
echo "Complete.\n";
}
);


```

## Install
With [composer](https://getcomposer.org/) install into you project with:

Expand Down
11 changes: 9 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@
"php": ">=7.0.0",
"voryx/event-loop": "^3.0 || ^2.0.2",
"reactivex/rxphp": "^2.0",
"react/socket": "^1.0 || ^0.8 || ^0.7",
"react/promise-stream": "^1.5",
"react/socket": "dev-1.x-opportunistic-tls-connection as 1.999.999",
"evenement/evenement": "^2.0 | ^3.0"
},
"require-dev": {
"phpunit/phpunit": ">=8.5.23 || ^6.5.5",
"react/dns": "^1.0"
}
},
"repositories": [
{
"type": "vcs",
"url": "https://github.com/WyriHaximus-labs/socket"
}
]
}
103 changes: 83 additions & 20 deletions src/PgAsync/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use PgAsync\Command\Execute;
use PgAsync\Command\Parse;
use PgAsync\Command\PasswordMessage;
use PgAsync\Command\SSLRequest;
use PgAsync\Command\Sync;
use PgAsync\Command\Terminate;
use PgAsync\Message\Authentication;
Expand All @@ -30,9 +31,15 @@
use PgAsync\Message\ReadyForQuery;
use PgAsync\Message\RowDescription;
use PgAsync\Command\StartupMessage;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Promise;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;
use React\Socket\DelayedSpecialStreamEncryption;
use React\Socket\OpportunisticTlsConnectionInterface;
use React\Socket\StartTlsConnectionInterface;
use React\Stream\DuplexStreamInterface;
use Rx\Disposable\CallbackDisposable;
use Rx\Disposable\EmptyDisposable;
Expand All @@ -41,6 +48,8 @@
use Rx\ObserverInterface;
use Rx\SchedulerInterface;
use Rx\Subject\Subject;
use function React\Promise\resolve;
use function React\Promise\Stream\first;

class Connection extends EventEmitter
{
Expand Down Expand Up @@ -71,6 +80,16 @@ class Connection extends EventEmitter
const CONNECTION_NEEDED = 8; /* Internal state: connect() needed */
const CONNECTION_CLOSED = 9;

// Reference table: https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION
const TLS_MODE_DISABLE = 'disable';
const TLS_MODE_ALLOW = 'allow';
const TLS_MODE_PREFER = 'prefer';
const TLS_MODE_REQUIRE = 'require';
const TLS_MODE_VERIFY_CA = 'verify-ca';
const TLS_MODE_VERIFY_FULL = 'verify-full';
const TLS_MODE_LIST_FULL = [self::TLS_MODE_DISABLE, self::TLS_MODE_ALLOW, self::TLS_MODE_PREFER, self::TLS_MODE_REQUIRE, self::TLS_MODE_VERIFY_CA, self::TLS_MODE_VERIFY_FULL];
const TLS_MODE_LIST_REQUIRED = [self::TLS_MODE_REQUIRE, self::TLS_MODE_VERIFY_CA, self::TLS_MODE_VERIFY_FULL];

private $queryState;
private $queryType;
private $connStatus;
Expand Down Expand Up @@ -129,6 +148,8 @@ class Connection extends EventEmitter

/** @var bool */
private $auto_disconnect = false;
private $tls = self::TLS_MODE_PREFER;
private $tlsConnectorFlags = [];
private $password;

public function __construct(array $parameters, LoopInterface $loop, ConnectorInterface $connector = null)
Expand All @@ -153,6 +174,19 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
unset($parameters['password']);
}

if (array_key_exists('tls', $parameters)) {
if (!in_array($this->tls, self::TLS_MODE_LIST_FULL)) {
throw new \InvalidArgumentException('TLS mode must be one off "' . implode(', ', self::TLS_MODE_LIST_FULL) . ' but got "' . $parameters['tls'] . '" instead');
}
$this->tls = $parameters['tls'];
unset($parameters['tls']);
}

if (array_key_exists('tls_connector_flags', $parameters)) {
$this->tlsConnectorFlags = $parameters['tls_connector_flags'];
unset($parameters['tls_connector_flags']);
}

if (isset($parameters['auto_disconnect'])) {
$this->auto_disconnect = $parameters['auto_disconnect'];
unset($parameters['auto_disconnect']);
Expand All @@ -167,8 +201,17 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
$this->queryState = static::STATE_BUSY;
$this->queryType = static::QUERY_SIMPLE;
$this->connStatus = static::CONNECTION_NEEDED;
$this->socket = $connector ?: new Connector($loop);
$this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port'];
$this->socket = $connector ?: new Connector($loop, [
'tls' => [
'verify_peer' => $this->tls === self::TLS_MODE_VERIFY_FULL,
'verify_peer_name' => $this->tls === self::TLS_MODE_VERIFY_FULL,
'allow_self_signed' => $this->tls !== self::TLS_MODE_VERIFY_FULL,
] + $this->tlsConnectorFlags,
]);
// We always url `opportunistic+tls` as scheme because the logic required for using `tcp` on TLS `disable`
// mode is more complex than worth it when connecting to the server. And the `SecureConnector` gives us a
// plaint text connection with all TLS flags already set and ready to use for all the other modes.
$this->uri = 'opportunistic+tls://' . $parameters['host'] . ':' . $parameters['port'];
$this->notificationSubject = new Subject();
$this->cancelPending = false;
$this->cancelRequested = false;
Expand All @@ -185,23 +228,43 @@ private function start()
$this->connStatus = static::CONNECTION_STARTED;

$this->socket->connect($this->uri)->then(
function (DuplexStreamInterface $stream) {
$this->stream = $stream;
$this->connStatus = static::CONNECTION_MADE;

$stream->on('close', [$this, 'onClose']);
function (OpportunisticTlsConnectionInterface $stream) {
(new Promise(function (callable $resolve, callable $reject) use ($stream) {
if ($this->tls !== self::TLS_MODE_DISABLE) {
first($stream)->then(function ($data) use ($resolve, $reject, $stream) {
if (trim($data) === 'S') {
$stream->enableEncryption()->then($resolve, $reject);
return;
}

if (in_array($this->tls, self::TLS_MODE_LIST_REQUIRED)) {
$reject(new \RuntimeException('Failed to encrypt connection while required'));
return;
}

$resolve($stream);
}, $reject);

$ssl = new SSLRequest();
$stream->write($ssl->encodedMessage());
return;
}

$stream->on('data', [$this, 'onData']);
$resolve($stream);
}))->then(function (DuplexStreamInterface $stream) {
$this->stream = $stream;
$this->connStatus = static::CONNECTION_MADE;

// $ssl = new SSLRequest();
// $stream->write($ssl->encodedMessage());
$stream->on('close', [$this, 'onClose']);
$stream->on('data', [$this, 'onData']);

$startupParameters = $this->parameters;
unset($startupParameters['host'], $startupParameters['port']);
$startupParameters = $this->parameters;
unset($startupParameters['host'], $startupParameters['port']);

$startup = new StartupMessage();
$startup->setParameters($startupParameters);
$stream->write($startup->encodedMessage());
$startup = new StartupMessage();
$startup->setParameters($startupParameters);
$stream->write($startup->encodedMessage());
})->done();
},
function ($e) {
// connection error
Expand Down Expand Up @@ -566,11 +629,11 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use
$this->processQueue();

return new CallbackDisposable(function () use ($q) {
if ($this->currentCommand === $q && $q->isActive()) {
$this->cancelRequested = true;
}
$q->cancel();
});
if ($this->currentCommand === $q && $q->isActive()) {
$this->cancelRequested = true;
}
$q->cancel();
});
}
);

Expand Down

0 comments on commit efa59a1

Please sign in to comment.