diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 70dba5e2a..850e65c58 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -426,15 +426,28 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); - /* finally insert the placements to pg_dist_placement */ + /* + * Finally insert the placements to pg_dist_placement and sync it to the + * metadata workers. + */ foreach_ptr(colocatedShard, colocatedShardList) { uint64 colocatedShardId = colocatedShard->shardId; uint32 groupId = GroupForNode(targetNodeName, targetNodePort); + uint64 placementId = GetNextPlacementId(); - InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, + InsertShardPlacementRow(colocatedShardId, placementId, SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), groupId); + + if (ShouldSyncTableMetadata(colocatedShard->relationId)) + { + char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId, + SHARD_STATE_ACTIVE, 0, + groupId); + + SendCommandToWorkersWithMetadata(placementCommand); + } } } diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index bf7c9377c..bcbc634d2 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -848,7 +848,8 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r (1 row) SET client_min_messages TO WARNING; -SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset +SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset +SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset -- remove reference table replica from worker 2 SELECT 1 FROM master_remove_node('localhost', :worker_2_port); ?column? @@ -856,7 +857,7 @@ SELECT 1 FROM master_remove_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- -1 @@ -871,7 +872,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- 0 @@ -890,7 +891,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- -1 @@ -902,7 +903,7 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- 0 @@ -914,6 +915,105 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r t (1 row) +-- test that metadata is synced when master_copy_shard_placement replicates +-- reference table shards +SET citus.replicate_reference_tables_on_activate TO off; +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SET citus.replication_model TO streaming; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_copy_shard_placement( + :ref_table_shard, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); + master_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + ?column? +--------------------------------------------------------------------- + 0 +(1 row) + +-- test that metadata is synced on replicate_reference_tables +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT replicate_reference_tables(); + replicate_reference_tables +--------------------------------------------------------------------- + +(1 row) + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + ?column? +--------------------------------------------------------------------- + 0 +(1 row) + +-- join the reference table with a distributed table from worker 1 +-- to verify that metadata for worker 2 placements have been synced +-- to worker 1. +CREATE TABLE dist_table(a int, b int); +SELECT create_distributed_table('dist_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i; +TRUNCATE ref_table; +INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i; +\c - - - :worker_1_port +SET search_path TO replicate_reference_table; +SELECT array_agg(dist_table.b ORDER BY ref_table.a) +FROM ref_table, dist_table +WHERE ref_table.a = dist_table.a; + array_agg +--------------------------------------------------------------------- + {4,16,36,64,100} +(1 row) + +\c - - - :master_port +SET search_path TO replicate_reference_table; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + -- test adding an invalid node while we have reference tables to replicate -- set client message level to ERROR and verbosity to terse to supporess -- OS-dependent host name resolution warnings diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 7b1951bfb..4535aebfc 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -551,32 +551,89 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r SET client_min_messages TO WARNING; -SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset +SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset + +SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset -- remove reference table replica from worker 2 SELECT 1 FROM master_remove_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; -- test setting citus.replicate_reference_tables_on_activate to on -- master_add_node SET citus.replicate_reference_tables_on_activate TO on; SELECT 1 FROM master_add_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; -- master_activate_node SELECT 1 FROM master_remove_node('localhost', :worker_2_port); SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; SELECT 1 FROM master_activate_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('ref_table', 'SELECT sum(a) FROM %s'); +-- test that metadata is synced when master_copy_shard_placement replicates +-- reference table shards +SET citus.replicate_reference_tables_on_activate TO off; +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +SET citus.replication_model TO streaming; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +SELECT master_copy_shard_placement( + :ref_table_shard, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + +-- test that metadata is synced on replicate_reference_tables +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +SELECT replicate_reference_tables(); + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + +-- join the reference table with a distributed table from worker 1 +-- to verify that metadata for worker 2 placements have been synced +-- to worker 1. + +CREATE TABLE dist_table(a int, b int); +SELECT create_distributed_table('dist_table', 'a'); +INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i; + +TRUNCATE ref_table; +INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i; + +\c - - - :worker_1_port + +SET search_path TO replicate_reference_table; + +SELECT array_agg(dist_table.b ORDER BY ref_table.a) +FROM ref_table, dist_table +WHERE ref_table.a = dist_table.a; + +\c - - - :master_port + +SET search_path TO replicate_reference_table; + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + -- test adding an invalid node while we have reference tables to replicate -- set client message level to ERROR and verbosity to terse to supporess -- OS-dependent host name resolution warnings