@@ -2415,6 +2415,46 @@ func TestLogicalReplicationGatewayRoute(t *testing.T) {
2415
2415
require .Empty (t , progress .Details .(* jobspb.Progress_LogicalReplication ).LogicalReplication .PartitionConnUris )
2416
2416
}
2417
2417
2418
+ func TestMismatchColIDs (t * testing.T ) {
2419
+ defer leaktest .AfterTest (t )()
2420
+ skip .UnderDeadlock (t )
2421
+ defer log .Scope (t ).Close (t )
2422
+
2423
+ ctx := context .Background ()
2424
+ tc , s , sqlA , sqlB := setupLogicalTestServer (t , ctx , testClusterBaseClusterArgs , 1 )
2425
+ defer tc .Stopper ().Stop (ctx )
2426
+
2427
+ dbBURL := replicationtestutils .GetExternalConnectionURI (t , s , s , serverutils .DBName ("b" ))
2428
+
2429
+ createStmt := "CREATE TABLE foo (pk int primary key, payload string)"
2430
+ sqlA .Exec (t , createStmt )
2431
+ sqlA .Exec (t , "ALTER TABLE foo ADD COLUMN baz INT DEFAULT 2" )
2432
+
2433
+ // Insert some data into foo
2434
+ sqlA .Exec (t , "INSERT INTO foo VALUES (1, 'hello')" )
2435
+ sqlA .Exec (t , "INSERT INTO foo VALUES (2, 'world')" )
2436
+
2437
+ sqlB .Exec (t , createStmt )
2438
+ sqlB .Exec (t , "ALTER TABLE foo ADD COLUMN bar INT DEFAULT 2" )
2439
+
2440
+ sqlB .Exec (t , "ALTER TABLE foo ADD COLUMN baz INT DEFAULT 2" )
2441
+ sqlB .Exec (t , "INSERT INTO foo VALUES (3, 'hello', 3)" )
2442
+ sqlB .Exec (t , "ALTER TABLE foo DROP COLUMN bar" )
2443
+ sqlB .Exec (t , "INSERT INTO foo VALUES (4, 'world')" )
2444
+
2445
+ // LDR immediate mode creation should fail because of mismatched column IDs.
2446
+ sqlA .ExpectErr (t ,
2447
+ "destination table foo column baz has ID 3, but the source table foo has ID 4" ,
2448
+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'immediate'" , dbBURL .String ())
2449
+
2450
+ // LDR validated mode creation should succeed because the SQL writer supports mismatched column IDs.
2451
+ var jobID jobspb.JobID
2452
+ sqlA .QueryRow (t , "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'validated'" , dbBURL .String ()).Scan (& jobID )
2453
+
2454
+ now := s .Clock ().Now ()
2455
+ WaitUntilReplicatedTime (t , now , sqlA , jobID )
2456
+ }
2457
+
2418
2458
// TestLogicalReplicationCreationChecks verifies that we check that the table
2419
2459
// schemas are compatible when creating the replication stream.
2420
2460
func TestLogicalReplicationCreationChecks (t * testing.T ) {
@@ -2437,40 +2477,33 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2437
2477
2438
2478
dbBURL := replicationtestutils .GetExternalConnectionURI (t , s , s , serverutils .DBName ("b" ))
2439
2479
2480
+ expectErr := func (t * testing.T , tableName string , err string ) {
2481
+ t .Helper ()
2482
+ dbA .ExpectErr (t , err , fmt .Sprintf ("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s WITH MODE = 'validated'" , tableName , tableName ), dbBURL .String ())
2483
+ replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2484
+ }
2485
+
2440
2486
// Column families are not allowed.
2441
2487
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN new_col INT NOT NULL CREATE FAMILY f1" )
2442
2488
dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN new_col INT NOT NULL" )
2443
- dbA .ExpectErr (t ,
2444
- "cannot create logical replication stream: table tab has more than one column family" ,
2445
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2446
- )
2447
- replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2489
+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has more than one column family" )
2448
2490
2449
2491
// UniqueWithoutIndex constraints are not allowed.
2450
2492
for _ , db := range []* sqlutils.SQLRunner {dbA , dbB } {
2451
2493
db .Exec (t , "SET experimental_enable_unique_without_index_constraints = true" )
2452
2494
db .Exec (t , "CREATE TABLE tab_with_uwi (pk INT PRIMARY KEY, v INT UNIQUE WITHOUT INDEX)" )
2453
2495
}
2454
- dbA .ExpectErr (t ,
2455
- "cannot create logical replication stream: table tab_with_uwi has UNIQUE WITHOUT INDEX constraints: unique_v" ,
2456
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab_with_uwi ON $1 INTO TABLE tab_with_uwi" , dbBURL .String (),
2457
- )
2496
+ expectErr (t , "tab_with_uwi" , "cannot create logical replication stream: table tab_with_uwi has UNIQUE WITHOUT INDEX constraints: unique_v" )
2458
2497
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2459
2498
2460
2499
// Check for mismatched numbers of columns.
2461
2500
dbA .Exec (t , "ALTER TABLE tab DROP COLUMN new_col" )
2462
- dbA .ExpectErr (t ,
2463
- "cannot create logical replication stream: destination table tab has 2 columns, but the source table tab has 3 columns" ,
2464
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2465
- )
2501
+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab has 2 columns, but the source table tab has 3 columns" )
2466
2502
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2467
2503
2468
2504
// Check for mismatched column types.
2469
2505
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN new_col TEXT NOT NULL" )
2470
- dbA .ExpectErr (t ,
2471
- "cannot create logical replication stream: destination table tab column new_col has type STRING, but the source table tab has type INT8" ,
2472
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2473
- )
2506
+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab column new_col has type STRING, but the source table tab has type INT8" )
2474
2507
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2475
2508
2476
2509
// Check for composite type in primary key.
@@ -2479,39 +2512,27 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2479
2512
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN composite_col DECIMAL NOT NULL" )
2480
2513
dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN composite_col DECIMAL NOT NULL" )
2481
2514
dbA .Exec (t , "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk, composite_col)" )
2482
- dbA .ExpectErr (t ,
2483
- `cannot create logical replication stream: table tab has a primary key column \(composite_col\) with composite encoding` ,
2484
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2485
- )
2515
+ expectErr (t , "tab" , `cannot create logical replication stream: table tab has a primary key column \(composite_col\) with composite encoding` )
2486
2516
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2487
2517
2488
2518
// Check for partial indexes.
2489
2519
dbA .Exec (t , "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk)" )
2490
2520
dbA .Exec (t , "CREATE INDEX partial_idx ON tab(composite_col) WHERE pk > 0" )
2491
- dbA .ExpectErr (t ,
2492
- `cannot create logical replication stream: table tab has a partial index partial_idx` ,
2493
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2494
- )
2521
+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has a partial index partial_idx" )
2495
2522
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2496
2523
2497
2524
// Check for virtual computed columns that are a key of a secondary index.
2498
2525
dbA .Exec (t , "DROP INDEX partial_idx" )
2499
2526
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN virtual_col INT NOT NULL AS (pk + 1) VIRTUAL" )
2500
2527
dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN virtual_col INT NOT NULL AS (pk + 1) VIRTUAL" )
2501
2528
dbA .Exec (t , "CREATE INDEX virtual_col_idx ON tab(virtual_col)" )
2502
- dbA .ExpectErr (t ,
2503
- `cannot create logical replication stream: table tab has a virtual computed column virtual_col that is a key of index virtual_col_idx` ,
2504
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2505
- )
2529
+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has a virtual computed column virtual_col that is a key of index virtual_col_idx" )
2506
2530
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2507
2531
2508
2532
// Check for virtual columns that are in the primary index.
2509
2533
dbA .Exec (t , "DROP INDEX virtual_col_idx" )
2510
2534
dbA .Exec (t , "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk, virtual_col)" )
2511
- dbA .ExpectErr (t ,
2512
- `cannot create logical replication stream: table tab has a virtual computed column virtual_col that appears in the primary key` ,
2513
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2514
- )
2535
+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has a virtual computed column virtual_col that appears in the primary key" )
2515
2536
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2516
2537
2517
2538
// Change the primary key back, and remove the indexes that are left over from
@@ -2524,10 +2545,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2524
2545
// Check that CHECK constraints match.
2525
2546
dbA .Exec (t , "ALTER TABLE tab ADD CONSTRAINT check_constraint_1 CHECK (pk > 0)" )
2526
2547
dbB .Exec (t , "ALTER TABLE b.tab ADD CONSTRAINT check_constraint_1 CHECK (length(payload) > 1)" )
2527
- dbA .ExpectErr (t ,
2528
- `cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab` ,
2529
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2530
- )
2548
+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab" )
2531
2549
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2532
2550
2533
2551
// Allow user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK.
@@ -2545,7 +2563,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2545
2563
dbB .Exec (t , "ALTER TABLE b.tab ADD CONSTRAINT check_constraint_2 CHECK (pk > 0)" )
2546
2564
var jobAID jobspb.JobID
2547
2565
dbA .QueryRow (t ,
2548
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" ,
2566
+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = 'validated' " ,
2549
2567
dbBURL .String (),
2550
2568
).Scan (& jobAID )
2551
2569
@@ -2559,10 +2577,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2559
2577
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL" )
2560
2578
dbA .Exec (t , "ALTER TABLE tab ALTER COLUMN udf_col SET DEFAULT my_udf()" )
2561
2579
dbB .Exec (t , "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL DEFAULT 1" )
2562
- dbA .ExpectErr (t ,
2563
- `cannot create logical replication stream: table tab references functions with IDs \[[0-9]+\]` ,
2564
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2565
- )
2580
+ expectErr (t , "tab" , "cannot create logical replication stream: table tab references functions with IDs [[0-9]+]" )
2566
2581
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2567
2582
2568
2583
// Check if the table references a sequence.
@@ -2571,21 +2586,15 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2571
2586
dbA .Exec (t , "CREATE SEQUENCE my_seq" )
2572
2587
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT nextval('my_seq')" )
2573
2588
dbB .Exec (t , "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT 1" )
2574
- dbA .ExpectErr (t ,
2575
- `cannot create logical replication stream: table tab references sequences with IDs \[[0-9]+\]` ,
2576
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2577
- )
2589
+ expectErr (t , "tab" , "cannot create logical replication stream: table tab references sequences with IDs [[0-9]+]" )
2578
2590
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2579
2591
2580
2592
// Check if table has a trigger.
2581
2593
dbA .Exec (t , "ALTER TABLE tab DROP COLUMN seq_col" )
2582
2594
dbB .Exec (t , "ALTER TABLE tab DROP COLUMN seq_col" )
2583
2595
dbA .Exec (t , "CREATE OR REPLACE FUNCTION my_trigger() RETURNS TRIGGER AS $$ BEGIN RETURN NEW; END $$ LANGUAGE PLPGSQL" )
2584
2596
dbA .Exec (t , "CREATE TRIGGER my_trigger BEFORE INSERT ON tab FOR EACH ROW EXECUTE FUNCTION my_trigger()" )
2585
- dbA .ExpectErr (t ,
2586
- `cannot create logical replication stream: table tab references triggers \[my_trigger\]` ,
2587
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2588
- )
2597
+ expectErr (t , "tab" , `cannot create logical replication stream: table tab references triggers \[my_trigger\]` )
2589
2598
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2590
2599
2591
2600
// Verify that the stream cannot be created with mismatched enum types.
@@ -2594,9 +2603,8 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2594
2603
dbB .Exec (t , "CREATE TYPE b.mytype AS ENUM ('a', 'b')" )
2595
2604
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL" )
2596
2605
dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN enum_col b.mytype NOT NULL" )
2597
- dbA . ExpectErr ( t ,
2606
+ expectErr ( t , "tab" ,
2598
2607
`cannot create logical replication stream: .* destination type USER DEFINED ENUM: public.mytype has logical representations \[a b c\], but the source type USER DEFINED ENUM: mytype has \[a b\]` ,
2599
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2600
2608
)
2601
2609
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2602
2610
@@ -2616,21 +2624,15 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2616
2624
dbB .Exec (t , "CREATE TYPE b.composite_typ AS (a TEXT, b INT)" )
2617
2625
dbA .Exec (t , "ALTER TABLE tab ADD COLUMN composite_udt_col composite_typ NOT NULL" )
2618
2626
dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN composite_udt_col b.composite_typ NOT NULL" )
2619
- dbA .ExpectErr (t ,
2620
- `cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING` ,
2621
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2622
- )
2627
+ expectErr (t , "tab" , "cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING" )
2623
2628
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2624
2629
2625
2630
// Check that UNIQUE indexes match.
2626
2631
dbA .Exec (t , "ALTER TABLE tab DROP COLUMN composite_udt_col" )
2627
2632
dbB .Exec (t , "ALTER TABLE b.tab DROP COLUMN composite_udt_col" )
2628
2633
dbA .Exec (t , "CREATE UNIQUE INDEX payload_idx ON tab(payload)" )
2629
2634
dbB .Exec (t , "CREATE UNIQUE INDEX multi_idx ON b.tab(composite_col, pk)" )
2630
- dbA .ExpectErr (t ,
2631
- `cannot create logical replication stream: destination table tab UNIQUE indexes do not match source table tab` ,
2632
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2633
- )
2635
+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab UNIQUE indexes do not match source table tab" )
2634
2636
replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2635
2637
2636
2638
// Create the missing indexes on each side and verify the stream can be
@@ -2639,7 +2641,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2639
2641
dbA .Exec (t , "CREATE UNIQUE INDEX multi_idx ON tab(composite_col, pk)" )
2640
2642
dbB .Exec (t , "CREATE UNIQUE INDEX payload_idx ON b.tab(payload)" )
2641
2643
dbA .QueryRow (t ,
2642
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" ,
2644
+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = 'validated' " ,
2643
2645
dbBURL .String (),
2644
2646
).Scan (& jobAID )
2645
2647
@@ -2654,7 +2656,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
2654
2656
dbB .Exec (t , "CREATE TABLE b.tab2 (pk INT PRIMARY KEY, payload STRING DEFAULT 'dog')" )
2655
2657
dbB .Exec (t , "Insert into tab2 values (1)" )
2656
2658
dbA .QueryRow (t ,
2657
- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab2 ON $1 INTO TABLE tab2" ,
2659
+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab2 ON $1 INTO TABLE tab2 WITH MODE = 'validated' " ,
2658
2660
dbBURL .String (),
2659
2661
).Scan (& jobAID )
2660
2662
WaitUntilReplicatedTime (t , s .Clock ().Now (), dbA , jobAID )
0 commit comments