Fix TAP tests after CREATE PUBLICATION changes

pull/6776/head
Marco Slot 2023-03-28 15:10:05 +02:00
parent 8ad444f8ef
commit e5fd1c3a87
11 changed files with 12 additions and 44 deletions

View File

@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -41,12 +41,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,7 +46,7 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
#insert data into the sensors table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
@ -56,7 +56,6 @@ FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -26,7 +26,7 @@ wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
# Create the reference table in the coordinator and cdc client nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_reference_table('reference_table');");
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -56,7 +56,7 @@ FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
create_cdc_publication_and_slots_for_workers(\@workers,'sensors');
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -43,12 +43,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -46,13 +46,12 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -48,12 +48,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);

View File

@ -40,12 +40,11 @@ create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
create_cdc_replication_slots_for_workers(\@workers);
create_cdc_slots_for_workers(\@workers);
# Distribut the sensors table to worker nodes.
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
create_cdc_publication_for_workers(\@workers,'sensors');
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);

View File

@ -195,7 +195,7 @@ sub create_cdc_publication_and_replication_slots_for_citus_cluster {
my $table_names = $_[2];
create_cdc_publication_and_slots_for_coordinator($node_coordinator, $table_names);
create_cdc_publication_and_slots_for_workers($workersref, $table_names);
create_cdc_slots_for_workers($workersref);
}
sub create_cdc_publication_and_slots_for_coordinator {
@ -210,31 +210,7 @@ sub create_cdc_publication_and_slots_for_coordinator {
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)");
}
sub create_cdc_publication_and_slots_for_workers {
my $workersref = $_[0];
my $table_names = $_[1];
create_cdc_publication_for_workers($workersref, $table_names);
create_cdc_replication_slots_for_workers($workersref);
}
sub create_cdc_publication_for_workers {
my $workersref = $_[0];
my $table_names = $_[1];
for (@$workersref) {
my $pub = $_->safe_psql('postgres',"SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';");
if ($pub ne "") {
$_->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;");
}
if ($table_names eq "all") {
$_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR ALL TABLES;");
} else {
$_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;");
}
}
}
sub create_cdc_replication_slots_for_workers {
sub create_cdc_slots_for_workers {
my $workersref = $_[0];
for (@$workersref) {
my $slot = $_->safe_psql('postgres',"select * from pg_replication_slots where slot_name = 'cdc_replication_slot';");