diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c5a320096..7724d2018 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -246,7 +246,7 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool shouldUseLocalCopy, CopyOutState - copyOutState); + copyOutState, bool isCopyToIntermediateFile); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); @@ -254,7 +254,8 @@ static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool canUseLocalCopy, - CopyOutState copyOutState); + CopyOutState copyOutState, + bool isCopyToIntermediateFile); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -2292,12 +2293,14 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest /* connections hash is kept in memory context */ MemoryContextSwitchTo(copyDest->memoryContext); + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, stopOnFailure, &cachedShardStateFound, copyDest->shouldUseLocalCopy, - copyDest->copyOutState); + copyDest->copyOutState, + isIntermediateResult); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -3219,7 +3222,8 @@ ConnectionStateList(HTAB *connectionStateHash) static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool - shouldUseLocalCopy, CopyOutState copyOutState) + shouldUseLocalCopy, CopyOutState copyOutState, bool + isCopyToIntermediateFile) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); @@ -3227,7 +3231,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, { InitializeCopyShardState(shardState, connectionStateHash, shardId, stopOnFailure, shouldUseLocalCopy, - copyOutState); + copyOutState, isCopyToIntermediateFile); } return shardState; @@ -3243,7 +3247,7 @@ static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState - copyOutState) + copyOutState, bool isCopyToIntermediateFile) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3281,6 +3285,19 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + if (placement->groupId == GetLocalGroupId()) + { + /* + * if we are copying into an intermediate file we won't use local copy. + * Files are visible to all transactions so we can still use local execution, therefore + * we shouldn't restrict only using connection in this case. + */ + if (!isCopyToIntermediateFile) + { + TransactionConnectedToLocalGroup = true; + } + } + MultiConnection *connection = CopyGetPlacementConnection(placement, stopOnFailure); if (connection == NULL) @@ -3289,6 +3306,7 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + CopyConnectionState *connectionState = GetConnectionState(connectionStateHash, connection); @@ -3439,11 +3457,6 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) ClaimConnectionExclusively(connection); } - if (!TransactionConnectedToLocalGroup && placement->groupId == GetLocalGroupId()) - { - TransactionConnectedToLocalGroup = true; - } - return connection; } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 52c720060..e2a91079b 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -598,6 +598,15 @@ ShouldExecuteTasksLocally(List *taskList) return false; } + if (TransactionConnectedToLocalGroup) + { + /* + * if the current transaction accessed the local node over a connection + * then we can't use local execution because of visibility problems. + */ + return false; + } + if (TransactionAccessedLocalPlacement) { bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false; @@ -657,7 +666,7 @@ ShouldExecuteTasksLocally(List *taskList) * has happened because that'd break transaction visibility rules and * many other things. */ - return !TransactionConnectedToLocalGroup; + return true; } if (!singleTask)