diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c
index 468706f0e..356e3dd6a 100644
--- a/src/backend/distributed/operations/shard_transfer.c
+++ b/src/backend/distributed/operations/shard_transfer.c
@@ -75,6 +75,9 @@ static bool CanUseLogicalReplication(Oid relationId, char shardReplicationMode);
 static void ErrorIfTableCannotBeReplicated(Oid relationId);
 static void ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName, int
 												targetNodePort);
+static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
+							char *targetNodeName, int targetNodePort,
+							const char *operationName);
 static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
 											 int32 sourceNodePort, char *targetNodeName,
 											 int32 targetNodePort,
@@ -107,6 +110,8 @@ static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
 															int32 sourceNodePort,
 															char *targetNodeName,
 															int32 targetNodePort);
+static bool IsShardListOnNode(List *colocatedShardList, char *targetNodeName,
+							  uint32 targetPort);
 static void CheckSpaceConstraints(MultiConnection *connection,
 								  uint64 colocationSizeInBytes);
 static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
@@ -236,6 +241,10 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	ListCell *colocatedTableCell = NULL;
 	ListCell *colocatedShardCell = NULL;
 
+	ErrorIfSameNode(sourceNodeName, sourceNodePort,
+					targetNodeName, targetNodePort,
+					"move");
+
 	Oid relationId = RelationIdForShard(shardId);
 	ErrorIfMoveUnsupportedTableType(relationId);
 	ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
@@ -276,6 +285,20 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 
 	/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
 	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
+
+	/*
+	 * If there are no active placements on the source and only active placements on
+	 * the target node, we assume the copy to already be done.
+	 */
+	if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
+		!IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+	{
+		ereport(WARNING, (errmsg("shard is already present on node %s:%d",
+								 targetNodeName, targetNodePort),
+						  errdetail("Move may have already completed.")));
+		PG_RETURN_VOID();
+	}
+
 	foreach(colocatedShardCell, colocatedShardList)
 	{
 		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
@@ -390,6 +413,39 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * IsShardListOnNode determines whether a co-located shard list has
+ * active placements on a given node.
+ */
+static bool
+IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetNodePort)
+{
+	WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort);
+	if (workerNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("Moving shards to a non-existing node is not supported")));
+	}
+
+	/*
+	 * We exhaustively search all co-located shards
+	 */
+	ShardInterval *shardInterval = NULL;
+	foreach_ptr(shardInterval, colocatedShardList)
+	{
+		uint64 shardId = shardInterval->shardId;
+		List *placementList = ActiveShardPlacementListOnGroup(shardId,
+															  workerNode->groupId);
+		if (placementList == NIL)
+		{
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
 /*
  * EnsureEnoughDiskSpaceForShardMove checks that there is enough space for
  * shard moves of the given colocated shard list from source node to target node.
@@ -552,6 +608,25 @@ ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
 }
 
 
+/*
+ * ErrorIfSameNode throws an error if the two host:port combinations
+ * are the same.
+ */
+static void
+ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
+				char *targetNodeName, int targetNodePort,
+				const char *operationName)
+{
+	if (strncmp(sourceNodeName, targetNodeName, MAX_NODE_LENGTH) == 0 &&
+		sourceNodePort == targetNodePort)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot %s shard to the same node",
+							   operationName)));
+	}
+}
+
+
 /*
  * master_move_shard_placement is a wrapper around citus_move_shard_placement.
  */
@@ -886,6 +961,10 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
 	Oid distributedTableId = shardInterval->relationId;
 
+	ErrorIfSameNode(sourceNodeName, sourceNodePort,
+					targetNodeName, targetNodePort,
+					"copy");
+
 	ErrorIfTableCannotBeReplicated(shardInterval->relationId);
 	ErrorIfTargetNodeIsNotSafeToCopyTo(targetNodeName, targetNodePort);
 	EnsureNoModificationsHaveBeenDone();
@@ -904,6 +983,19 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
 	 */
 	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
 
+	/*
+	 * If there are active placements on both nodes, we assume the copy to already
+	 * be done.
+	 */
+	if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
+		IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+	{
+		ereport(WARNING, (errmsg("shard is already present on node %s:%d",
+								 targetNodeName, targetNodePort),
+						  errdetail("Copy may have already completed.")));
+		return;
+	}
+
 	/*
 	 * At this point of the shard replication, we don't need to block the writes to
 	 * shards when logical replication is used.
diff --git a/src/test/regress/expected/citus_copy_shard_placement.out b/src/test/regress/expected/citus_copy_shard_placement.out
index 61b935276..d82b36383 100644
--- a/src/test/regress/expected/citus_copy_shard_placement.out
+++ b/src/test/regress/expected/citus_copy_shard_placement.out
@@ -56,14 +56,20 @@ SELECT citus_copy_shard_placement(
   'localhost', :worker_2_port,
   'localhost', :worker_2_port,
   transfer_mode := 'block_writes');
-ERROR: shard xxxxx already exists in the target node
--- verify we error out if target already contains a healthy placement
+ERROR: cannot copy shard to the same node
+-- verify we warn if target already contains a healthy placement
 SELECT citus_copy_shard_placement(
   (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
   'localhost', :worker_1_port,
   'localhost', :worker_2_port,
   transfer_mode := 'block_writes');
-ERROR: shard xxxxx already exists in the target node
+WARNING: shard is already present on node localhost:xxxxx
+DETAIL: Copy may have already completed.
+ citus_copy_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
 -- verify we error out if table has foreign key constraints
 INSERT INTO ref_table SELECT 1, value FROM data;
 ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
diff --git a/src/test/regress/expected/multi_colocated_shard_rebalance.out b/src/test/regress/expected/multi_colocated_shard_rebalance.out
index bb3e68bd4..da3bae484 100644
--- a/src/test/regress/expected/multi_colocated_shard_rebalance.out
+++ b/src/test/regress/expected/multi_colocated_shard_rebalance.out
@@ -143,9 +143,15 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_
 (1 row)
 
 \c - - - :master_port
--- copy colocated shards again to see error message
+-- copy colocated shards again to see warning
 SELECT citus_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
-ERROR: shard xxxxx already exists in the target node
+WARNING: shard is already present on node localhost:xxxxx
+DETAIL: Copy may have already completed.
+ citus_copy_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
 -- test copying NOT colocated shard
 -- status before shard copy
 SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
@@ -300,6 +306,15 @@ ORDER BY s.shardid, sp.nodeport;
  13000011 | table2_group1 |    57638
 (14 rows)
 
+-- moving the shard again is idempotent
+SELECT citus_move_shard_placement(13000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
+WARNING: shard is already present on node localhost:xxxxx
+DETAIL: Move may have already completed.
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
 -- also connect worker to verify we successfully moved given shard (and other colocated shards)
 \c - - - :worker_1_port
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000001'::regclass;
@@ -412,8 +427,9 @@ ORDER BY s.shardid, sp.nodeport;
 (3 rows)
 
 -- try to move shard from wrong node
-SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
-ERROR: source placement must be in active state
+SELECT master_move_shard_placement(13000021, 'localhost', :master_port, 'localhost', :worker_1_port, 'force_logical');
+ERROR: could not find placement matching "localhost:xxxxx"
+HINT: Confirm the placement still exists and try again.
 -- test shard move with foreign constraints
 DROP TABLE IF EXISTS table1_group1, table2_group1;
 SET citus.shard_count TO 6;
diff --git a/src/test/regress/expected/multi_move_mx.out b/src/test/regress/expected/multi_move_mx.out
index eaf2273de..833c9f7df 100644
--- a/src/test/regress/expected/multi_move_mx.out
+++ b/src/test/regress/expected/multi_move_mx.out
@@ -138,20 +138,6 @@ SELECT pg_reload_conf();
  t
 (1 row)
 
-\c - - - :master_port
-BEGIN;
-SELECT
-	master_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
-FROM
-	pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE
-	logicalrelid = 'mx_table_1'::regclass
-	AND nodeport = :worker_1_port
-ORDER BY
-	shardid
-LIMIT 1;
-ERROR: source placement must be in active state
-ROLLBACK;
 \c - - - :worker_2_port
 -- before reseting citus.node_conninfo, check that CREATE SUBSCRIPTION
 -- with citus_use_authinfo takes into account node_conninfo even when
diff --git a/src/test/regress/sql/citus_copy_shard_placement.sql b/src/test/regress/sql/citus_copy_shard_placement.sql
index 7861434d3..0e6a42e79 100644
--- a/src/test/regress/sql/citus_copy_shard_placement.sql
+++ b/src/test/regress/sql/citus_copy_shard_placement.sql
@@ -48,7 +48,7 @@ SELECT citus_copy_shard_placement(
   'localhost', :worker_2_port,
   transfer_mode := 'block_writes');
 
--- verify we error out if target already contains a healthy placement
+-- verify we warn if target already contains a healthy placement
 SELECT citus_copy_shard_placement(
   (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
   'localhost', :worker_1_port,
diff --git a/src/test/regress/sql/multi_colocated_shard_rebalance.sql b/src/test/regress/sql/multi_colocated_shard_rebalance.sql
index a8ad39c06..2afbd0942 100644
--- a/src/test/regress/sql/multi_colocated_shard_rebalance.sql
+++ b/src/test/regress/sql/multi_colocated_shard_rebalance.sql
@@ -78,7 +78,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_
 
 \c - - - :master_port
 
--- copy colocated shards again to see error message
+-- copy colocated shards again to see warning
 SELECT citus_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
 
 
@@ -159,6 +159,9 @@ WHERE
   AND sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 
+-- moving the shard again is idempotent
+SELECT citus_move_shard_placement(13000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
+
 -- also connect worker to verify we successfully moved given shard (and other colocated shards)
 \c - - - :worker_1_port
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000001'::regclass;
@@ -222,7 +225,7 @@ ORDER BY s.shardid, sp.nodeport;
 
 
 -- try to move shard from wrong node
-SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
+SELECT master_move_shard_placement(13000021, 'localhost', :master_port, 'localhost', :worker_1_port, 'force_logical');
 
 
 -- test shard move with foreign constraints
diff --git a/src/test/regress/sql/multi_move_mx.sql b/src/test/regress/sql/multi_move_mx.sql
index d02c1f417..166069a6e 100644
--- a/src/test/regress/sql/multi_move_mx.sql
+++ b/src/test/regress/sql/multi_move_mx.sql
@@ -86,23 +86,7 @@ LIMIT 1;
 ALTER SYSTEM SET citus.node_conninfo TO 'sslrootcert=/non/existing/certificate.crt sslmode=verify-full';
 SELECT pg_reload_conf();
 
-\c - - - :master_port
-
-BEGIN;
-SELECT
-	master_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
-FROM
-	pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE
-	logicalrelid = 'mx_table_1'::regclass
-	AND nodeport = :worker_1_port
-ORDER BY
-	shardid
-LIMIT 1;
-ROLLBACK;
-
 \c - - - :worker_2_port
-
 -- before reseting citus.node_conninfo, check that CREATE SUBSCRIPTION
 -- with citus_use_authinfo takes into account node_conninfo even when
 -- one of host, port, or user parameters are not specified.