@@ -443,6 +443,16 @@ public function rebalance(Queue $queue): void {
443
443
$ cluster ->processPendingTables ($ queue );
444
444
}
445
445
446
+ // Handle new nodes that need shard creation
447
+ $ originalNodes = new Set ($ schema ->map (fn ($ row ) => $ row ['node ' ]));
448
+ $ newNodes = $ activeNodes ->diff ($ originalNodes );
449
+
450
+
451
+
452
+ if ($ newNodes ->count () > 0 ) {
453
+ $ this ->handleShardCreationForRebalancing ($ queue , $ schema , $ newSchema , $ clusterMap );
454
+ }
455
+
446
456
// At this case we update schema
447
457
// before creating distributed table
448
458
$ this ->updateScheme ($ newSchema );
@@ -664,10 +674,12 @@ protected function getCreateShardedTableSQL(Set $shards): string {
664
674
$ map = new Map ;
665
675
foreach ($ nodes as $ row ) {
666
676
foreach ($ row ['shards ' ] as $ shard ) {
667
- $ map [$ shard ] ??= new Set ;
677
+ if (!$ map ->hasKey ($ shard )) {
678
+ $ map [$ shard ] = new Set ();
679
+ }
668
680
$ shardName = $ this ->getShardName ($ shard );
669
- // @phpstan-ignore-next-line
670
- $ map [$ shard ]->add ("{$ row ['node ' ]}: {$ shardName }" );
681
+
682
+ $ map [$ shard ]? ->add("{$ row ['node ' ]}: {$ shardName }" );
671
683
}
672
684
}
673
685
@@ -748,4 +760,61 @@ protected static function parseShards(string $shards): Set {
748
760
: new Set
749
761
;
750
762
}
763
+
764
+ /**
765
+ * Handle shard creation for rebalancing (all nodes that need new shards)
766
+ * @param Queue $queue
767
+ * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $oldSchema
768
+ * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $newSchema
769
+ * @param Map<string,Cluster> $clusterMap
770
+ * @return void
771
+ */
772
+ protected function handleShardCreationForRebalancing (Queue $ queue , Vector $ oldSchema , Vector $ newSchema , Map $ clusterMap ): void {
773
+ // Create map of old schema for comparison
774
+ /** @var Map<string,Set<int>> */
775
+ $ oldShardMap = new Map ();
776
+ foreach ($ oldSchema as $ row ) {
777
+ $ oldShardMap [$ row ['node ' ]] = $ row ['shards ' ];
778
+ }
779
+
780
+ foreach ($ newSchema as $ row ) {
781
+ $ oldShards = $ oldShardMap ->get ($ row ['node ' ], new Set ());
782
+ $ newShards = $ row ['shards ' ];
783
+ $ shardsToCreate = $ newShards ->diff ($ oldShards );
784
+
785
+ if ($ shardsToCreate ->isEmpty ()) {
786
+ continue ;
787
+ }
788
+
789
+ // Create missing shard tables on this node
790
+ foreach ($ shardsToCreate as $ shard ) {
791
+ $ sql = $ this ->getCreateTableShardSQL ($ shard );
792
+ $ queue ->add ($ row ['node ' ], $ sql );
793
+
794
+ // Find nodes that already have this shard in old schema for replication
795
+ $ existingNodesWithShard = $ oldSchema ->filter (
796
+ fn ($ existingRow ) => $ existingRow ['shards ' ]->contains ($ shard )
797
+ );
798
+
799
+ // If there are existing nodes with this shard, set up replication
800
+ if ($ existingNodesWithShard ->count () <= 0 ) {
801
+ continue ;
802
+ }
803
+
804
+ $ sourceNode = $ existingNodesWithShard ->first ()['node ' ];
805
+ $ connectedNodes = new Set ([$ row ['node ' ], $ sourceNode ]);
806
+
807
+
808
+
809
+ // Set up cluster replication for this shard
810
+ $ this ->handleReplication (
811
+ $ sourceNode ,
812
+ $ queue ,
813
+ $ connectedNodes ,
814
+ $ clusterMap ,
815
+ $ shard
816
+ );
817
+ }
818
+ }
819
+ }
751
820
}
0 commit comments