diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index efa8aa954..60eaef3ce 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -88,6 +88,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, copyDest->copyStatement, isEndOfCopy); + resetStringInfo(localCopyOutState->fe_msgbuf); } } @@ -173,7 +174,6 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat heap_close(shard, NoLock); free_parsestate(pState); - resetStringInfo(buffer); } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 34852becb..0c17a32e4 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -291,7 +291,7 @@ static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool ContainsLocalPlacement(int64 shardId); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static bool ShouldExecuteCopyLocally(void); +static bool ShouldExecuteCopyLocally(bool isIntermediateResult); static void LogLocalCopyExecution(uint64 shardId); @@ -1984,8 +1984,6 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); - copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(); - /* set up the DestReceiver function pointers */ copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; copyDest->pub.rStartup = CitusCopyDestReceiverStartup; @@ -2011,13 +2009,23 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu * operation should be done locally for local placements. */ static bool -ShouldExecuteCopyLocally() +ShouldExecuteCopyLocally(bool isIntermediateResult) { if (!EnableLocalExecution) { return false; } + /* + * Intermediate files are written to a file, and files are visible to all + * transactions, and we use a custom copy format for copy therefore we will + * use the existing logic for that. + */ + if (isIntermediateResult) + { + return false; + } + if (TransactionAccessedLocalPlacement) { /* @@ -2056,6 +2064,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; + copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId);