diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 4c20c0433..f8912bdb2 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -84,12 +84,14 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName char shardReplicationMode); static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort, bool useLogicalReplication); + int32 targetNodePort, bool useLogicalReplication, + char *operationName); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, + char *operationName); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, @@ -381,7 +383,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) * shards. */ CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication); + targetNodePort, useLogicalReplication, "citus_move_shard_placement"); ShardInterval *colocatedShard = NULL; foreach_ptr(colocatedShard, colocatedShardList) @@ -1059,7 +1061,8 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, useLogicalReplication); + targetNodeName, targetNodePort, useLogicalReplication, + "citus_copy_shard_placement"); /* * Finally insert the placements to pg_dist_placement and sync it to the @@ -1150,7 +1153,8 @@ EnsureTableListSuitableForReplication(List *tableIdList) */ static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort, bool useLogicalReplication) + char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, + char *operationName) { if (list_length(shardIntervalList) < 1) { @@ -1159,17 +1163,25 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP DropOrphanedResourcesInSeparateTransaction(); + /* Start operation to prepare for generating cleanup records */ + RegisterOperationNeedingCleanup(); + if (useLogicalReplication) { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, operationName); } else { CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); } + + /* + * Drop temporary objects that were marked as CLEANUP_ALWAYS. + */ + FinalizeOperationNeedingCleanupOnSuccess(operationName); } @@ -1180,7 +1192,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, char *operationName) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1202,6 +1214,12 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa char *tableOwner = TableOwner(shardInterval->relationId); + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + GroupForNode(targetNodeName, targetNodePort), + CLEANUP_ON_FAILURE); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, tableRecreationCommandList); @@ -1211,17 +1229,10 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa MemoryContextSwitchTo(oldContext); - /* Start operation to prepare for generating cleanup records */ - RegisterOperationNeedingCleanup(); - /* data copy is done seperately when logical replication is used */ LogicallyReplicateShards(shardIntervalList, sourceNodeName, - sourceNodePort, targetNodeName, targetNodePort); - - /* - * Drop temporary objects that were marked as CLEANUP_ALWAYS. - */ - FinalizeOperationNeedingCleanupOnSuccess("citus_[move/copy]_shard_placement"); + sourceNodePort, targetNodeName, targetNodePort, + operationName); } @@ -1276,6 +1287,13 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, sourceNodePort); char *tableOwner = TableOwner(shardInterval->relationId); + + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + GroupForNode(targetNodeName, targetNodePort), + CLEANUP_ON_FAILURE); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index a301367e1..0947a29fc 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -152,7 +152,7 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, */ void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, - char *targetNodeName, int targetNodePort) + char *targetNodeName, int targetNodePort, char *operationName) { AcquireLogicalReplicationLock(); char *superUser = CitusExtensionOwnerName(); @@ -277,7 +277,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * Drop temporary objects that were marked as CLEANUP_ON_FAILURE * or CLEANUP_ALWAYS. */ - FinalizeOperationNeedingCleanupOnFailure("citus_[move/copy]_shard_placement"); + FinalizeOperationNeedingCleanupOnFailure(operationName); PG_RE_THROW(); } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f5a9dc342..1994224d7 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -130,7 +130,7 @@ typedef enum LogicalRepType extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, - int targetNodePort); + int targetNodePort, char *operationName); extern void ConflictWithIsolationTestingBeforeCopy(void); extern void ConflictWithIsolationTestingAfterCopy(void); diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 43cab7ae3..2242fa43f 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -397,7 +397,23 @@ SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").cancel(' || :pid || ')'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +WARNING: canceling statement due to lock timeout +CONTEXT: while executing command on localhost:xxxxx +WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed ERROR: canceling statement due to user request +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + -- failure on create index SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); mitmproxy @@ -408,6 +424,7 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); @@ -445,6 +462,7 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- failure on parallel create index ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size; @@ -463,6 +481,7 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- Verify that the shard is not moved and the number of rows are still 100k SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index b0f2e32c0..0f025e3c1 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -141,6 +141,50 @@ SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'local (1 row) +-- when a move tx is aborted, there remains a shard on the target node +BEGIN; +SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +-- see the cleanup record for the shard on the target node +-- https://github.com/citusdata/citus/issues/6580 +select object_name, object_type from pg_dist_cleanup; + object_name | object_type +--------------------------------------------------------------------- + shard_move_deferred_delete.t1_20000000 | 1 +(1 row) + +-- see the shard on both workers +SELECT run_command_on_workers($cmd$ + SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; +$cmd$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +-- clean it up +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +-- verify the shard is cleaned up +SELECT run_command_on_workers($cmd$ + SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; +$cmd$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,0) +(2 rows) + -- override the function for testing purpose -- since it is extension owned function, propagate it to workers manually create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint) @@ -178,7 +222,7 @@ SELECT citus_shard_cost_by_disk_size(20000001); -- When there's not enough space the move should fail SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -ERROR: not enough empty space on node if the shard is moved, actual available space after move will be 0 bytes, desired available space after move is 850 bytes,estimated size increase on node after move is 8192 bytes. +ERROR: not enough empty space on node if the shard is moved, actual available space after move will be 0 bytes, desired available space after move is 850 bytes, estimated size increase on node after move is 8192 bytes. HINT: consider lowering citus.desired_percent_disk_available_after_move. BEGIN; -- when we disable the setting, the move should not give "not enough space" error @@ -230,7 +274,7 @@ $BODY$ language plpgsql; SET search_path TO shard_move_deferred_delete; -- When there would not be enough free space left after the move, the move should fail SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -ERROR: not enough empty space on node if the shard is moved, actual available space after move will be 108 bytes, desired available space after move is 850 bytes,estimated size increase on node after move is 8192 bytes. +ERROR: not enough empty space on node if the shard is moved, actual available space after move will be 108 bytes, desired available space after move is 850 bytes, estimated size increase on node after move is 8192 bytes. HINT: consider lowering citus.desired_percent_disk_available_after_move. -- Restore the original function on workers \c - - - :worker_1_port diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index e5754b1c4..5007c194f 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -138,7 +138,7 @@ SELECT citus.mitmproxy('conn.allow()'); -- we expect the drop query will succeed on only one node SELECT COUNT(*) FROM run_command_on_workers( - $$DROP SUBSCRIPTION citus_shard_move_subscription_10_15$$) + $$DROP SUBSCRIPTION citus_shard_move_subscription_10_19$$) WHERE success AND result = 'DROP SUBSCRIPTION'; -- reset back @@ -161,6 +161,10 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +SELECT public.wait_for_resource_cleanup(); + -- failure on create index SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); diff --git a/src/test/regress/sql/shard_move_deferred_delete.sql b/src/test/regress/sql/shard_move_deferred_delete.sql index aa1578f10..73a403ddc 100644 --- a/src/test/regress/sql/shard_move_deferred_delete.sql +++ b/src/test/regress/sql/shard_move_deferred_delete.sql @@ -78,6 +78,27 @@ $cmd$); -- needed. SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); +-- when a move tx is aborted, there remains a shard on the target node +BEGIN; +SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ROLLBACK; + +-- see the cleanup record for the shard on the target node +-- https://github.com/citusdata/citus/issues/6580 +select object_name, object_type from pg_dist_cleanup; + +-- see the shard on both workers +SELECT run_command_on_workers($cmd$ + SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; +$cmd$); + +-- clean it up +SELECT public.wait_for_resource_cleanup(); + +-- verify the shard is cleaned up +SELECT run_command_on_workers($cmd$ + SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; +$cmd$); -- override the function for testing purpose -- since it is extension owned function, propagate it to workers manually