From 275378aa457c1c0184127cc702ea56fe0ff92f3a Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Wed, 26 Oct 2016 12:36:47 +0200
Subject: [PATCH] Re-acquire metadata locks in RouterExecutorStart

---
 .../distributed/executor/multi_executor.c     |  3 +-
 .../executor/multi_router_executor.c          | 73 ++++++++++++++++++-
 src/backend/distributed/utils/resource_lock.c | 22 ++++++
 .../distributed/multi_router_executor.h       |  2 +-
 src/include/distributed/resource_lock.h       |  1 +
 5 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 13359d3d5..9d2a8ea61 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -57,6 +57,7 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
 		executorType = JobExecutorType(multiPlan);
 		if (executorType == MULTI_EXECUTOR_ROUTER)
 		{
+			List *taskList = workerJob->taskList;
 			TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
 				planStatement->planTree->targetlist, false);
 			List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
@@ -68,7 +69,7 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
 			queryDesc->tupDesc = tupleDescriptor;
 
 			/* drop into the router executor */
-			RouterExecutorStart(queryDesc, eflags);
+			RouterExecutorStart(queryDesc, eflags, taskList);
 		}
 		else
 		{
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 40f9df47f..1919f8f62 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -30,6 +30,7 @@
 #include "distributed/citus_ruleutils.h"
 #include "distributed/commit_protocol.h"
 #include "distributed/connection_cache.h"
+#include "distributed/listutils.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
@@ -90,6 +91,7 @@ static void InitTransactionStateForTask(Task *task);
 static HTAB * CreateXactParticipantHash(void);
 
 /* functions needed during run phase */
+static void ReacquireMetadataLocks(List *taskList);
 static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 							  bool isModificationQuery, bool expectResults);
 static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
@@ -133,11 +135,23 @@ static void MarkRemainingInactivePlacements(void);
  * execution.
  */
 void
-RouterExecutorStart(QueryDesc *queryDesc, int eflags)
+RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList)
 {
 	EState *executorState = NULL;
 	CmdType commandType = queryDesc->operation;
 
+	/*
+	 * If we are executing a prepared statement, then we may not yet have obtained
+	 * the metadata locks in this transaction. To prevent a concurrent shard copy,
+	 * we re-obtain them here or error out if a shard copy has already started.
+	 *
+	 * If a shard copy finishes in between fetching a plan from cache and
+	 * re-acquiring the locks, then we might still run a stale plan, which could
+	 * cause shard placements to diverge. To minimize this window, we take the
+	 * locks as early as possible.
+	 */
+	ReacquireMetadataLocks(taskList);
+
 	/* disallow triggers during distributed modify commands */
 	if (commandType != CMD_SELECT)
 	{
@@ -163,6 +177,63 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags)
 }
 
 
+/*
+ * ReacquireMetadataLocks re-acquires the metadata locks that are normally
+ * acquired during planning.
+ *
+ * If we are executing a prepared statement, then planning might have
+ * happened in a separate transaction and advisory locks are no longer
+ * held. If a shard is currently being repaired/copied/moved, then
+ * obtaining the locks will fail and this function throws an error to
+ * prevent executing a stale plan.
+ *
+ * If we are executing a non-prepared statement or planning happened in
+ * the same transaction, then we already have the locks and obtain them
+ * again here. Since we always release these locks at the end of the
+ * transaction, this is effectively a no-op.
+ */
+static void
+ReacquireMetadataLocks(List *taskList)
+{
+	ListCell *taskCell = NULL;
+
+	/*
+	 * Note: to avoid the overhead of additional sorting, we assume tasks
+	 * to be already sorted by shard ID such that deadlocks are avoided.
+	 * This is true for INSERT/SELECT, which is the only multi-shard
+	 * command right now.
+	 */
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+
+		/*
+		 * Only obtain metadata locks for modifications to allow reads to
+		 * proceed during shard copy.
+		 */
+		if (task->taskType == MODIFY_TASK &&
+			!TryLockShardDistributionMetadata(task->anchorShardId, ShareLock))
+		{
+			/*
+			 * We could error out immediately to give quick feedback to the
+			 * client, but this might complicate flow control and our default
+			 * behaviour during shard copy is to block.
+			 *
+			 * Block until the lock becomes available such that the next command
+			 * will likely succeed and use the serialization failure error code
+			 * to signal to the client that it should retry the current command.
+			 */
+			LockShardDistributionMetadata(task->anchorShardId, ShareLock);
+
+			ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							errmsg("prepared modifications cannot be executed on "
+								   "a shard while it is being copied")));
+		}
+	}
+}
+
+
 /*
  * InitTransactionStateForTask is called during executor start with the first
  * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index e7af6f27a..ce3a6ad35 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -45,6 +45,28 @@ LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
 }
 
 
+/*
+ * TryLockShardDistributionMetadata tries to grab a lock for distribution
+ * metadata related to the specified shard, returning false if the lock
+ * cannot be obtained without waiting. Any locks acquired using this
+ * method are released at transaction end.
+ */
+bool
+TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = true;
+	LockAcquireResult lockResult = LOCKACQUIRE_NOT_AVAIL;
+
+	SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, MyDatabaseId, shardId);
+
+	lockResult = LockAcquire(&tag, lockMode, sessionLock, dontWait);
+
+	return (lockResult != LOCKACQUIRE_NOT_AVAIL);
+}
+
+
 /*
  * LockRelationDistributionMetadata returns after getting a the lock used for a
  * relation's distribution metadata, blocking if required. Only ExclusiveLock
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index fd3cc1c5e..48f7f6499 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -33,7 +33,7 @@ typedef struct XactShardConnSet
 extern bool AllModificationsCommutative;
 
 
-extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags);
+extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList);
 extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
 extern void RouterExecutorFinish(QueryDesc *queryDesc);
 extern void RouterExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index 454e8085f..fd9836bae 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -65,6 +65,7 @@ typedef enum AdvisoryLocktagClass
 
 /* Lock shard/relation metadata for safe modifications */
 extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
+extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
 extern void LockRelationDistributionMetadata(Oid relationId, LOCKMODE lockMode);
 
 /* Lock shard data, for DML commands or remote fetches */