mirror of https://github.com/citusdata/citus.git
Merge pull request #4919 from citusdata/continue_dropping_shards
Continue to remove shards after first failure in DropMarkedShards
commit
2b70987341
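For context, the behavior changed by this commit is exercised roughly as follows. This is an illustrative SQL sketch, not part of the commit; the shard id, host names, and ports are placeholders borrowed from the regression tests.

-- move a placement while deferred drop is enabled; the old placement is only
-- marked SHARD_STATE_TO_DELETE instead of being dropped right away
SET citus.defer_drop_after_shard_move TO ON;
SELECT master_move_shard_placement(120000, 'localhost', 57637, 'localhost', 57638);

-- clean up orphaned placements later; after this change a placement that
-- cannot be locked is skipped with a WARNING and the remaining marked
-- placements are still dropped
SELECT public.master_defer_delete_shards();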
@@ -319,50 +319,6 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
 }
 
 
-/*
- * SendOptionalCommandListToWorkerInTransaction sends the given command list to
- * the given worker in a single transaction. If any of the commands fail, it
- * rollbacks the transaction, and otherwise commits.
- */
-bool
-SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
-											 const char *nodeUser, List *commandList)
-{
-	int connectionFlags = FORCE_NEW_CONNECTION;
-	bool failed = false;
-
-	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
-																	   nodeName, nodePort,
-																	   nodeUser, NULL);
-
-	RemoteTransactionBegin(workerConnection);
-
-	/* iterate over the commands and execute them in the same connection */
-	const char *commandString = NULL;
-	foreach_ptr(commandString, commandList)
-	{
-		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
-		{
-			failed = true;
-			break;
-		}
-	}
-
-	if (failed)
-	{
-		RemoteTransactionAbort(workerConnection);
-	}
-	else
-	{
-		RemoteTransactionCommit(workerConnection);
-	}
-
-	CloseConnection(workerConnection);
-
-	return !failed;
-}
-
-
 /*
  * MetadataCreateCommands returns list of queries that are
  * required to create the current metadata snapshot of the node that the
@@ -25,7 +25,7 @@ PG_FUNCTION_INFO_V1(master_defer_delete_shards);
 
 
 static int DropMarkedShards(bool waitForCleanupLock);
-
+static bool TryDropShard(GroupShardPlacement *placement);
 
 /*
  * master_defer_delete_shards implements a user-facing UDF to deleter orphaned shards that
@@ -85,11 +85,11 @@ TryDropMarkedShards(bool waitForCleanupLock)
 /*
  * DropMarkedShards removes shards that were marked SHARD_STATE_TO_DELETE before.
  *
- * It does so by taking an exclusive lock on the shard and its colocated
- * placements before removing. If the lock cannot be obtained it skips the
- * group and continues with others. The group that has been skipped will be
- * removed at a later time when there are no locks held anymore on those
- * placements.
+ * It does so by trying to take an exclusive lock on the shard and its
+ * colocated placements before removing. If the lock cannot be obtained it
+ * skips the group and continues with others. The group that has been skipped
+ * will be removed at a later time when there are no locks held anymore on
+ * those placements.
  *
  * Before doing any of this it will take an exclusive PlacementCleanup lock.
  * This is to ensure that this function is not being run concurrently.
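To see which placements such a cleanup run will try to remove, the coordinator metadata can be inspected with a query along these lines. This is a hedged sketch; it assumes SHARD_STATE_TO_DELETE is stored as shardstate = 4 in pg_dist_placement, which this diff does not spell out.

-- placements marked for deferred deletion, to be removed by a later
-- DropMarkedShards / master_defer_delete_shards() run
SELECT placementid, shardid, shardstate, groupid
FROM pg_dist_placement
WHERE shardstate = 4;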
|
@@ -118,6 +118,7 @@ DropMarkedShards(bool waitForCleanupLock)
 		return 0;
 	}
 
+	int failedShardDropCount = 0;
 	List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
 		SHARD_STATE_TO_DELETE);
 	foreach(shardPlacementCell, shardPlacementList)
@@ -131,32 +132,70 @@ DropMarkedShards(bool waitForCleanupLock)
 			continue;
 		}
 
-		ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
-															 placement->placementId);
-		ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
-
-		ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
-							 INT64_FORMAT " on %s:%d after it was moved away",
-							 shardPlacement->placementId, shardPlacement->shardId,
-							 shardPlacement->nodeName, shardPlacement->nodePort)));
-
-		/* prepare sql query to execute to drop the shard */
-		StringInfo dropQuery = makeStringInfo();
-		char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
-		appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
-
-		List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
-										   dropQuery->data);
-
-		/* remove the shard from the node and the placement information */
-		SendCommandListToWorkerInSingleTransaction(shardPlacement->nodeName,
-												   shardPlacement->nodePort,
-												   NULL, dropCommandList);
-
-		DeleteShardPlacementRow(placement->placementId);
-
-		removedShardCount++;
+		if (TryDropShard(placement))
+		{
+			removedShardCount++;
+		}
+		else
+		{
+			failedShardDropCount++;
+		}
+	}
+
+	if (failedShardDropCount > 0)
+	{
+		ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d",
+								 failedShardDropCount, list_length(shardPlacementList))));
 	}
 
 	return removedShardCount;
 }
+
+
+/*
+ * TryDropShard tries to drop the given shard placement and returns
+ * true on success. On failure, this method swallows errors and emits them
+ * as WARNINGs.
+ */
+static bool
+TryDropShard(GroupShardPlacement *placement)
+{
+	ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
+														 placement->placementId);
+	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
+
+	ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
+						 INT64_FORMAT " on %s:%d after it was moved away",
+						 shardPlacement->placementId, shardPlacement->shardId,
+						 shardPlacement->nodeName, shardPlacement->nodePort)));
+
+	/* prepare sql query to execute to drop the shard */
+	StringInfo dropQuery = makeStringInfo();
+	char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
+	appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
+
+	/*
+	 * We set a lock_timeout here so that if there are running queries on the
+	 * shards we won't get blocked more than 1s and fail.
+	 *
+	 * The lock timeout also avoids getting stuck in a distributed deadlock, which
+	 * can occur because we might be holding pg_dist_placement locks while also
+	 * taking locks on the shard placements, and this code interrupts the
+	 * distributed deadlock detector.
+	 */
+	List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
+									   dropQuery->data);
+
+	/* remove the shard from the node */
+	bool success =
+		SendOptionalCommandListToWorkerInTransaction(shardPlacement->nodeName,
+													 shardPlacement->nodePort,
+													 NULL, dropCommandList);
+	if (success)
+	{
+		/* delete the actual placement */
+		DeleteShardPlacementRow(placement->placementId);
+	}
+
+	return success;
+}
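Per placement, TryDropShard sends a small command list to the worker in a single transaction. On the worker that amounts to roughly the following sketch; the exact DROP statement comes from DROP_REGULAR_TABLE_COMMAND, and the shard name is a placeholder taken from the tests.

BEGIN;
SET LOCAL lock_timeout TO '1s';
DROP TABLE IF EXISTS public.t1_120000;  -- name built via ConstructQualifiedShardName()
COMMIT;
-- only if this worker transaction succeeds does the coordinator call
-- DeleteShardPlacementRow() to remove the placement from its metadata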
@@ -509,6 +509,53 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort,
 }
 
 
+/*
+ * SendOptionalCommandListToWorkerInTransaction sends the given command list to
+ * the given worker in a single transaction. If any of the commands fail, it
+ * rollbacks the transaction, and otherwise commits.
+ */
+bool
+SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
+											 const char *nodeUser, List *commandList)
+{
+	int connectionFlags = FORCE_NEW_CONNECTION;
+	bool failed = false;
+
+	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
+																	   nodeName, nodePort,
+																	   nodeUser, NULL);
+	if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
+	{
+		return false;
+	}
+	RemoteTransactionBegin(workerConnection);
+
+	/* iterate over the commands and execute them in the same connection */
+	const char *commandString = NULL;
+	foreach_ptr(commandString, commandList)
+	{
+		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
+		{
+			failed = true;
+			break;
+		}
+	}
+
+	if (failed)
+	{
+		RemoteTransactionAbort(workerConnection);
+	}
+	else
+	{
+		RemoteTransactionCommit(workerConnection);
+	}
+
+	CloseConnection(workerConnection);
+
+	return !failed;
+}
+
+
 /*
  * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
  * metadata nodes are out of sync. It is safer to avoid metadata changing
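The "Optional" in SendOptionalCommandListToWorkerInTransaction means that a failing command rolls back the whole worker transaction and makes the function return false, rather than raising an error on the coordinator. Reproduced by hand against a worker, the failure mode covered by the new isolation test looks roughly like this (hypothetical sessions, shard name borrowed from the test):

-- session A holds a conflicting lock on the shard
BEGIN;
LOCK TABLE t1_120000;

-- session B runs the commands the coordinator would send
BEGIN;
SET LOCAL lock_timeout TO '1s';
DROP TABLE IF EXISTS t1_120000;
-- ERROR:  canceling statement due to lock timeout
ROLLBACK;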
@@ -51,10 +51,6 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
 extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
 extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
 extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
-extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
-														  nodePort,
-														  const char *nodeUser,
-														  List *commandList);
 extern void SyncMetadataToNodesMain(Datum main_arg);
 extern void SignalMetadataSyncDaemon(Oid database, int sig);
 extern bool ShouldInitiateMetadataSync(bool *lockFailure);
@@ -37,6 +37,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,
 									   const char *nodeUser, const char *command);
 extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort,
 									  const char *nodeUser, const char *command);
+extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
+														  nodePort,
+														  const char *nodeUser,
+														  List *commandList);
 extern void SendCommandToWorkersWithMetadata(const char *command);
 extern void SendBareCommandListToMetadataWorkers(List *commandList);
 extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList,
@@ -54,3 +54,48 @@ master_defer_delete_shards
 step s1-commit:
     COMMIT;
 
+
+starting permutation: s1-begin s1-move-placement s2-start-session-level-connection s2-lock-table-on-worker s1-drop-marked-shards s1-commit s2-stop-connection
+step s1-begin:
+    BEGIN;
+
+step s1-move-placement:
+    SET citus.defer_drop_after_shard_move TO ON;
+    SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
+
+master_move_shard_placement
+
+
+step s2-start-session-level-connection:
+    SELECT start_session_level_connection_to_node('localhost', 57637);
+
+start_session_level_connection_to_node
+
+
+step s2-lock-table-on-worker:
+    SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
+    SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
+
+run_commands_on_session_level_connection_to_node
+
+
+run_commands_on_session_level_connection_to_node
+
+
+s1: WARNING: canceling statement due to lock timeout
+s1: WARNING: Failed to drop 1 old shards out of 1
+step s1-drop-marked-shards:
+    SELECT public.master_defer_delete_shards();
+
+master_defer_delete_shards
+
+0
+step s1-commit:
+    COMMIT;
+
+step s2-stop-connection:
+    SELECT stop_session_level_connection_to_node();
+
+stop_session_level_connection_to_node
+
+
@@ -2,6 +2,22 @@
 // so setting the corresponding shard here is useful
 setup
 {
+
+	CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
+	RETURNS void
+	LANGUAGE C STRICT VOLATILE
+	AS 'citus', $$start_session_level_connection_to_node$$;
+
+	CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text)
+	RETURNS void
+	LANGUAGE C STRICT VOLATILE
+	AS 'citus', $$run_commands_on_session_level_connection_to_node$$;
+
+	CREATE OR REPLACE FUNCTION stop_session_level_connection_to_node()
+	RETURNS void
+	LANGUAGE C STRICT VOLATILE
+	AS 'citus', $$stop_session_level_connection_to_node$$;
+
 	SELECT citus_internal.replace_isolation_tester_func();
 	SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
@@ -12,6 +28,7 @@ CREATE OR REPLACE FUNCTION master_defer_delete_shards()
 COMMENT ON FUNCTION master_defer_delete_shards()
     IS 'remove orphaned shards';
 
+SET citus.next_shard_id to 120000;
 SET citus.shard_count TO 8;
 SET citus.shard_replication_factor TO 1;
 SET citus.defer_drop_after_shard_move TO ON;
@@ -55,6 +72,22 @@ step "s1-commit"
 
 session "s2"
 
+step "s2-start-session-level-connection"
+{
+	SELECT start_session_level_connection_to_node('localhost', 57637);
+}
+
+step "s2-stop-connection"
+{
+	SELECT stop_session_level_connection_to_node();
+}
+
+step "s2-lock-table-on-worker"
+{
+	SELECT run_commands_on_session_level_connection_to_node('BEGIN;');
+	SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000');
+}
+
 step "s2-drop-marked-shards"
 {
 	SELECT public.master_defer_delete_shards();
@@ -62,5 +95,4 @@ step "s2-drop-marked-shards"
 
 permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
 permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
-
-
+permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection"