Re-acquire metadata locks in RouterExecutorStart

pull/916/head
Marco Slot 2016-10-26 12:36:47 +02:00
parent 1e6d1ef67e
commit 275378aa45
5 changed files with 98 additions and 3 deletions

View File

@ -57,6 +57,7 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
executorType = JobExecutorType(multiPlan);
if (executorType == MULTI_EXECUTOR_ROUTER)
{
List *taskList = workerJob->taskList;
TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
planStatement->planTree->targetlist, false);
List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
@ -68,7 +69,7 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
queryDesc->tupDesc = tupleDescriptor;
/* drop into the router executor */
RouterExecutorStart(queryDesc, eflags);
RouterExecutorStart(queryDesc, eflags, taskList);
}
else
{

View File

@ -30,6 +30,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
@ -90,6 +91,7 @@ static void InitTransactionStateForTask(Task *task);
static HTAB * CreateXactParticipantHash(void);
/* functions needed during run phase */
static void ReacquireMetadataLocks(List *taskList);
static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
bool isModificationQuery, bool expectResults);
static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
@ -133,11 +135,23 @@ static void MarkRemainingInactivePlacements(void);
* execution.
*/
void
RouterExecutorStart(QueryDesc *queryDesc, int eflags)
RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList)
{
EState *executorState = NULL;
CmdType commandType = queryDesc->operation;
/*
* If we are executing a prepared statement, then we may not yet have obtained
* the metadata locks in this transaction. To prevent a concurrent shard copy,
* we re-obtain them here or error out if a shard copy has already started.
*
* If a shard copy finishes in between fetching a plan from cache and
* re-acquiring the locks, then we might still run a stale plan, which could
* cause shard placements to diverge. To minimize this window, we take the
* locks as early as possible.
*/
ReacquireMetadataLocks(taskList);
/* disallow triggers during distributed modify commands */
if (commandType != CMD_SELECT)
{
@ -163,6 +177,63 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags)
}
/*
* ReacquireMetadataLocks re-acquires the metadata locks that are normally
* acquired during planning.
*
* If we are executing a prepared statement, then planning might have
* happened in a separate transaction and advisory locks are no longer
* held. If a shard is currently being repaired/copied/moved, then
* obtaining the locks will fail and this function throws an error to
* prevent executing a stale plan.
*
* If we are executing a non-prepared statement or planning happened in
* the same transaction, then we already have the locks and obtain them
* again here. Since we always release these locks at the end of the
* transaction, this is effectively a no-op.
*/
static void
ReacquireMetadataLocks(List *taskList)
{
ListCell *taskCell = NULL;
/*
* Note: to avoid the overhead of additional sorting, we assume tasks
* to be already sorted by shard ID such that deadlocks are avoided.
* This is true for INSERT/SELECT, which is the only multi-shard
* command right now.
*/
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
/*
* Only obtain metadata locks for modifications to allow reads to
* proceed during shard copy.
*/
if (task->taskType == MODIFY_TASK &&
!TryLockShardDistributionMetadata(task->anchorShardId, ShareLock))
{
/*
* We could error out immediately to give quick feedback to the
* client, but this might complicate flow control and our default
* behaviour during shard copy is to block.
*
* Block until the lock becomes available such that the next command
* will likely succeed and use the serialization failure error code
* to signal to the client that it should retry the current command.
*/
LockShardDistributionMetadata(task->anchorShardId, ShareLock);
ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("prepared modifications cannot be executed on "
"a shard while it is being copied")));
}
}
}
/*
* InitTransactionStateForTask is called during executor start with the first
* modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the

View File

@ -45,6 +45,28 @@ LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
}
/*
* TryLockShardDistributionMetadata tries to grab a lock for distribution
* metadata related to the specified shard, returning false if the lock
* is currently taken. Any locks acquired using this method are released
* at transaction end.
*/
bool
TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
bool lockAcquired = false;
SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, MyDatabaseId, shardId);
lockAcquired = LockAcquire(&tag, lockMode, sessionLock, dontWait);
return lockAcquired;
}
/*
* LockRelationDistributionMetadata returns after getting a the lock used for a
* relation's distribution metadata, blocking if required. Only ExclusiveLock

View File

@ -33,7 +33,7 @@ typedef struct XactShardConnSet
extern bool AllModificationsCommutative;
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags);
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);

View File

@ -65,6 +65,7 @@ typedef enum AdvisoryLocktagClass
/* Lock shard/relation metadata for safe modifications */
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
extern void LockRelationDistributionMetadata(Oid relationId, LOCKMODE lockMode);
/* Lock shard data, for DML commands or remote fetches */