From 2f29d4e53e3b2b35509bffd86195bb53a76bd09a Mon Sep 17 00:00:00 2001
From: Jelte Fennema
Date: Tue, 21 Jan 2020 15:42:46 +0100
Subject: [PATCH] Continue to remove shards after first failure in DropMarkedShards

The comment on DropMarkedShards described the behaviour that, after a
failure, we would continue trying to drop other shards. However, the code
did not do this and stopped after the first failure. Instead of simply
fixing the comment, I fixed the code, because the described behaviour is
more useful. Now a single shard that cannot be removed yet does not block
others from being removed.
---
 .../distributed/metadata/metadata_sync.c      | 44 ---------
 .../distributed/operations/shard_cleaner.c    | 99 +++++++++++++------
 .../transaction/worker_transaction.c          | 47 +++++++++
 src/include/distributed/metadata_sync.h       |  4 -
 src/include/distributed/worker_transaction.h  |  4 +
 .../isolation_rebalancer_deferred_drop.out    | 45 +++++++++
 .../isolation_rebalancer_deferred_drop.spec   | 36 ++++++-
 7 files changed, 199 insertions(+), 80 deletions(-)

diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 8aa8618e3..686a206b4 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -319,50 +319,6 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
 }
 
 
-/*
- * SendOptionalCommandListToWorkerInTransaction sends the given command list to
- * the given worker in a single transaction. If any of the commands fail, it
- * rollbacks the transaction, and otherwise commits.
- */
-bool
-SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
-											 const char *nodeUser, List *commandList)
-{
-	int connectionFlags = FORCE_NEW_CONNECTION;
-	bool failed = false;
-
-	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
-																	  nodeName, nodePort,
-																	  nodeUser, NULL);
-
-	RemoteTransactionBegin(workerConnection);
-
-	/* iterate over the commands and execute them in the same connection */
-	const char *commandString = NULL;
-	foreach_ptr(commandString, commandList)
-	{
-		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
-		{
-			failed = true;
-			break;
-		}
-	}
-
-	if (failed)
-	{
-		RemoteTransactionAbort(workerConnection);
-	}
-	else
-	{
-		RemoteTransactionCommit(workerConnection);
-	}
-
-	CloseConnection(workerConnection);
-
-	return !failed;
-}
-
-
 /*
  * MetadataCreateCommands returns list of queries that are
  * required to create the current metadata snapshot of the node that the
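Note: the function removed above is not dropped from the codebase; it moves, with one added connection-status guard, to worker_transaction.c further down, so that the shard cleaner can call it too. A minimal caller sketch, assuming only the signature shown in this patch; the helper name and command strings here are illustrative, not part of Citus:

    /* Hypothetical caller: run a short command list on a worker, tolerating failure. */
    static bool
    DropOneTableSketch(const char *nodeName, int32 nodePort)
    {
        List *commandList = list_make2("SET LOCAL lock_timeout TO '1s'",
                                       "DROP TABLE IF EXISTS public.t1_120000");

        /* returns false on any failure instead of raising an ERROR */
        return SendOptionalCommandListToWorkerInTransaction(nodeName, nodePort,
                                                            NULL, commandList);
    }
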
diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c
index 59e068459..edfc19625 100644
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -25,7 +25,7 @@ PG_FUNCTION_INFO_V1(master_defer_delete_shards);
 
 static int DropMarkedShards(bool waitForCleanupLock);
-
+static bool TryDropShard(GroupShardPlacement *placement);
 
 /*
  * master_defer_delete_shards implements a user-facing UDF to delete orphaned shards that
@@ -85,11 +85,11 @@ TryDropMarkedShards(bool waitForCleanupLock)
 /*
  * DropMarkedShards removes shards that were marked SHARD_STATE_TO_DELETE before.
  *
- * It does so by taking an exclusive lock on the shard and its colocated
- * placements before removing. If the lock cannot be obtained it skips the
- * group and continues with others. The group that has been skipped will be
- * removed at a later time when there are no locks held anymore on those
- * placements.
+ * It does so by trying to take an exclusive lock on the shard and its
+ * colocated placements before removing. If the lock cannot be obtained, it
+ * skips the group and continues with others. The group that has been skipped
+ * will be removed at a later time when there are no locks held anymore on
+ * those placements.
  *
  * Before doing any of this it will take an exclusive PlacementCleanup lock.
  * This is to ensure that this function is not being run concurrently.
@@ -118,6 +118,7 @@ DropMarkedShards(bool waitForCleanupLock)
 		return 0;
 	}
 
+	int failedShardDropCount = 0;
 	List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
 		SHARD_STATE_TO_DELETE);
 	foreach(shardPlacementCell, shardPlacementList)
@@ -131,32 +132,70 @@ DropMarkedShards(bool waitForCleanupLock)
 			continue;
 		}
 
-		ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
-															placement->placementId);
-		ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
+		if (TryDropShard(placement))
+		{
+			removedShardCount++;
+		}
+		else
+		{
+			failedShardDropCount++;
+		}
+	}
 
-		ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
-							 INT64_FORMAT " on %s:%d after it was moved away",
-							 shardPlacement->placementId, shardPlacement->shardId,
-							 shardPlacement->nodeName, shardPlacement->nodePort)));
-
-		/* prepare sql query to execute to drop the shard */
-		StringInfo dropQuery = makeStringInfo();
-		char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
-		appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
-
-		List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
-										   dropQuery->data);
-
-		/* remove the shard from the node and the placement information */
-		SendCommandListToWorkerInSingleTransaction(shardPlacement->nodeName,
-												   shardPlacement->nodePort,
-												   NULL, dropCommandList);
-
-		DeleteShardPlacementRow(placement->placementId);
-
-		removedShardCount++;
+	if (failedShardDropCount > 0)
+	{
+		ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d",
+								 failedShardDropCount,
+								 list_length(shardPlacementList))));
 	}
 
 	return removedShardCount;
 }
+
+
+/*
+ * TryDropShard tries to drop the given shard placement and returns
+ * true on success. On failure, this method swallows errors and emits them
+ * as WARNINGs.
+ */
+static bool
+TryDropShard(GroupShardPlacement *placement)
+{
+	ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
+														placement->placementId);
+	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
+
+	ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
+						 INT64_FORMAT " on %s:%d after it was moved away",
+						 shardPlacement->placementId, shardPlacement->shardId,
+						 shardPlacement->nodeName, shardPlacement->nodePort)));
+
+	/* prepare sql query to execute to drop the shard */
+	StringInfo dropQuery = makeStringInfo();
+	char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
+	appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
+
+	/*
+	 * We set a lock_timeout here so that if there are running queries on the
+	 * shards we won't get blocked for more than 1s and fail.
+	 *
+	 * The lock timeout also avoids getting stuck in a distributed deadlock, which
+	 * can occur because we might be holding pg_dist_placement locks while also
+	 * taking locks on the shard placements, and this code interrupts the
+	 * distributed deadlock detector.
+	 */
+	List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
+									   dropQuery->data);
+
+	/* remove the shard from the node */
+	bool success =
+		SendOptionalCommandListToWorkerInTransaction(shardPlacement->nodeName,
+													 shardPlacement->nodePort,
+													 NULL, dropCommandList);
+	if (success)
+	{
+		/* delete the placement row from the coordinator metadata */
+		DeleteShardPlacementRow(placement->placementId);
+	}
+
+	return success;
+}
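The behavioural core of the patch is the loop above: a failed drop now increments a counter instead of raising an ERROR that aborts the whole scan, and a single aggregate WARNING is emitted at the end. A distilled restatement of the new control flow, using the names from the patch (illustration only, not the compiled code; the lock-contention skip branch is omitted):

    int removedShardCount = 0;
    int failedShardDropCount = 0;

    GroupShardPlacement *placement = NULL;
    foreach_ptr(placement, shardPlacementList)
    {
        /* previously, any remote error here threw and stopped the loop */
        if (TryDropShard(placement))
        {
            removedShardCount++;    /* dropped on the worker, row deleted from metadata */
        }
        else
        {
            failedShardDropCount++; /* stays SHARD_STATE_TO_DELETE, retried on a later run */
        }
    }

    if (failedShardDropCount > 0)
    {
        ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d",
                                 failedShardDropCount,
                                 list_length(shardPlacementList))));
    }
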
diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c
index 0f3c3222d..b8422bfcc 100644
--- a/src/backend/distributed/transaction/worker_transaction.c
+++ b/src/backend/distributed/transaction/worker_transaction.c
@@ -509,6 +509,53 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort,
 }
 
 
+/*
+ * SendOptionalCommandListToWorkerInTransaction sends the given command list to
+ * the given worker in a single transaction. If any of the commands fail, it
+ * rolls back the transaction; otherwise it commits.
+ */
+bool
+SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
+											 const char *nodeUser, List *commandList)
+{
+	int connectionFlags = FORCE_NEW_CONNECTION;
+	bool failed = false;
+
+	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
+																	  nodeName, nodePort,
+																	  nodeUser, NULL);
+	if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
+	{
+		return false;
+	}
+	RemoteTransactionBegin(workerConnection);
+
+	/* iterate over the commands and execute them in the same connection */
+	const char *commandString = NULL;
+	foreach_ptr(commandString, commandList)
+	{
+		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
+		{
+			failed = true;
+			break;
+		}
+	}
+
+	if (failed)
+	{
+		RemoteTransactionAbort(workerConnection);
+	}
+	else
+	{
+		RemoteTransactionCommit(workerConnection);
+	}
+
+	CloseConnection(workerConnection);
+
+	return !failed;
+}
+
+
 /*
  * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
  * metadata nodes are out of sync. It is safer to avoid metadata changing
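Note: the body above is the old metadata_sync.c function with one functional addition, the PQstatus() check. A connection that never reached CONNECTION_OK now makes the function return false up front, rather than attempting a remote transaction on a dead connection. A minimal illustration of the guard in plain libpq (Citus wraps the PGconn inside its MultiConnection struct):

    #include "libpq-fe.h"

    /* Commands may only be sent while the connection reports CONNECTION_OK. */
    static bool
    ConnectionIsUsable(const PGconn *conn)
    {
        return conn != NULL && PQstatus(conn) == CONNECTION_OK;
    }

Without such a guard the "optional" contract would presumably break on connection failure, since the very first command would error out before any of the swallow-and-warn logic runs.
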
diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h
index 8538830ba..7ccc38495 100644
--- a/src/include/distributed/metadata_sync.h
+++ b/src/include/distributed/metadata_sync.h
@@ -51,10 +51,6 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
 extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
 extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
 extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
-extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
-														 nodePort,
-														 const char *nodeUser,
-														 List *commandList);
 extern void SyncMetadataToNodesMain(Datum main_arg);
 extern void SignalMetadataSyncDaemon(Oid database, int sig);
 extern bool ShouldInitiateMetadataSync(bool *lockFailure);
diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h
index 3322596af..d48dd1d42 100644
--- a/src/include/distributed/worker_transaction.h
+++ b/src/include/distributed/worker_transaction.h
@@ -37,6 +37,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,
 									   const char *nodeUser, const char *command);
 extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort,
 									  const char *nodeUser, const char *command);
+extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
+														 nodePort,
+														 const char *nodeUser,
+														 List *commandList);
 extern void SendCommandToWorkersWithMetadata(const char *command);
 extern void SendBareCommandListToMetadataWorkers(List *commandList);
 extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList,
diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
index fca41ec08..779e70252 100644
--- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
+++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
@@ -54,3 +54,48 @@ master_defer_delete_shards
 step s1-commit: 
 	COMMIT;
 
+
+starting permutation: s1-begin s1-move-placement s2-start-session-level-connection s2-lock-table-on-worker s1-drop-marked-shards s1-commit s2-stop-connection
+step s1-begin: 
+	BEGIN;
+
+step s1-move-placement: 
+	SET citus.defer_drop_after_shard_move TO ON;
+	SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
+
+master_move_shard_placement
+
+
+step s2-start-session-level-connection: 
+	SELECT start_session_level_connection_to_node('localhost', 57637);
+
+start_session_level_connection_to_node
+
+
+step s2-lock-table-on-worker: 
+	SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
+	SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
+
+run_commands_on_session_level_connection_to_node
+
+
+run_commands_on_session_level_connection_to_node
+
+
+s1: WARNING: canceling statement due to lock timeout
+s1: WARNING: Failed to drop 1 old shards out of 1
+step s1-drop-marked-shards: 
+	SELECT public.master_defer_delete_shards();
+
+master_defer_delete_shards
+
+0
+step s1-commit: 
+	COMMIT;
+
+step s2-stop-connection: 
+	SELECT stop_session_level_connection_to_node();
+
+stop_session_level_connection_to_node
+
diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
index 4ccb33583..6e91e97be 100644
--- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
+++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
@@ -2,6 +2,22 @@
 // so setting the corresponding shard here is useful
 setup
 {
+
+	CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
+	RETURNS void
+	LANGUAGE C STRICT VOLATILE
+	AS 'citus', $$start_session_level_connection_to_node$$;
+
+	CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text)
+	RETURNS void
+	LANGUAGE C STRICT VOLATILE
+	AS 'citus', $$run_commands_on_session_level_connection_to_node$$;
+
+	CREATE OR REPLACE FUNCTION stop_session_level_connection_to_node()
+	RETURNS void
+	LANGUAGE C STRICT VOLATILE
+	AS 'citus', $$stop_session_level_connection_to_node$$;
+
 	SELECT citus_internal.replace_isolation_tester_func();
 	SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
@@ -12,6 +28,7 @@ CREATE OR REPLACE FUNCTION master_defer_delete_shards()
 COMMENT ON FUNCTION master_defer_delete_shards()
     IS 'remove orphaned shards';
 
+	SET citus.next_shard_id TO 120000;
 	SET citus.shard_count TO 8;
 	SET citus.shard_replication_factor TO 1;
 	SET citus.defer_drop_after_shard_move TO ON;
@@ -55,6 +72,22 @@ step "s1-commit"
 
 session "s2"
 
+step "s2-start-session-level-connection"
+{
+	SELECT start_session_level_connection_to_node('localhost', 57637);
+}
+
+step "s2-stop-connection"
+{
+	SELECT stop_session_level_connection_to_node();
+}
+
+step "s2-lock-table-on-worker"
+{
+	SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
+	SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
+}
+
 step "s2-drop-marked-shards"
 {
 	SELECT public.master_defer_delete_shards();
@@ -62,5 +95,4 @@ step "s2-drop-marked-shards"
 
 permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
 permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
-
-
+permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection"
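The new permutation exercises exactly the failure path this patch introduces: s2 holds a table-level lock on the shard on the worker, s1's deferred drop hits the 1s lock_timeout, the two WARNINGs shown in the expected output are emitted, and master_defer_delete_shards returns 0 while the placement stays marked for deletion. A sketch of how a periodic caller benefits from that, assuming TryDropMarkedShards returns the number of placements it removed (its return type is not shown in this patch); DeferredDropIteration is hypothetical:

    static void
    DeferredDropIteration(void)
    {
        /* never throws: remote failures surface as WARNINGs and a lower count */
        int removedShardCount = TryDropMarkedShards(false);

        if (removedShardCount > 0)
        {
            ereport(LOG, (errmsg("deferred drop removed %d shard placements",
                                 removedShardCount)));
        }

        /* placements that were still locked remain marked and are retried next time */
    }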