Skip to content

Commit

Permalink
Add flags to track replica queries and synthetic requests
Browse files Browse the repository at this point in the history
  • Loading branch information
muglug committed Jun 1, 2023
1 parent 87cea53 commit e3213f7
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 37 deletions.
7 changes: 6 additions & 1 deletion src/Index.hack
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
namespace Slack\SQLFake;

final class Index {
public function __construct(public string $name, public string $type, public keyset<string> $fields) {}
public function __construct(
public string $name,
public string $type,
public keyset<string> $fields,
public bool $vitess_sharding_key = false,
) {}
}
34 changes: 28 additions & 6 deletions src/Query/DeleteQuery.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public function __construct(public string $sql) {}
public function execute(AsyncMysqlConnection $conn): int {
$this->fromClause as nonnull;
list($database, $table_name) = Query::parseTableName($conn, $this->fromClause['name']);
$data = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[]);
$table_data = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[], keyset[]);
$schema = QueryContext::getSchema($database, $table_name);

Metrics::trackQuery(QueryType::DELETE, $conn->getServer()->name, $table_name, $this->sql);
Expand All @@ -26,10 +26,26 @@ public function execute(AsyncMysqlConnection $conn): int {
}
}

return $this->applyWhere($conn, $data[0], $data[1], $columns, $schema?->indexes)
return $this->applyWhere(
$conn,
$table_data[0],
$table_data[1],
$table_data[2] ?? keyset[],
$columns,
$schema?->indexes,
)
|> $this->applyOrderBy($conn, $$)
|> $this->applyLimit($$)
|> $this->applyDelete($conn, $database, $table_name, $$, $data[0], $data[1], $schema);
|> $this->applyDelete(
$conn,
$database,
$table_name,
$$,
$table_data[0],
$table_data[1],
$table_data[2] ?? keyset[],
$schema,
);
}

/**
Expand All @@ -42,11 +58,14 @@ protected function applyDelete(
dataset $filtered_rows,
dataset $original_table,
index_refs $index_refs,
keyset<arraykey> $dirty_pks,
?TableSchema $table_schema,
): int {
$rows_to_delete = Keyset\keys($filtered_rows);
$remaining_rows =
Dict\filter_with_key($original_table, ($row_num, $_) ==> !C\contains_key($rows_to_delete, $row_num));
$remaining_rows = Dict\filter_with_key(
$original_table,
($row_num, $_) ==> !C\contains_key($rows_to_delete, $row_num),
);
$rows_affected = C\count($original_table) - C\count($remaining_rows);

if ($table_schema is nonnull) {
Expand All @@ -57,6 +76,7 @@ protected function applyDelete(
$table_schema->vitess_sharding->keyspace,
'INDEX',
keyset[$table_schema->vitess_sharding->sharding_key],
true,
);
}

Expand All @@ -70,11 +90,13 @@ protected function applyDelete(
$index_refs[$index_name] = $specific_index_refs;
}
}

unset($dirty_pks[$row_id]);
}
}

// write it back to the database
$conn->getServer()->saveTable($database, $table_name, $remaining_rows, $index_refs);
$conn->getServer()->saveTable($database, $table_name, $remaining_rows, $index_refs, $dirty_pks);
return $rows_affected;
}
}
28 changes: 19 additions & 9 deletions src/Query/FromClause.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@ public function aliasRecentExpression(string $name): void {
public function process(
AsyncMysqlConnection $conn,
string $sql,
): (dataset, index_refs, vec<Index>, dict<string, Column>) {
): (dataset, index_refs, keyset<arraykey>, vec<Index>, dict<string, Column>) {

$data = dict[];
$is_first_table = true;
$index_refs = dict[];
$dirty_pks = keyset[];

$indexes = vec[];
$columns = dict[];

foreach ($this->tables as $table) {
$schema = null;
$new_index_refs = dict[];
$new_dirty_pks = keyset[];
$new_indexes = vec[];

if (Shapes::keyExists($table, 'subquery')) {
Expand All @@ -63,12 +66,17 @@ public function process(
$name = $table['alias'] ?? $table_name;
$schema = QueryContext::getSchema($database, $table_name);
if ($schema === null && QueryContext::$strictSchemaMode) {
throw
new SQLFakeRuntimeException("Table $table_name not found in schema and strict mode is enabled");
throw new SQLFakeRuntimeException(
"Table $table_name not found in schema and strict mode is enabled",
);
}

list($res, $new_index_refs) =
$conn->getServer()->getTableData($database, $table_name) ?: tuple(dict[], dict[]);
$table_data = $conn->getServer()->getTableData($database, $table_name) ?:
tuple(dict[], dict[], keyset[]);

$res = $table_data[0];
$new_index_refs = $table_data[1];
$new_dirty_pks = $table_data[2] ?? keyset[];

if (C\count($this->tables) > 1) {
$new_index_refs = Dict\map_keys($new_index_refs, $k ==> $name.'.'.$k);
Expand All @@ -94,6 +102,7 @@ public function process(
$prefix.$schema->vitess_sharding->keyspace,
'INDEX',
keyset[$prefix.$schema->vitess_sharding->sharding_key],
true,
);
}

Expand All @@ -111,6 +120,7 @@ public function process(
}

$index_refs = Dict\merge($index_refs, $new_index_refs);
$dirty_pks = Keyset\union($dirty_pks, $new_dirty_pks);
}

$new_dataset = dict[];
Expand Down Expand Up @@ -156,10 +166,10 @@ public function process(

if ($data || !$is_first_table) {
// do the join here. based on join type, pass in $data and $res to filter. and aliases
list($data, $index_refs) = JoinProcessor::process(
list($data, $index_refs, $dirty_pks) = JoinProcessor::process(
$conn,
tuple($data, $index_refs),
tuple($new_dataset, $new_index_refs),
tuple($data, $index_refs, keyset[]),
tuple($new_dataset, $new_index_refs, $new_dirty_pks),
$name,
$table['join_type'],
$table['join_operator'] ?? null,
Expand All @@ -180,6 +190,6 @@ public function process(
}
}

return tuple($data, $index_refs, $indexes, $columns);
return tuple($data, $index_refs, $dirty_pks, $indexes, $columns);
}
}
14 changes: 12 additions & 2 deletions src/Query/InsertQuery.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ public function __construct(public string $table, public string $sql, public boo
*/
public function execute(AsyncMysqlConnection $conn): int {
list($database, $table_name) = Query::parseTableName($conn, $this->table);
list($table, $index_refs) = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[]);
$data = $conn->getServer()->getTableData($database, $table_name) ?? tuple(dict[], dict[], keyset[]);

