diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index c0ca7deb1..d1ed74d28 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -67,6 +67,7 @@ typedef struct RemoteFileDestReceiver /* whether to write to a local file */ bool writeLocalFile; File fileDesc; + off_t offset; /* state on how to copy out data types */ CopyOutState copyOutState; @@ -80,7 +81,7 @@ typedef struct RemoteFileDestReceiver static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static StringInfo ConstructCopyResultStatement(const char *resultId); -static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void WriteToLocalFile(StringInfo copyData, RemoteFileDestReceiver *fileDest); static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList); static void SendCopyDataOverConnection(StringInfo dataBuffer, @@ -265,6 +266,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, elog(DEBUG1, "writing to local file \"%s\"", fileName); resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode); + resultDest->offset = 0; } foreach(initialNodeCell, initialNodeList) @@ -330,7 +332,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, resultDest); } } @@ -395,7 +397,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) /* write to local file (if applicable) */ if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, resultDest); } MemoryContextSwitchTo(oldContext); @@ -412,15 +414,18 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. */ static void -WriteToLocalFile(StringInfo copyData, File fileDesc) +WriteToLocalFile(StringInfo copyData, RemoteFileDestReceiver *fileDest) { - int bytesWritten = FileWriteCompat(fileDesc, copyData->data, copyData->len, 0, + int bytesWritten = FileWriteCompat(fileDest->fileDesc, copyData->data, copyData->len, + fileDest->offset, PG_WAIT_IO); if (bytesWritten < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not append to file: %m"))); } + + fileDest->offset += bytesWritten; } @@ -446,7 +451,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, resultDest); } } diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index e765bbb4f..8ad9130b0 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -273,10 +273,10 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList, if (cacheEntry->shardIntervalCompareFunction) { /* initiate function call info once (allows comparators to cache metadata) */ - InitFunctionCallInfoDataCompat(*(FunctionCallInfo) & - context.compareIntervalFunctionCall, - cacheEntry->shardIntervalCompareFunction, - 2, DEFAULT_COLLATION_OID, NULL, NULL); + InitFunctionCallInfoData(*(FunctionCallInfo) & + context.compareIntervalFunctionCall, + cacheEntry->shardIntervalCompareFunction, + 2, DEFAULT_COLLATION_OID, NULL, NULL); } else { @@ -287,10 +287,10 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList, if (cacheEntry->shardColumnCompareFunction) { /* initiate function call info once (allows comparators to cache metadata) */ - InitFunctionCallInfoDataCompat(*(FunctionCallInfo) & - context.compareValueFunctionCall, - cacheEntry->shardColumnCompareFunction, - 2, DEFAULT_COLLATION_OID, NULL, NULL); + InitFunctionCallInfoData(*(FunctionCallInfo) & + context.compareValueFunctionCall, + cacheEntry->shardColumnCompareFunction, + 2, DEFAULT_COLLATION_OID, NULL, NULL); } else { diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 3174be1fa..7cfe5d0bd 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -756,10 +756,8 @@ AppendShardIdToName(char **name, uint64 shardId) neededBytes = snprintf((*name), NAMEDATALEN, "%s", extendedName); if (neededBytes < 0) { - char *strerrno = strerror(errno); - ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory: %s", strerrno))); + errmsg("out of memory: %m"))); } else if (neededBytes >= NAMEDATALEN) { diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index 99589779d..89e7669af 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -41,6 +41,7 @@ typedef struct TaskFileDestReceiver char *filePath; File fileDesc; bool binaryCopyFormat; + off_t offset; /* state on how to copy out data types */ CopyOutState copyOutState; @@ -56,7 +57,7 @@ static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executo static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); -static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest); static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); @@ -186,6 +187,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags, fileMode); + taskFileDest->offset = 0; if (copyOutState->binary) { @@ -193,7 +195,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryHeaders(copyOutState); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); } MemoryContextSwitchTo(oldContext); @@ -234,7 +236,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, copyOutState, columnOutputFunctions, NULL); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); MemoryContextSwitchTo(oldContext); @@ -250,15 +252,18 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. */ static void -WriteToLocalFile(StringInfo copyData, File fileDesc) +WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest) { - int bytesWritten = FileWriteCompat(fileDesc, copyData->data, copyData->len, 0, + int bytesWritten = FileWriteCompat(taskFileDest->fileDesc, copyData->data, + copyData->len, taskFileDest->offset, PG_WAIT_IO); if (bytesWritten < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not append to file: %m"))); } + + taskFileDest->offset += bytesWritten; } @@ -278,7 +283,7 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver) /* write footers when using binary encoding */ resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryFooters(copyOutState); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); } FileClose(taskFileDest->fileDesc); diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index e56f91875..465122281 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -267,7 +267,6 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla ((fc)->args[n].isnull = false, (fc)->args[n].value = (argval)) #define fcSetArgNull(fc, n) \ ((fc)->args[n].isnull = true, (fc)->args[n].value = (Datum) 0) -#define InitFunctionCallInfoDataCompat InitFunctionCallInfoData #else /* pre PG12 */ #define QTW_EXAMINE_RTES_BEFORE QTW_EXAMINE_RTES @@ -297,8 +296,6 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla (((fc)->argnull[n] = false), ((fc)->arg[n] = (value))) #define fcSetArgNull(fc, n) \ (((fc)->argnull[n] = true), ((fc)->arg[n] = (Datum) 0)) -#define InitFunctionCallInfoDataCompat(fc, fn, nargs, collation, ctx, result) \ - InitFunctionCallInfoData(fc, fn, nargs, collation, ctx, result) #define FileReadCompat(file, buffer, amount, offset, wait_event_info) \ FileRead(file, buffer, amount, wait_event_info)