@@ -113,14 +113,19 @@ public function getConnectedNodes(Set $shards): Set {
113
113
* @return Vector<array{node:string,shards:Set<int>}>
114
114
*/
115
115
public function getExternalNodeShards (Set $ shards ): Vector {
116
+ // There is a case when we have ONLY distributed table with shards
117
+ // but all shards are remote agents and nothing locally
118
+ $ shardsCond = $ shards ->count () > 0
119
+ ? "AND ANY(shards) not in ( {$ shards ->join (', ' )}) "
120
+ : '' ;
121
+
116
122
$ query = "
117
123
SELECT node, shards FROM {$ this ->table }
118
124
WHERE
119
125
cluster = ' {$ this ->cluster ->name }'
120
126
AND
121
127
table = ' {$ this ->name }'
122
- AND
123
- ANY(shards) not in ( {$ shards ->join (', ' )})
128
+ $ shardsCond
124
129
ORDER BY id ASC
125
130
" ;
126
131
@@ -192,27 +197,21 @@ public function shard(
192
197
$ nodes = new Set ;
193
198
194
199
$ schema = $ this ->configureNodeShards ($ shardCount , $ replicationFactor );
195
- $ reduceFn = function (Map $ clusterMap , array $ row ) use ($ queue , $ replicationFactor , &$ nodes , &$ nodeShardsMap ) {
200
+ $ reduceFn = function (Map $ clusterMap , array $ row ) use ($ queue , &$ nodes , &$ nodeShardsMap ) {
196
201
/** @var Map<string,Cluster> $clusterMap */
197
202
$ nodes ->add ($ row ['node ' ]);
198
203
$ nodeShardsMap [$ row ['node ' ]] = $ row ['shards ' ];
199
204
200
205
foreach ($ row ['shards ' ] as $ shard ) {
201
206
$ connectedNodes = $ this ->getConnectedNodes (new Set ([$ shard ]));
202
207
203
- if ($ replicationFactor > 1 ) {
204
- $ clusterMap = $ this ->handleReplication (
205
- $ row ['node ' ],
206
- $ queue ,
207
- $ connectedNodes ,
208
- $ clusterMap ,
209
- $ shard
210
- );
211
- } else {
212
- // If no replication, add create table shard SQL to queue
213
- $ sql = $ this ->getCreateTableShardSQL ($ shard );
214
- $ queue ->add ($ row ['node ' ], $ sql );
215
- }
208
+ $ clusterMap = $ this ->handleReplication (
209
+ $ row ['node ' ],
210
+ $ queue ,
211
+ $ connectedNodes ,
212
+ $ clusterMap ,
213
+ $ shard
214
+ );
216
215
}
217
216
218
217
return $ clusterMap ;
@@ -227,10 +226,7 @@ public function shard(
227
226
/** @var Set<int> */
228
227
$ queueIds = new Set ;
229
228
foreach ($ nodeShardsMap as $ node => $ shards ) {
230
- // Do nothing when no shards present for this node
231
- if (!$ shards ->count ()) {
232
- continue ;
233
- }
229
+ // Even when no shards, we still create distributed table
234
230
$ sql = $ this ->getCreateShardedTableSQL ($ shards );
235
231
$ queueId = $ queue ->add ($ node , $ sql );
236
232
$ queueIds ->add ($ queueId );
@@ -311,6 +307,13 @@ protected function handleReplication(
311
307
Map $ clusterMap ,
312
308
int $ shard
313
309
): Map {
310
+ // If no replication, add create table shard SQL to queue
311
+ if ($ connectedNodes ->count () === 1 ) {
312
+ $ sql = $ this ->getCreateTableShardSQL ($ shard );
313
+ $ queue ->add ($ node , $ sql );
314
+ return $ clusterMap ;
315
+ }
316
+
314
317
$ clusterName = static ::getClusterName ($ connectedNodes );
315
318
$ hasCluster = isset ($ clusterMap [$ clusterName ]);
316
319
if ($ hasCluster ) {
@@ -354,9 +357,6 @@ public function rebalance(Queue $queue): void {
354
357
$ schema = $ this ->getShardSchema ();
355
358
$ allNodes = $ this ->cluster ->getNodes ();
356
359
$ inactiveNodes = $ this ->cluster ->getInactiveNodes ();
357
- if (!$ inactiveNodes ->count ()) {
358
- return ;
359
- }
360
360
$ activeNodes = $ allNodes ->diff ($ inactiveNodes );
361
361
$ newSchema = Util::rebalanceShardingScheme ($ schema , $ activeNodes );
362
362
@@ -376,6 +376,7 @@ public function rebalance(Queue $queue): void {
376
376
$ clusterMap [$ clusterName ] = $ cluster ;
377
377
}
378
378
379
+ // Get affected schema with nodes that are out
379
380
$ affectedSchema = $ schema ->filter (
380
381
fn ($ row ) => $ inactiveNodes ->contains ($ row ['node ' ])
381
382
);
@@ -387,7 +388,7 @@ public function rebalance(Queue $queue): void {
387
388
// First thing first, remove from inactive node using the queue
388
389
$ this ->cleanUpNode ($ queue , $ row ['node ' ], $ row ['shards ' ], $ processedTables );
389
390
390
- // Do real rebaliance now
391
+ // Do real rebalancing now
391
392
foreach ($ row ['shards ' ] as $ shard ) {
392
393
/** @var Set<string> */
393
394
$ nodesForShard = new Set ;
@@ -438,20 +439,34 @@ public function rebalance(Queue $queue): void {
438
439
439
440
/** @var Set<int> */
440
441
$ queueIds = new Set ;
442
+ foreach ($ clusterMap as $ cluster ) {
443
+ $ cluster ->processPendingTables ($ queue );
444
+ }
445
+
446
+ // Handle new nodes that need shard creation
447
+ $ originalNodes = new Set ($ schema ->map (fn ($ row ) => $ row ['node ' ]));
448
+ $ newNodes = $ activeNodes ->diff ($ originalNodes );
449
+
450
+
451
+
452
+ if ($ newNodes ->count () > 0 ) {
453
+ $ this ->handleShardCreationForRebalancing ($ queue , $ schema , $ newSchema , $ clusterMap );
454
+ }
455
+
456
+ // At this case we update schema
457
+ // before creating distributed table
458
+ $ this ->updateScheme ($ newSchema );
441
459
foreach ($ newSchema as $ row ) {
442
- $ sql = "DROP TABLE {$ this ->name } OPTION force=1 " ;
460
+ // We should drop distributed table everywhere
461
+ // even when node has ONLY it but may have no shards on it
462
+ $ sql = "DROP TABLE IF EXISTS {$ this ->name } OPTION force=1 " ;
443
463
$ queueId = $ queue ->add ($ row ['node ' ], $ sql );
444
464
$ queueIds ->add ($ queueId );
445
- // Do nothing when no shards present for this node
446
- if (!$ row ['shards ' ]->count ()) {
447
- continue ;
448
- }
465
+
449
466
$ sql = $ this ->getCreateShardedTableSQL ($ row ['shards ' ]);
450
467
$ queueId = $ queue ->add ($ row ['node ' ], $ sql );
451
468
$ queueIds ->add ($ queueId );
452
469
}
453
-
454
- $ this ->updateScheme ($ newSchema );
455
470
} catch (\Throwable $ t ) {
456
471
var_dump ($ t ->getMessage ());
457
472
}
@@ -659,10 +674,12 @@ protected function getCreateShardedTableSQL(Set $shards): string {
659
674
$ map = new Map ;
660
675
foreach ($ nodes as $ row ) {
661
676
foreach ($ row ['shards ' ] as $ shard ) {
662
- $ map [$ shard ] ??= new Set ;
677
+ if (!$ map ->hasKey ($ shard )) {
678
+ $ map [$ shard ] = new Set ();
679
+ }
663
680
$ shardName = $ this ->getShardName ($ shard );
664
- // @phpstan-ignore-next-line
665
- $ map [$ shard ]->add ("{$ row ['node ' ]}: {$ shardName }" );
681
+
682
+ $ map [$ shard ]? ->add("{$ row ['node ' ]}: {$ shardName }" );
666
683
}
667
684
}
668
685
@@ -738,6 +755,105 @@ public function setup(): void {
738
755
* @return Set<int>
739
756
*/
740
757
protected static function parseShards (string $ shards ): Set {
741
- return new Set (array_map ('intval ' , explode (', ' , $ shards )));
758
+ return trim ($ shards ) !== ''
759
+ ? new Set (array_map ('intval ' , explode (', ' , $ shards )))
760
+ : new Set
761
+ ;
762
+ }
763
+
764
+ /**
765
+ * Handle shard creation for rebalancing (all nodes that need new shards)
766
+ *
767
+ * SAFETY: Respects original replication factor - with RF=1, only creates shard tables
768
+ * but does NOT set up replication to prevent data movement and potential data loss.
769
+ *
770
+ * @param Queue $queue
771
+ * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $oldSchema
772
+ * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $newSchema
773
+ * @param Map<string,Cluster> $clusterMap
774
+ * @return void
775
+ */
776
+ protected function handleShardCreationForRebalancing (
777
+ Queue $ queue ,
778
+ Vector $ oldSchema ,
779
+ Vector $ newSchema ,
780
+ Map $ clusterMap
781
+ ): void {
782
+ // Calculate original replication factor to ensure safe operations
783
+ $ originalRf = $ this ->calculateReplicationFactor ($ oldSchema );
784
+
785
+ // Create map of old schema for comparison
786
+ /** @var Map<string,Set<int>> */
787
+ $ oldShardMap = new Map ();
788
+ foreach ($ oldSchema as $ row ) {
789
+ $ oldShardMap [$ row ['node ' ]] = $ row ['shards ' ];
790
+ }
791
+
792
+ foreach ($ newSchema as $ row ) {
793
+ $ oldShards = $ oldShardMap ->get ($ row ['node ' ], new Set ());
794
+ $ newShards = $ row ['shards ' ];
795
+ $ shardsToCreate = $ newShards ->diff ($ oldShards );
796
+
797
+ if ($ shardsToCreate ->isEmpty ()) {
798
+ continue ;
799
+ }
800
+
801
+ // Create missing shard tables on this node
802
+ foreach ($ shardsToCreate as $ shard ) {
803
+ $ sql = $ this ->getCreateTableShardSQL ($ shard );
804
+ $ queue ->add ($ row ['node ' ], $ sql );
805
+
806
+ // Find nodes that already have this shard in old schema for replication
807
+ $ existingNodesWithShard = $ oldSchema ->filter (
808
+ fn ($ existingRow ) => $ existingRow ['shards ' ]->contains ($ shard )
809
+ );
810
+
811
+ // If there are existing nodes with this shard, set up replication
812
+ // BUT only if original RF > 1 (safe to replicate)
813
+ if ($ existingNodesWithShard ->count () <= 0 || $ originalRf === 1 ) {
814
+ continue ;
815
+ }
816
+
817
+ $ sourceNode = $ existingNodesWithShard ->first ()['node ' ];
818
+ $ connectedNodes = new Set ([$ row ['node ' ], $ sourceNode ]);
819
+
820
+ // Set up cluster replication for this shard
821
+ $ this ->handleReplication (
822
+ $ sourceNode ,
823
+ $ queue ,
824
+ $ connectedNodes ,
825
+ $ clusterMap ,
826
+ $ shard
827
+ );
828
+ }
829
+ }
830
+ }
831
+
832
+ /**
833
+ * Calculate the original replication factor from the schema
834
+ * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
835
+ * @return int
836
+ */
837
+ private function calculateReplicationFactor (Vector $ schema ): int {
838
+ if ($ schema ->isEmpty ()) {
839
+ return 1 ;
840
+ }
841
+
842
+ // Count how many nodes have each shard
843
+ $ shardCounts = new Map ();
844
+
845
+ foreach ($ schema as $ row ) {
846
+ foreach ($ row ['shards ' ] as $ shard ) {
847
+ $ currentCount = $ shardCounts ->get ($ shard , 0 );
848
+ $ shardCounts ->put ($ shard , $ currentCount + 1 );
849
+ }
850
+ }
851
+
852
+ if ($ shardCounts ->isEmpty ()) {
853
+ return 1 ;
854
+ }
855
+
856
+ // The replication factor is the maximum count of any shard
857
+ return max ($ shardCounts ->values ()->toArray ());
742
858
}
743
859
}
0 commit comments