$table = $data[0];
$index_refs = $data[1];
$dirty_pks = $data[2] ?? keyset[];

Metrics::trackQuery(QueryType::INSERT, $conn->getServer()->name, $table_name, $this->sql);

Expand Down Expand Up @@ -65,6 +69,7 @@ public function execute(AsyncMysqlConnection $conn): int {
$table_schema->vitess_sharding->keyspace,
'INDEX',
keyset[$table_schema->vitess_sharding->sharding_key],
true,
);
}

Expand Down Expand Up @@ -113,6 +118,7 @@ public function execute(AsyncMysqlConnection $conn): int {
dict[$row_id => $existing_row],
$table,
$index_refs,
$dirty_pks,
$this->updateExpressions,
$table_schema,
$row,
Expand All @@ -135,12 +141,16 @@ public function execute(AsyncMysqlConnection $conn): int {
$index_refs[$index_name] = $specific_index_refs;
}

if (QueryContext::$inRequest) {
$dirty_pks[] = $primary_key;
}

$table[$primary_key] = $row;
$rows_affected++;
}

// write it back to the database
$conn->getServer()->saveTable($database, $table_name, $table, $index_refs);
$conn->getServer()->saveTable($database, $table_name, $table, $index_refs, $dirty_pks);
return $rows_affected;
}
}
35 changes: 33 additions & 2 deletions src/Query/JoinProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public static function process(
$left_mappings = dict[];
$right_mappings = dict[];

$dirty_pks = keyset[];

switch ($join_type) {
case JoinType::JOIN:
case JoinType::STRAIGHT:
Expand All @@ -77,6 +79,9 @@ public static function process(
$left_mappings[$left_row_id][] = $insert_id;
$right_mappings[$right_row_id] ??= keyset[];
$right_mappings[$right_row_id][] = $insert_id;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}
}
Expand Down Expand Up @@ -104,6 +109,9 @@ public static function process(
$right_mappings[$right_row_id] ??= keyset[];
$right_mappings[$right_row_id][] = $insert_id;
$any_match = true;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}

Expand All @@ -120,6 +128,10 @@ public static function process(
$insert_id = C\count($out) - 1;
$left_mappings[$left_row_id] ??= keyset[];
$left_mappings[$left_row_id][] = $insert_id;

if (isset($left_dataset[2][$left_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}
break;
Expand All @@ -146,6 +158,9 @@ public static function process(
$left_mappings[$left_row_id][] = $insert_id;
$right_mappings[$right_row_id] ??= keyset[];
$right_mappings[$right_row_id][] = $insert_id;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}

Expand All @@ -164,6 +179,9 @@ public static function process(
$left_mappings[$left_row_id][] = $insert_id;
$right_mappings[$right_row_id] ??= keyset[];
$right_mappings[$right_row_id][] = $insert_id;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}
break;
Expand All @@ -183,6 +201,9 @@ public static function process(
$left_mappings[$left_row_id][] = $insert_id;
$right_mappings[$right_row_id] ??= keyset[];
$right_mappings[$right_row_id][] = $insert_id;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$right_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}
}
Expand All @@ -198,7 +219,7 @@ public static function process(
$right_indexes,
);

return tuple(dict($out), $index_refs);
return tuple(dict($out), $index_refs, $dirty_pks);
}

/**
Expand Down Expand Up @@ -308,6 +329,7 @@ private static function processHashJoin(

$left_mappings = dict[];
$right_mappings = dict[];
$dirty_pks = keyset[];

switch ($join_type) {
case JoinType::JOIN:
Expand All @@ -322,6 +344,9 @@ private static function processHashJoin(
$left_mappings[$left_row_id][] = $insert_id;
$right_mappings[$k] ??= keyset[];
$right_mappings[$k][] = $insert_id;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$k])) {
$dirty_pks[] = $insert_id;
}
}
}
break;
Expand All @@ -348,6 +373,9 @@ private static function processHashJoin(
$right_mappings[$k] ??= keyset[];
$right_mappings[$k][] = $insert_id;
$any_match = true;
if (isset($left_dataset[2][$left_row_id]) || isset($right_dataset[2][$k])) {
$dirty_pks[] = $insert_id;
}
}
}

Expand All @@ -364,6 +392,9 @@ private static function processHashJoin(
$insert_id = C\count($out) - 1;
$left_mappings[$left_row_id] ??= keyset[];
$left_mappings[$left_row_id][] = $insert_id;
if (isset($left_dataset[2][$left_row_id])) {
$dirty_pks[] = $insert_id;
}
}
}
break;
Expand All @@ -380,7 +411,7 @@ private static function processHashJoin(
$right_indexes,
);

return tuple(dict($out), $index_refs);
return tuple(dict($out), $index_refs, $dirty_pks);
}

private static function getIndexRefsFromMappings(
Expand Down
Loading

0 comments on commit e3213f7

Please sign in to comment.