From 3b57ff2867359f286c64da00c82de250d949c6e5 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 9 Aug 2022 00:38:58 +0200 Subject: [PATCH] Fix crash in citus_copy_shard_placement --- .../distributed/operations/repair_shards.c | 38 +++++++++++++++ .../multi_replicate_reference_table.out | 46 ++++++++++++++++++- .../sql/multi_replicate_reference_table.sql | 29 +++++++++++- 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index ec7db7da4..d13a28e2c 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -74,6 +74,8 @@ static void VerifyTablesHaveReplicaIdentity(List *colocatedTableList); static bool RelationCanPublishAllModifications(Oid relationId); static bool CanUseLogicalReplication(Oid relationId, char shardReplicationMode); static void ErrorIfTableCannotBeReplicated(Oid relationId); +static void ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName, + int targetNodePort); static void RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -183,6 +185,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) ShardInterval *shardInterval = LoadShardInterval(shardId); ErrorIfTableCannotBeReplicated(shardInterval->relationId); + ErrorIfTargetNodeIsNotSafeToCopyTo(targetNodeName, targetNodePort); AcquirePlacementColocationLock(shardInterval->relationId, ExclusiveLock, doRepair ? "repair" : "copy"); @@ -789,6 +792,41 @@ ErrorIfTableCannotBeReplicated(Oid relationId) } +/* + * ErrorIfTargetNodeIsNotSafeToCopyTo throws an error if the target node is not + * eligible for copying shards. + */ +static void +ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName, int targetNodePort) +{ + WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort); + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Copying shards to a non-existing node is not supported"), + errhint( + "Add the target node via SELECT citus_add_node('%s', %d);", + targetNodeName, targetNodePort))); + } + + if (!workerNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Copying shards to a non-active node is not supported"), + errhint( + "Activate the target node via SELECT citus_activate_node('%s', %d);", + targetNodeName, targetNodePort))); + } + + if (!NodeIsPrimary(workerNode)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Copying shards to a secondary (e.g., replica) node is " + "not supported"))); + } +} + + /* * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum * values to a char. diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 7bc282c51..9f27e05a7 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -781,7 +781,51 @@ SELECT create_reference_table('ref_table'); (1 row) INSERT INTO ref_table SELECT * FROM generate_series(1, 10); -SELECT 1 FROM master_add_node('localhost', :worker_2_port); +-- verify direct call to citus_copy_shard_placement errors if target node is not yet added +SELECT citus_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); +ERROR: Copying shards to a non-existing node is not supported +HINT: Add the target node via SELECT citus_add_node('localhost', 57638); +-- verify direct call to citus_copy_shard_placement errors if target node is secondary +SELECT citus_add_secondary_node('localhost', :worker_2_port, 'localhost', :worker_1_port); + citus_add_secondary_node +--------------------------------------------------------------------- + 1370012 +(1 row) + +SELECT citus_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); +ERROR: Copying shards to a secondary (e.g., replica) node is not supported +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- verify direct call to citus_copy_shard_placement errors if target node is inactive +SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); +ERROR: Copying shards to a non-active node is not supported +HINT: Activate the target node via SELECT citus_activate_node('localhost', 57638); +SELECT 1 FROM master_activate_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 3e0acbc1c..c5671a869 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -509,7 +509,34 @@ CREATE TABLE ref_table(a int); SELECT create_reference_table('ref_table'); INSERT INTO ref_table SELECT * FROM generate_series(1, 10); -SELECT 1 FROM master_add_node('localhost', :worker_2_port); +-- verify direct call to citus_copy_shard_placement errors if target node is not yet added +SELECT citus_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); + +-- verify direct call to citus_copy_shard_placement errors if target node is secondary +SELECT citus_add_secondary_node('localhost', :worker_2_port, 'localhost', :worker_1_port); +SELECT citus_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); +SELECT citus_remove_node('localhost', :worker_2_port); + +-- verify direct call to citus_copy_shard_placement errors if target node is inactive +SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); +SELECT citus_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); + +SELECT 1 FROM master_activate_node('localhost', :worker_2_port); -- verify we cannot replicate reference tables in a transaction modifying pg_dist_node BEGIN;