From e5fd1c3a87ed73f34f6ae59fa968e272cc48a9fb Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 28 Mar 2023 15:10:05 +0200 Subject: [PATCH] Fix TAP tests after CREATE PUBLICATION changes --- .../001_cdc_create_distributed_table_test.pl | 3 +- ...c_create_distributed_table_concurrently.pl | 3 +- src/test/cdc/t/004_cdc_move_shard.pl | 3 +- .../cdc/t/005_cdc_reference_table_test.pl | 2 +- .../cdc/t/006_cdc_schema_change_and_move.pl | 2 +- .../cdc/t/007_cdc_undistributed_table_test.pl | 3 +- src/test/cdc/t/008_cdc_shard_split_test.pl | 3 +- .../009_cdc_shard_split_test_non_blocking.pl | 3 +- .../t/010_cdc_shard_split_parallel_insert.pl | 3 +- src/test/cdc/t/012_cdc_restart_test.pl | 3 +- src/test/cdc/t/cdctestlib.pm | 28 ++----------------- 11 files changed, 12 insertions(+), 44 deletions(-) diff --git a/src/test/cdc/t/001_cdc_create_distributed_table_test.pl b/src/test/cdc/t/001_cdc_create_distributed_table_test.pl index 7a562da94..74850e58a 100644 --- a/src/test/cdc/t/001_cdc_create_distributed_table_test.pl +++ b/src/test/cdc/t/001_cdc_create_distributed_table_test.pl @@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl b/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl index b8906421b..511ec1672 100644 --- a/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl +++ b/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl @@ -41,12 +41,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/004_cdc_move_shard.pl b/src/test/cdc/t/004_cdc_move_shard.pl index 73045f4c3..3b76c15f5 100644 --- a/src/test/cdc/t/004_cdc_move_shard.pl +++ b/src/test/cdc/t/004_cdc_move_shard.pl @@ -46,7 +46,7 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); #insert data into the sensors table in the coordinator node before distributing the table. $node_coordinator->safe_psql('postgres'," @@ -56,7 +56,6 @@ FROM generate_series(0,100)i;"); $node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/005_cdc_reference_table_test.pl b/src/test/cdc/t/005_cdc_reference_table_test.pl index 5858b9ecd..d41fbc909 100644 --- a/src/test/cdc/t/005_cdc_reference_table_test.pl +++ b/src/test/cdc/t/005_cdc_reference_table_test.pl @@ -26,7 +26,7 @@ wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); # Create the reference table in the coordinator and cdc client nodes. $node_coordinator->safe_psql('postgres',"SELECT create_reference_table('reference_table');"); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/006_cdc_schema_change_and_move.pl b/src/test/cdc/t/006_cdc_schema_change_and_move.pl index 9a05d977a..7b0c0759d 100644 --- a/src/test/cdc/t/006_cdc_schema_change_and_move.pl +++ b/src/test/cdc/t/006_cdc_schema_change_and_move.pl @@ -56,7 +56,7 @@ FROM generate_series(0,100)i;"); $node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');"); #connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); -create_cdc_publication_and_slots_for_workers(\@workers,'sensors'); +create_cdc_slots_for_workers(\@workers); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/007_cdc_undistributed_table_test.pl b/src/test/cdc/t/007_cdc_undistributed_table_test.pl index c7504ef4d..c8c87f678 100644 --- a/src/test/cdc/t/007_cdc_undistributed_table_test.pl +++ b/src/test/cdc/t/007_cdc_undistributed_table_test.pl @@ -43,12 +43,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/008_cdc_shard_split_test.pl b/src/test/cdc/t/008_cdc_shard_split_test.pl index ba33d6a21..6348116f7 100644 --- a/src/test/cdc/t/008_cdc_shard_split_test.pl +++ b/src/test/cdc/t/008_cdc_shard_split_test.pl @@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl b/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl index b4f4c9bec..e73637aa1 100644 --- a/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl +++ b/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl @@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl b/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl index 0f3e574bc..cb00ec328 100644 --- a/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl +++ b/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl @@ -48,12 +48,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); diff --git a/src/test/cdc/t/012_cdc_restart_test.pl b/src/test/cdc/t/012_cdc_restart_test.pl index 610c0b558..81a129c5a 100644 --- a/src/test/cdc/t/012_cdc_restart_test.pl +++ b/src/test/cdc/t/012_cdc_restart_test.pl @@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); -create_cdc_replication_slots_for_workers(\@workers); +create_cdc_slots_for_workers(\@workers); # Distribut the sensors table to worker nodes. $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); -create_cdc_publication_for_workers(\@workers,'sensors'); connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); $result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); diff --git a/src/test/cdc/t/cdctestlib.pm b/src/test/cdc/t/cdctestlib.pm index e3c72d7aa..393a62ac9 100644 --- a/src/test/cdc/t/cdctestlib.pm +++ b/src/test/cdc/t/cdctestlib.pm @@ -195,7 +195,7 @@ sub create_cdc_publication_and_replication_slots_for_citus_cluster { my $table_names = $_[2]; create_cdc_publication_and_slots_for_coordinator($node_coordinator, $table_names); - create_cdc_publication_and_slots_for_workers($workersref, $table_names); + create_cdc_slots_for_workers($workersref); } sub create_cdc_publication_and_slots_for_coordinator { @@ -210,31 +210,7 @@ sub create_cdc_publication_and_slots_for_coordinator { $node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)"); } -sub create_cdc_publication_and_slots_for_workers { - my $workersref = $_[0]; - my $table_names = $_[1]; - create_cdc_publication_for_workers($workersref, $table_names); - create_cdc_replication_slots_for_workers($workersref); -} - -sub create_cdc_publication_for_workers { - my $workersref = $_[0]; - my $table_names = $_[1]; - for (@$workersref) { - my $pub = $_->safe_psql('postgres',"SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';"); - if ($pub ne "") { - $_->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;"); - } - if ($table_names eq "all") { - $_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR ALL TABLES;"); - } else { - $_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;"); - } - } -} - - -sub create_cdc_replication_slots_for_workers { +sub create_cdc_slots_for_workers { my $workersref = $_[0]; for (@$workersref) { my $slot = $_->safe_psql('postgres',"select * from pg_replication_slots where slot_name = 'cdc_replication_slot';");