Fix leftover shard copy on the target node when tx with move is aborted (#6583)

DESCRIPTION: Clean up the shard on the target node in case of a
failed/aborted shard move

Inserts a cleanup record for the moved shard placement on the target
node. If the move operation succeeds, the record is deleted; if not, it
stays behind so the leftover shard can be dropped later.

fixes: #6580
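
The end-to-end behavior is easiest to see from a psql session. A minimal sketch, mirroring the new shard_move_deferred_delete regression test below (the table t1, shard id 20000000, the :worker_*_port variables, and the wait_for_resource_cleanup() helper all come from that test's setup and are assumed here):

BEGIN;
SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port,
                                   'localhost', :worker_2_port);
ROLLBACK;

-- the aborted move leaves the half-built shard on the target node, plus a
-- cleanup record pointing at it
SELECT object_name, object_type FROM pg_dist_cleanup;

-- the maintenance daemon (here forced via the regression-test helper)
-- eventually drops the leftover shard
SELECT public.wait_for_resource_cleanup();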
Ahmet Gedemenli, 2022-12-27 22:42:46 +03:00, committed by GitHub
commit eba9abeee2 (parent e937935935)
7 changed files with 128 additions and 22 deletions

File 1 of 7: shard copy/move implementation (C)

@@ -84,12 +84,14 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName
                             char shardReplicationMode);
 static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
                             int32 sourceNodePort, char *targetNodeName,
-                            int32 targetNodePort, bool useLogicalReplication);
+                            int32 targetNodePort, bool useLogicalReplication,
+                            char *operationName);
 static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
                                                  char *sourceNodeName,
                                                  int32 sourceNodePort,
                                                  char *targetNodeName,
-                                                 int32 targetNodePort);
+                                                 int32 targetNodePort,
+                                                 char *operationName);
 static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
                                           int32 sourceNodePort,
@@ -381,7 +383,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
      * shards.
      */
     CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-                    targetNodePort, useLogicalReplication);
+                    targetNodePort, useLogicalReplication, "citus_move_shard_placement");
     ShardInterval *colocatedShard = NULL;
     foreach_ptr(colocatedShard, colocatedShardList)
@@ -1059,7 +1061,8 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
     }
     CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
-                    targetNodeName, targetNodePort, useLogicalReplication);
+                    targetNodeName, targetNodePort, useLogicalReplication,
+                    "citus_copy_shard_placement");
     /*
      * Finally insert the placements to pg_dist_placement and sync it to the
@@ -1150,7 +1153,8 @@ EnsureTableListSuitableForReplication(List *tableIdList)
  */
 static void
 CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
-                char *targetNodeName, int32 targetNodePort, bool useLogicalReplication)
+                char *targetNodeName, int32 targetNodePort, bool useLogicalReplication,
+                char *operationName)
 {
     if (list_length(shardIntervalList) < 1)
     {
@@ -1159,17 +1163,25 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
     DropOrphanedResourcesInSeparateTransaction();

+    /* Start operation to prepare for generating cleanup records */
+    RegisterOperationNeedingCleanup();
+
     if (useLogicalReplication)
     {
         CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName,
                                              sourceNodePort, targetNodeName,
-                                             targetNodePort);
+                                             targetNodePort, operationName);
     }
     else
     {
         CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
                                       targetNodeName, targetNodePort);
     }
+
+    /*
+     * Drop temporary objects that were marked as CLEANUP_ALWAYS.
+     */
+    FinalizeOperationNeedingCleanupOnSuccess(operationName);
 }
@@ -1180,7 +1192,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
 static void
 CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName,
                                      int32 sourceNodePort, char *targetNodeName,
-                                     int32 targetNodePort)
+                                     int32 targetNodePort, char *operationName)
 {
     MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
                                                        "CopyShardTablesViaLogicalReplication",
@@ -1202,6 +1214,12 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
         char *tableOwner = TableOwner(shardInterval->relationId);

+        /* drop the shard we created on the target, in case of failure */
+        InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
+                                            ConstructQualifiedShardName(shardInterval),
+                                            GroupForNode(targetNodeName, targetNodePort),
+                                            CLEANUP_ON_FAILURE);
+
         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner,
                                                   tableRecreationCommandList);
@@ -1211,17 +1229,10 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
     MemoryContextSwitchTo(oldContext);

-    /* Start operation to prepare for generating cleanup records */
-    RegisterOperationNeedingCleanup();
-
     /* data copy is done seperately when logical replication is used */
     LogicallyReplicateShards(shardIntervalList, sourceNodeName,
-                             sourceNodePort, targetNodeName, targetNodePort);
-
-    /*
-     * Drop temporary objects that were marked as CLEANUP_ALWAYS.
-     */
-    FinalizeOperationNeedingCleanupOnSuccess("citus_[move/copy]_shard_placement");
+                             sourceNodePort, targetNodeName, targetNodePort,
+                             operationName);
 }
@@ -1276,6 +1287,13 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
         List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
                                                            sourceNodePort);
         char *tableOwner = TableOwner(shardInterval->relationId);

+        /* drop the shard we created on the target, in case of failure */
+        InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
+                                            ConstructQualifiedShardName(shardInterval),
+                                            GroupForNode(targetNodeName, targetNodePort),
+                                            CLEANUP_ON_FAILURE);
+
         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner, ddlCommandList);
     }

File 2 of 7: shard logical replication implementation (C)

@@ -152,7 +152,7 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
  */
 void
 LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
-                         char *targetNodeName, int targetNodePort)
+                         char *targetNodeName, int targetNodePort, char *operationName)
 {
     AcquireLogicalReplicationLock();
     char *superUser = CitusExtensionOwnerName();
@@ -277,7 +277,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
          * Drop temporary objects that were marked as CLEANUP_ON_FAILURE
          * or CLEANUP_ALWAYS.
          */
-        FinalizeOperationNeedingCleanupOnFailure("citus_[move/copy]_shard_placement");
+        FinalizeOperationNeedingCleanupOnFailure(operationName);
         PG_RE_THROW();
     }

File 3 of 7: logical replication header (C)

@@ -130,7 +130,7 @@ typedef enum LogicalRepType
 extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
                                      int sourceNodePort, char *targetNodeName,
-                                     int targetNodePort);
+                                     int targetNodePort, char *operationName);
 extern void ConflictWithIsolationTestingBeforeCopy(void);
 extern void ConflictWithIsolationTestingAfterCopy(void);

File 4 of 7: expected output, online shard move failure tests

@@ -397,7 +397,23 @@ SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").cancel(' || :pid || ')');
 (1 row)
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 WARNING: canceling statement due to lock timeout
 CONTEXT: while executing command on localhost:xxxxx
+WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
 ERROR: canceling statement due to user request
+-- cleanup leftovers
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+(1 row)
+
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+(1 row)
 -- failure on create index
 SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
  mitmproxy
@@ -408,6 +424,7 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
+WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
@@ -445,6 +462,7 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
+WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 -- failure on parallel create index
 ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size;
@@ -463,6 +481,7 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
+WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 -- Verify that the shard is not moved and the number of rows are still 100k
 SELECT citus.mitmproxy('conn.allow()');

File 5 of 7: expected output, deferred shard delete test

@@ -141,6 +141,50 @@ SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'local
 (1 row)
+-- when a move tx is aborted, there remains a shard on the target node
+BEGIN;
+SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ master_move_shard_placement
+---------------------------------------------------------------------
+(1 row)
+
+ROLLBACK;
+-- see the cleanup record for the shard on the target node
+-- https://github.com/citusdata/citus/issues/6580
+select object_name, object_type from pg_dist_cleanup;
+              object_name               | object_type
+---------------------------------------------------------------------
+ shard_move_deferred_delete.t1_20000000 |           1
+(1 row)
+
+-- see the shard on both workers
+SELECT run_command_on_workers($cmd$
+    SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
+$cmd$);
+ run_command_on_workers
+---------------------------------------------------------------------
+ (localhost,57637,t,1)
+ (localhost,57638,t,1)
+(2 rows)
+
+-- clean it up
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+(1 row)
+
+-- verify the shard is cleaned up
+SELECT run_command_on_workers($cmd$
+    SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
+$cmd$);
+ run_command_on_workers
+---------------------------------------------------------------------
+ (localhost,57637,t,1)
+ (localhost,57638,t,0)
+(2 rows)
+
 -- override the function for testing purpose
 -- since it is extension owned function, propagate it to workers manually
 create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint)

File 6 of 7: test SQL, online shard move failure tests

@@ -138,7 +138,7 @@ SELECT citus.mitmproxy('conn.allow()');
 -- we expect the drop query will succeed on only one node
 SELECT COUNT(*)
 FROM run_command_on_workers(
-    $$DROP SUBSCRIPTION citus_shard_move_subscription_10_15$$)
+    $$DROP SUBSCRIPTION citus_shard_move_subscription_10_19$$)
 WHERE success AND result = 'DROP SUBSCRIPTION';
 -- reset back
-- reset back
@@ -161,6 +161,10 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
 SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
+-- cleanup leftovers
+SELECT citus.mitmproxy('conn.allow()');
+SELECT public.wait_for_resource_cleanup();
+
 -- failure on create index
 SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);

File 7 of 7: test SQL, deferred shard delete test

@@ -78,6 +78,27 @@ $cmd$);
 -- needed.
 SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
+-- when a move tx is aborted, there remains a shard on the target node
+BEGIN;
+SELECT master_move_shard_placement(20000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ROLLBACK;
+
+-- see the cleanup record for the shard on the target node
+-- https://github.com/citusdata/citus/issues/6580
+select object_name, object_type from pg_dist_cleanup;
+
+-- see the shard on both workers
+SELECT run_command_on_workers($cmd$
+    SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
+$cmd$);
+
+-- clean it up
+SELECT public.wait_for_resource_cleanup();
+
+-- verify the shard is cleaned up
+SELECT run_command_on_workers($cmd$
+    SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
+$cmd$);
+
 -- override the function for testing purpose
 -- since it is extension owned function, propagate it to workers manually