Skip to content

Commit

Permalink
PHPLIB-760: Force primary for $out/$merge if any servers are pre-5.0 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jmikola authored Dec 6, 2021
1 parent f5133ad commit 9e0da59
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 36 deletions.
21 changes: 3 additions & 18 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ class Collection
/** @var integer */
private static $wireVersionForReadConcernWithWriteStage = 8;

/** @var integer */
private static $wireVersionForSecondarySupportsWriteStage = 13;

/** @var string */
private $collectionName;

Expand Down Expand Up @@ -228,21 +225,9 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = select_server($this->manager, $options);

/* If a write stage is being used with a read preference (explicit or
* inherited), check that the wire version supports it. If not, force a
* primary read preference and select a new server if necessary. */
if (
$hasWriteStage && isset($options['readPreference']) &&
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);

if ($server->isSecondary()) {
$server = select_server($this->manager, $options);
}
}
$server = $hasWriteStage
? select_server_for_aggregate_write_stage($this->manager, $options)
: select_server($this->manager, $options);

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down
21 changes: 3 additions & 18 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class Database
/** @var integer */
private static $wireVersionForReadConcernWithWriteStage = 8;

/** @var integer */
private static $wireVersionForSecondarySupportsWriteStage = 13;

/** @var string */
private $databaseName;

Expand Down Expand Up @@ -209,21 +206,9 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = $this->readPreference;
}

$server = select_server($this->manager, $options);

/* If a write stage is being used with a read preference (explicit or
* inherited), check that the wire version supports it. If not, force a
* primary read preference and select a new server if necessary. */
if (
$hasWriteStage && isset($options['readPreference']) &&
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);

if ($server->isSecondary()) {
$server = select_server($this->manager, $options);
}
}
$server = $hasWriteStage
? select_server_for_aggregate_write_stage($this->manager, $options)
: select_server($this->manager, $options);

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down
73 changes: 73 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use Exception;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
Expand All @@ -42,6 +43,32 @@
use function reset;
use function substr;

/**
* Check whether all servers support executing a write stage on a secondary.
*
* @internal
* @param Server[] $servers
*/
function all_servers_support_write_stage_on_secondary(array $servers): bool
{
/* Write stages on secondaries are technically supported by FCV 4.4, but the
* CRUD spec requires all 5.0+ servers since FCV is not tracked by SDAM. */
static $wireVersionForWriteStageOnSecondary = 13;

foreach ($servers as $server) {
// We can assume that load balancers only front 5.0+ servers
if ($server->getType() === Server::TYPE_LOAD_BALANCER) {
continue;
}

if (! server_supports_feature($server, $wireVersionForWriteStageOnSecondary)) {
return false;
}
}

return true;
}

/**
* Applies a type map to a document.
*
Expand Down Expand Up @@ -459,3 +486,49 @@ function select_server(Manager $manager, array $options): Server

return $manager->selectServer($readPreference);
}

/**
* Performs server selection for an aggregate operation with a write stage. The
* $options parameter may be modified by reference if a primary read preference
* must be forced due to the existence of pre-5.0 servers in the topology.
*
* @internal
* @see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#aggregation-pipelines-with-write-stages
*/
function select_server_for_aggregate_write_stage(Manager $manager, array &$options): Server
{
$readPreference = extract_read_preference_from_options($options);

/* If there is either no read preference or a primary read preference, there
* is no special server selection logic to apply. */
if ($readPreference === null || $readPreference->getMode() === ReadPreference::RP_PRIMARY) {
return select_server($manager, $options);
}

$server = null;
$serverSelectionError = null;

try {
$server = select_server($manager, $options);
} catch (DriverRuntimeException $serverSelectionError) {
}

/* If any pre-5.0 servers exist in the topology, force a primary read
* preference and repeat server selection if it previously failed or
* selected a secondary. */
if (! all_servers_support_write_stage_on_secondary($manager->getServers())) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);

if ($server === null || $server->isSecondary()) {
return select_server($manager, $options);
}
}

/* If the topology only contains 5.0+ servers, we should either return the
* previously selected server or propagate the server selection error. */
if ($serverSelectionError !== null) {
throw $serverSelectionError;
}

return $server;
}

0 comments on commit 9e0da59

Please sign in to comment.