From fe10ca453d29c6c53500a849950397348725549d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 8 Aug 2019 21:59:58 +0000 Subject: [PATCH] Implement FileCompat to abstract pg12 requiring API consumer to track file offsets --- src/backend/distributed/commands/transmit.c | 20 +++--- .../executor/intermediate_results.c | 23 +++--- .../worker/worker_partition_protocol.c | 47 ++++++------ .../worker/worker_sql_task_protocol.c | 24 ++++--- src/include/distributed/version_compat.h | 72 +++++++++++++++++++ src/include/distributed/worker_protocol.h | 2 +- 6 files changed, 136 insertions(+), 52 deletions(-) diff --git a/src/backend/distributed/commands/transmit.c b/src/backend/distributed/commands/transmit.c index bfa434b31..9e6b0f059 100644 --- a/src/backend/distributed/commands/transmit.c +++ b/src/backend/distributed/commands/transmit.c @@ -43,11 +43,10 @@ RedirectCopyDataToRegularFile(const char *filename) { StringInfo copyData = makeStringInfo(); bool copyDone = false; - File fileDesc = -1; const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); const int fileMode = (S_IRUSR | S_IWUSR); - - fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode); + File fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode); + FileCompat fileCompat = FileCompatFromFileStart(fileDesc); SendCopyInStart(); @@ -57,8 +56,8 @@ RedirectCopyDataToRegularFile(const char *filename) /* if received data has contents, append to regular file */ if (copyData->len > 0) { - int appended = FileWrite(fileDesc, copyData->data, copyData->len, - PG_WAIT_IO); + int appended = FileWriteCompat(&fileCompat, copyData->data, + copyData->len, PG_WAIT_IO); if (appended != copyData->len) { @@ -84,7 +83,6 @@ RedirectCopyDataToRegularFile(const char *filename) void SendRegularFile(const char *filename) { - File fileDesc = -1; StringInfo fileBuffer = NULL; int readBytes = -1; const uint32 fileBufferSize = 32768; /* 32 KB */ @@ -92,7 +90,8 @@ SendRegularFile(const char *filename) const int fileMode = 0; /* we currently do not check if the caller has permissions for this file */ - fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode); + File fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode); + FileCompat fileCompat = FileCompatFromFileStart(fileDesc); /* * We read file's contents into buffers of 32 KB. This buffer size is twice @@ -103,7 +102,8 @@ SendRegularFile(const char *filename) SendCopyOutStart(); - readBytes = FileRead(fileDesc, fileBuffer->data, fileBufferSize, PG_WAIT_IO); + readBytes = FileReadCompat(&fileCompat, fileBuffer->data, fileBufferSize, + PG_WAIT_IO); while (readBytes > 0) { fileBuffer->len = readBytes; @@ -111,8 +111,8 @@ SendRegularFile(const char *filename) SendCopyData(fileBuffer); resetStringInfo(fileBuffer); - readBytes = FileRead(fileDesc, fileBuffer->data, fileBufferSize, - PG_WAIT_IO); + readBytes = FileReadCompat(&fileCompat, fileBuffer->data, fileBufferSize, + PG_WAIT_IO); } SendCopyDone(); diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 71da880c6..cec12ce34 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -27,6 +27,7 @@ #include "distributed/remote_commands.h" #include "distributed/transmit.h" #include "distributed/transaction_identifier.h" +#include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" @@ -65,7 +66,7 @@ typedef struct RemoteFileDestReceiver /* whether to write to a local file */ bool writeLocalFile; - File fileDesc; + FileCompat fileCompat; /* state on how to copy out data types */ CopyOutState copyOutState; @@ -79,7 +80,7 @@ typedef struct RemoteFileDestReceiver static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static StringInfo ConstructCopyResultStatement(const char *resultId); -static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList); static void SendCopyDataOverConnection(StringInfo dataBuffer, @@ -263,7 +264,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, elog(DEBUG1, "writing to local file \"%s\"", fileName); - resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode); + resultDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(fileName, + fileFlags, + fileMode)); } foreach(initialNodeCell, initialNodeList) @@ -329,7 +332,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); } } @@ -394,7 +397,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) /* write to local file (if applicable) */ if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); } MemoryContextSwitchTo(oldContext); @@ -411,9 +414,11 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. */ static void -WriteToLocalFile(StringInfo copyData, File fileDesc) +WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat) { - int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO); + int bytesWritten = FileWriteCompat(fileCompat, copyData->data, + copyData->len, + PG_WAIT_IO); if (bytesWritten < 0) { ereport(ERROR, (errcode_for_file_access(), @@ -444,7 +449,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); } } @@ -453,7 +458,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) if (resultDest->writeLocalFile) { - FileClose(resultDest->fileDesc); + FileClose(resultDest->fileCompat.fd); } } diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index d22ffe1d7..89f06a54f 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -65,8 +65,8 @@ static uint32 FileBufferSize(int partitionBufferSizeInKB, uint32 fileCount); static FileOutputStream * OpenPartitionFiles(StringInfo directoryName, uint32 fileCount); static void ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount); static void RenameDirectory(StringInfo oldDirectoryName, StringInfo newDirectoryName); -static void FileOutputStreamWrite(FileOutputStream file, StringInfo dataToWrite); -static void FileOutputStreamFlush(FileOutputStream file); +static void FileOutputStreamWrite(FileOutputStream *file, StringInfo dataToWrite); +static void FileOutputStreamFlush(FileOutputStream *file); static void FilterAndPartitionTable(const char *filterQuery, const char *columnName, Oid columnType, uint32 (*PartitionIdFunction)(Datum, const void *), @@ -221,6 +221,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) partitionContext->hashFunction = hashFunction; partitionContext->partitionCount = partitionCount; + partitionContext->collation = PG_GET_COLLATION(); /* we'll use binary search, we need the comparison function */ if (!partitionContext->hasUniformHashDistribution) @@ -464,7 +465,7 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount) FileOutputStream *partitionFileArray = NULL; File fileDescriptor = 0; uint32 fileIndex = 0; - const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | PG_BINARY); + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); const int fileMode = (S_IRUSR | S_IWUSR); partitionFileArray = palloc0(fileCount * sizeof(FileOutputStream)); @@ -480,7 +481,8 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount) errmsg("could not open file \"%s\": %m", filePath->data))); } - partitionFileArray[fileIndex].fileDescriptor = fileDescriptor; + partitionFileArray[fileIndex].fileCompat = FileCompatFromFileStart( + fileDescriptor); partitionFileArray[fileIndex].fileBuffer = makeStringInfo(); partitionFileArray[fileIndex].filePath = filePath; } @@ -500,13 +502,13 @@ ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount) uint32 fileIndex = 0; for (fileIndex = 0; fileIndex < fileCount; fileIndex++) { - FileOutputStream partitionFile = partitionFileArray[fileIndex]; + FileOutputStream *partitionFile = &partitionFileArray[fileIndex]; FileOutputStreamFlush(partitionFile); - FileClose(partitionFile.fileDescriptor); - FreeStringInfo(partitionFile.fileBuffer); - FreeStringInfo(partitionFile.filePath); + FileClose(partitionFile->fileCompat.fd); + FreeStringInfo(partitionFile->fileBuffer); + FreeStringInfo(partitionFile->filePath); } pfree(partitionFileArray); @@ -829,9 +831,9 @@ RenameDirectory(StringInfo oldDirectoryName, StringInfo newDirectoryName) * if so, the function flushes the buffer to the underlying file. */ static void -FileOutputStreamWrite(FileOutputStream file, StringInfo dataToWrite) +FileOutputStreamWrite(FileOutputStream *file, StringInfo dataToWrite) { - StringInfo fileBuffer = file.fileBuffer; + StringInfo fileBuffer = file->fileBuffer; uint32 newBufferSize = fileBuffer->len + dataToWrite->len; appendBinaryStringInfo(fileBuffer, dataToWrite->data, dataToWrite->len); @@ -847,19 +849,19 @@ FileOutputStreamWrite(FileOutputStream file, StringInfo dataToWrite) /* Flushes data buffered in the file stream object to the underlying file. */ static void -FileOutputStreamFlush(FileOutputStream file) +FileOutputStreamFlush(FileOutputStream *file) { - StringInfo fileBuffer = file.fileBuffer; + StringInfo fileBuffer = file->fileBuffer; int written = 0; errno = 0; - written = FileWrite(file.fileDescriptor, fileBuffer->data, fileBuffer->len, - PG_WAIT_IO); + written = FileWriteCompat(&file->fileCompat, fileBuffer->data, fileBuffer->len, + PG_WAIT_IO); if (written != fileBuffer->len) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not write %d bytes to partition file \"%s\"", - fileBuffer->len, file.filePath->data))); + fileBuffer->len, file->filePath->data))); } } @@ -952,7 +954,7 @@ FilterAndPartitionTable(const char *filterQuery, { HeapTuple row = SPI_tuptable->vals[rowIndex]; TupleDesc rowDescriptor = SPI_tuptable->tupdesc; - FileOutputStream partitionFile = { 0, 0, 0 }; + FileOutputStream *partitionFile = NULL; StringInfo rowText = NULL; Datum partitionKey = 0; bool partitionKeyNull = false; @@ -988,7 +990,7 @@ FilterAndPartitionTable(const char *filterQuery, rowText = rowOutputState->fe_msgbuf; - partitionFile = partitionFileArray[partitionId]; + partitionFile = &partitionFileArray[partitionId]; FileOutputStreamWrite(partitionFile, rowText); resetStringInfo(rowText); @@ -1136,7 +1138,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) for (fileIndex = 0; fileIndex < fileCount; fileIndex++) { /* Generate header for a binary copy */ - FileOutputStream partitionFile = { 0, 0, 0 }; + FileOutputStream partitionFile = { }; CopyOutStateData headerOutputStateData; CopyOutState headerOutputState = (CopyOutState) & headerOutputStateData; @@ -1146,7 +1148,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) AppendCopyBinaryHeaders(headerOutputState); partitionFile = partitionFileArray[fileIndex]; - FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf); + FileOutputStreamWrite(&partitionFile, headerOutputState->fe_msgbuf); } } @@ -1162,7 +1164,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) for (fileIndex = 0; fileIndex < fileCount; fileIndex++) { /* Generate footer for a binary copy */ - FileOutputStream partitionFile = { 0, 0, 0 }; + FileOutputStream partitionFile = { }; CopyOutStateData footerOutputStateData; CopyOutState footerOutputState = (CopyOutState) & footerOutputStateData; @@ -1172,7 +1174,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) AppendCopyBinaryFooters(footerOutputState); partitionFile = partitionFileArray[fileIndex]; - FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf); + FileOutputStreamWrite(&partitionFile, footerOutputState->fe_msgbuf); } } @@ -1263,7 +1265,8 @@ HashPartitionId(Datum partitionValue, const void *context) ShardInterval **syntheticShardIntervalArray = hashPartitionContext->syntheticShardIntervalArray; FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction; - Datum hashDatum = FunctionCall1(hashFunction, partitionValue); + Datum hashDatum = FunctionCall1Coll(hashFunction, hashPartitionContext->collation, + partitionValue); int32 hashResult = 0; uint32 hashPartitionId = 0; diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index f0a8a1b53..8c1cee426 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -16,6 +16,7 @@ #include "distributed/commands/multi_copy.h" #include "distributed/multi_executor.h" #include "distributed/transmit.h" +#include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "utils/builtins.h" #include "utils/memutils.h" @@ -38,7 +39,7 @@ typedef struct TaskFileDestReceiver /* output file */ char *filePath; - File fileDesc; + FileCompat fileCompat; bool binaryCopyFormat; /* state on how to copy out data types */ @@ -55,7 +56,7 @@ static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executo static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); -static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest); static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); @@ -183,8 +184,10 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); - taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags, - fileMode); + taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit( + taskFileDest->filePath, + fileFlags, + fileMode)); if (copyOutState->binary) { @@ -192,7 +195,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryHeaders(copyOutState); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); } MemoryContextSwitchTo(oldContext); @@ -233,7 +236,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, copyOutState, columnOutputFunctions, NULL); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); MemoryContextSwitchTo(oldContext); @@ -249,9 +252,10 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. */ static void -WriteToLocalFile(StringInfo copyData, File fileDesc) +WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest) { - int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO); + int bytesWritten = FileWriteCompat(&taskFileDest->fileCompat, copyData->data, + copyData->len, PG_WAIT_IO); if (bytesWritten < 0) { ereport(ERROR, (errcode_for_file_access(), @@ -276,10 +280,10 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver) /* write footers when using binary encoding */ resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryFooters(copyOutState); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); } - FileClose(taskFileDest->fileDesc); + FileClose(taskFileDest->fileCompat.fd); } diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index c21ce8302..7dbe2f137 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -257,6 +257,48 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla #define GetSysCacheOid3Compat GetSysCacheOid3 #define GetSysCacheOid4Compat GetSysCacheOid4 +typedef struct +{ + File fd; + off_t offset; +} FileCompat; + +static inline int +FileWriteCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info) +{ + int count = FileWrite(file->fd, buffer, amount, file->offset, wait_event_info); + if (count > 0) + { + file->offset += count; + } + return count; +} + + +static inline int +FileReadCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info) +{ + int count = FileRead(file->fd, buffer, amount, file->offset, wait_event_info); + if (count > 0) + { + file->offset += count; + } + return count; +} + + +static inline FileCompat +FileCompatFromFileStart(File fileDesc) +{ + FileCompat fc = { + .fd = fileDesc, + .offset = 0 + }; + + return fc; +} + + #else /* pre PG12 */ #define QTW_EXAMINE_RTES_BEFORE QTW_EXAMINE_RTES #define MakeSingleTupleTableSlotCompat(tupleDesc, tts_opts) \ @@ -277,6 +319,36 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla #define GetSysCacheOid4Compat(cacheId, oidcol, key1, key2, key3, key4) \ GetSysCacheOid4(cacheId, key1, key2, key3, key4) +typedef struct +{ + File fd; +} FileCompat; + +static inline int +FileWriteCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info) +{ + return FileWrite(file->fd, buffer, amount, wait_event_info); +} + + +static inline int +FileReadCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info) +{ + return FileRead(file->fd, buffer, amount, wait_event_info); +} + + +static inline FileCompat +FileCompatFromFileStart(File fileDesc) +{ + FileCompat fc = { + .fd = fileDesc, + }; + + return fc; +} + + #endif /* PG12 */ #endif /* VERSION_COMPAT_H */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 5dca935cb..5ee20d68f 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -91,7 +91,7 @@ typedef struct HashPartitionContext */ typedef struct FileOutputStream { - File fileDescriptor; + FileCompat fileCompat; StringInfo fileBuffer; StringInfo filePath; } FileOutputStream;