From 0430b568bea30f0ed6f1d378a2716d3716d682c4 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 7 Apr 2020 17:30:34 +0300 Subject: [PATCH] explicitly return false if transaction connected to local node (#3715) * explicitly return false if transaction connected to local node * not set TransactionConnectedToLocalGroup if we are writing to a file We use TransactionConnectedToLocalGroup to prevent local execution from happening as that might cause visibility problems. As files are visible to all transactions, we shouldn't set this variable if we are writing to a file. --- src/backend/distributed/commands/multi_copy.c | 35 +++++++++++++------ .../distributed/executor/local_executor.c | 11 +++++- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c5a320096..7724d2018 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -246,7 +246,7 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool shouldUseLocalCopy, CopyOutState - copyOutState); + copyOutState, bool isCopyToIntermediateFile); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); @@ -254,7 +254,8 @@ static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool canUseLocalCopy, - CopyOutState copyOutState); + CopyOutState copyOutState, + bool isCopyToIntermediateFile); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -2292,12 +2293,14 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest /* connections hash is kept in memory context */ MemoryContextSwitchTo(copyDest->memoryContext); + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, stopOnFailure, &cachedShardStateFound, copyDest->shouldUseLocalCopy, - copyDest->copyOutState); + copyDest->copyOutState, + isIntermediateResult); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -3219,7 +3222,8 @@ ConnectionStateList(HTAB *connectionStateHash) static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool - shouldUseLocalCopy, CopyOutState copyOutState) + shouldUseLocalCopy, CopyOutState copyOutState, bool + isCopyToIntermediateFile) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); @@ -3227,7 +3231,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, { InitializeCopyShardState(shardState, connectionStateHash, shardId, stopOnFailure, shouldUseLocalCopy, - copyOutState); + copyOutState, isCopyToIntermediateFile); } return shardState; @@ -3243,7 +3247,7 @@ static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState - copyOutState) + copyOutState, bool isCopyToIntermediateFile) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3281,6 +3285,19 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + if (placement->groupId == GetLocalGroupId()) + { + /* + * if we are copying into an intermediate file we won't use local copy. + * Files are visible to all transactions so we can still use local execution, therefore + * we shouldn't restrict only using connection in this case. + */ + if (!isCopyToIntermediateFile) + { + TransactionConnectedToLocalGroup = true; + } + } + MultiConnection *connection = CopyGetPlacementConnection(placement, stopOnFailure); if (connection == NULL) @@ -3289,6 +3306,7 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + CopyConnectionState *connectionState = GetConnectionState(connectionStateHash, connection); @@ -3439,11 +3457,6 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) ClaimConnectionExclusively(connection); } - if (!TransactionConnectedToLocalGroup && placement->groupId == GetLocalGroupId()) - { - TransactionConnectedToLocalGroup = true; - } - return connection; } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 52c720060..e2a91079b 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -598,6 +598,15 @@ ShouldExecuteTasksLocally(List *taskList) return false; } + if (TransactionConnectedToLocalGroup) + { + /* + * if the current transaction accessed the local node over a connection + * then we can't use local execution because of visibility problems. + */ + return false; + } + if (TransactionAccessedLocalPlacement) { bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false; @@ -657,7 +666,7 @@ ShouldExecuteTasksLocally(List *taskList) * has happened because that'd break transaction visibility rules and * many other things. */ - return !TransactionConnectedToLocalGroup; + return true; } if (!singleTask)