Continue to remove shards after first failure in DropMarkedShards

The comment on DropMarkedShards described the behaviour that, after a
failure, we would continue trying to drop the remaining shards. However,
the code did not do this and stopped after the first failure. Instead of
simply fixing the comment, I fixed the code, because the described
behaviour is the more useful one: a single shard that cannot be removed
yet no longer blocks the others from being removed.
pull/4919/head
Jelte Fennema 2020-01-21 15:42:46 +01:00 committed by Sait Talha Nisanci
parent ca5d281784
commit 2f29d4e53e
7 changed files with 199 additions and 80 deletions
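In short, the early break on the first failed drop becomes a per-shard counter, so the loop visits every marked placement and only reports the failures once at the end. A condensed sketch of the new loop shape, paraphrasing the DropMarkedShards diff below (TryDropShard is the helper this commit introduces; the lock-skip branch is omitted here):

int removedShardCount = 0;
int failedShardDropCount = 0;
ListCell *shardPlacementCell = NULL;

foreach(shardPlacementCell, shardPlacementList)
{
	GroupShardPlacement *placement =
		(GroupShardPlacement *) lfirst(shardPlacementCell);

	if (TryDropShard(placement))
	{
		removedShardCount++;      /* dropped on the worker, placement row deleted */
	}
	else
	{
		failedShardDropCount++;   /* keep going; a later run retries this shard */
	}
}

if (failedShardDropCount > 0)
{
	ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d",
							 failedShardDropCount,
							 list_length(shardPlacementList))));
}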


@@ -319,50 +319,6 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
}
/*
* SendOptionalCommandListToWorkerInTransaction sends the given command list to
* the given worker in a single transaction. If any of the commands fail, it
* rolls back the transaction, and otherwise commits.
*/
bool
SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
bool failed = false;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
{
failed = true;
break;
}
}
if (failed)
{
RemoteTransactionAbort(workerConnection);
}
else
{
RemoteTransactionCommit(workerConnection);
}
CloseConnection(workerConnection);
return !failed;
}
/*
* MetadataCreateCommands returns list of queries that are
* required to create the current metadata snapshot of the node that the


@@ -25,7 +25,7 @@ PG_FUNCTION_INFO_V1(master_defer_delete_shards);
static int DropMarkedShards(bool waitForCleanupLock);
static bool TryDropShard(GroupShardPlacement *placement);
/*
* master_defer_delete_shards implements a user-facing UDF to delete orphaned shards that
@@ -85,11 +85,11 @@ TryDropMarkedShards(bool waitForCleanupLock)
/*
* DropMarkedShards removes shards that were marked SHARD_STATE_TO_DELETE before.
*
* It does so by taking an exclusive lock on the shard and its colocated
* placements before removing. If the lock cannot be obtained it skips the
* group and continues with others. The group that has been skipped will be
* removed at a later time when there are no locks held anymore on those
* placements.
* It does so by trying to take an exclusive lock on the shard and its
* colocated placements before removing. If the lock cannot be obtained it
* skips the group and continues with others. The group that has been skipped
* will be removed at a later time when there are no locks held anymore on
* those placements.
*
* Before doing any of this it will take an exclusive PlacementCleanup lock.
* This is to ensure that this function is not being run concurrently.
@@ -118,6 +118,7 @@ DropMarkedShards(bool waitForCleanupLock)
return 0;
}
int failedShardDropCount = 0;
List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
SHARD_STATE_TO_DELETE);
foreach(shardPlacementCell, shardPlacementList)
@@ -131,32 +132,70 @@ DropMarkedShards(bool waitForCleanupLock)
continue;
}
ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
placement->placementId);
ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
if (TryDropShard(placement))
{
removedShardCount++;
}
else
{
failedShardDropCount++;
}
}
ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
INT64_FORMAT " on %s:%d after it was moved away",
shardPlacement->placementId, shardPlacement->shardId,
shardPlacement->nodeName, shardPlacement->nodePort)));
/* prepare sql query to execute to drop the shard */
StringInfo dropQuery = makeStringInfo();
char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
dropQuery->data);
/* remove the shard from the node and the placement information */
SendCommandListToWorkerInSingleTransaction(shardPlacement->nodeName,
shardPlacement->nodePort,
NULL, dropCommandList);
DeleteShardPlacementRow(placement->placementId);
removedShardCount++;
if (failedShardDropCount > 0)
{
ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d",
failedShardDropCount, list_length(shardPlacementList))));
}
return removedShardCount;
}
/*
* TryDropShard tries to drop the given shard placement and returns
* true on success. On failure, this method swallows errors and emits them
* as WARNINGs.
*/
static bool
TryDropShard(GroupShardPlacement *placement)
{
ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
placement->placementId);
ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
INT64_FORMAT " on %s:%d after it was moved away",
shardPlacement->placementId, shardPlacement->shardId,
shardPlacement->nodeName, shardPlacement->nodePort)));
/* prepare sql query to execute to drop the shard */
StringInfo dropQuery = makeStringInfo();
char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
/*
* We set a lock_timeout here so that if there are running queries on the
* shards we won't get blocked more than 1s and fail.
*
* The lock timeout also avoids getting stuck in a distributed deadlock, which
* can occur because we might be holding pg_dist_placement locks while also
* taking locks on the shard placements, and this code interrupts the
* distributed deadlock detector.
*/
List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
dropQuery->data);
/* remove the shard from the node */
bool success =
SendOptionalCommandListToWorkerInTransaction(shardPlacement->nodeName,
shardPlacement->nodePort,
NULL, dropCommandList);
if (success)
{
/* delete the actual placement */
DeleteShardPlacementRow(placement->placementId);
}
return success;
}


@@ -509,6 +509,53 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort,
}
/*
* SendOptionalCommandListToWorkerInTransaction sends the given command list to
* the given worker in a single transaction. If any of the commands fail, it
* rolls back the transaction, and otherwise commits.
*/
bool
SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
bool failed = false;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{
return false;
}
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
{
failed = true;
break;
}
}
if (failed)
{
RemoteTransactionAbort(workerConnection);
}
else
{
RemoteTransactionCommit(workerConnection);
}
CloseConnection(workerConnection);
return !failed;
}
/*
* ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
* metadata nodes are out of sync. It is safer to avoid metadata changing

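For context, a hedged usage sketch of the relocated API: the host, port, and commands below are illustrative only and not taken from this commit; the NULL user mirrors the TryDropShard call site. A false return value only means the remote transaction was rolled back, no error is raised.

List *commandList = list_make2("SET LOCAL lock_timeout TO '1s'",
							   "ANALYZE pg_dist_partition");

bool success = SendOptionalCommandListToWorkerInTransaction("localhost", 5432,
															 NULL, commandList);
if (!success)
{
	/* best-effort path: just warn, the caller decides whether to retry later */
	ereport(WARNING, (errmsg("optional commands on worker localhost:5432 "
							 "were rolled back")));
}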

@@ -51,10 +51,6 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
nodePort,
const char *nodeUser,
List *commandList);
extern void SyncMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure);


@@ -37,6 +37,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,
const char *nodeUser, const char *command);
extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort,
const char *nodeUser, const char *command);
extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
nodePort,
const char *nodeUser,
List *commandList);
extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList,


@@ -54,3 +54,48 @@ master_defer_delete_shards
step s1-commit:
COMMIT;
starting permutation: s1-begin s1-move-placement s2-start-session-level-connection s2-lock-table-on-worker s1-drop-marked-shards s1-commit s2-stop-connection
step s1-begin:
BEGIN;
step s1-move-placement:
SET citus.defer_drop_after_shard_move TO ON;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_move_shard_placement
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s2-lock-table-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
run_commands_on_session_level_connection_to_node
run_commands_on_session_level_connection_to_node
s1: WARNING: canceling statement due to lock timeout
s1: WARNING: Failed to drop 1 old shards out of 1
step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
0
step s1-commit:
COMMIT;
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node


@@ -2,6 +2,22 @@
// so setting the corresponding shard here is useful
setup
{
CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$start_session_level_connection_to_node$$;
CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$run_commands_on_session_level_connection_to_node$$;
CREATE OR REPLACE FUNCTION stop_session_level_connection_to_node()
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$stop_session_level_connection_to_node$$;
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
@@ -12,6 +28,7 @@ CREATE OR REPLACE FUNCTION master_defer_delete_shards()
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
SET citus.next_shard_id to 120000;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
SET citus.defer_drop_after_shard_move TO ON;
@@ -55,6 +72,22 @@ step "s1-commit"
session "s2"
step "s2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "s2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s2-lock-table-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
}
step "s2-drop-marked-shards"
{
SELECT public.master_defer_delete_shards();
@@ -62,5 +95,4 @@ step "s2-drop-marked-shards"
permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection"