Implement FileCompat to abstract pg12 requiring API consumer to track file offsets

pull/2844/head
Philip Dubé 2019-08-08 21:59:58 +00:00
parent 018ad1c58e
commit fe10ca453d
6 changed files with 136 additions and 52 deletions

View File

@ -43,11 +43,10 @@ RedirectCopyDataToRegularFile(const char *filename)
{
StringInfo copyData = makeStringInfo();
bool copyDone = false;
File fileDesc = -1;
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR);
fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
File fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
SendCopyInStart();
@ -57,8 +56,8 @@ RedirectCopyDataToRegularFile(const char *filename)
/* if received data has contents, append to regular file */
if (copyData->len > 0)
{
int appended = FileWrite(fileDesc, copyData->data, copyData->len,
PG_WAIT_IO);
int appended = FileWriteCompat(&fileCompat, copyData->data,
copyData->len, PG_WAIT_IO);
if (appended != copyData->len)
{
@ -84,7 +83,6 @@ RedirectCopyDataToRegularFile(const char *filename)
void
SendRegularFile(const char *filename)
{
File fileDesc = -1;
StringInfo fileBuffer = NULL;
int readBytes = -1;
const uint32 fileBufferSize = 32768; /* 32 KB */
@ -92,7 +90,8 @@ SendRegularFile(const char *filename)
const int fileMode = 0;
/* we currently do not check if the caller has permissions for this file */
fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
File fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
/*
* We read file's contents into buffers of 32 KB. This buffer size is twice
@ -103,7 +102,8 @@ SendRegularFile(const char *filename)
SendCopyOutStart();
readBytes = FileRead(fileDesc, fileBuffer->data, fileBufferSize, PG_WAIT_IO);
readBytes = FileReadCompat(&fileCompat, fileBuffer->data, fileBufferSize,
PG_WAIT_IO);
while (readBytes > 0)
{
fileBuffer->len = readBytes;
@ -111,8 +111,8 @@ SendRegularFile(const char *filename)
SendCopyData(fileBuffer);
resetStringInfo(fileBuffer);
readBytes = FileRead(fileDesc, fileBuffer->data, fileBufferSize,
PG_WAIT_IO);
readBytes = FileReadCompat(&fileCompat, fileBuffer->data, fileBufferSize,
PG_WAIT_IO);
}
SendCopyDone();

View File

@ -27,6 +27,7 @@
#include "distributed/remote_commands.h"
#include "distributed/transmit.h"
#include "distributed/transaction_identifier.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
@ -65,7 +66,7 @@ typedef struct RemoteFileDestReceiver
/* whether to write to a local file */
bool writeLocalFile;
File fileDesc;
FileCompat fileCompat;
/* state on how to copy out data types */
CopyOutState copyOutState;
@ -79,7 +80,7 @@ typedef struct RemoteFileDestReceiver
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static StringInfo ConstructCopyResultStatement(const char *resultId);
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataOverConnection(StringInfo dataBuffer,
@ -263,7 +264,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
elog(DEBUG1, "writing to local file \"%s\"", fileName);
resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode);
resultDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(fileName,
fileFlags,
fileMode));
}
foreach(initialNodeCell, initialNodeList)
@ -329,7 +332,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat);
}
}
@ -394,7 +397,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
/* write to local file (if applicable) */
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat);
}
MemoryContextSwitchTo(oldContext);
@ -411,9 +414,11 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
*/
static void
WriteToLocalFile(StringInfo copyData, File fileDesc)
WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat)
{
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
int bytesWritten = FileWriteCompat(fileCompat, copyData->data,
copyData->len,
PG_WAIT_IO);
if (bytesWritten < 0)
{
ereport(ERROR, (errcode_for_file_access(),
@ -444,7 +449,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat);
}
}
@ -453,7 +458,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
if (resultDest->writeLocalFile)
{
FileClose(resultDest->fileDesc);
FileClose(resultDest->fileCompat.fd);
}
}

View File

@ -65,8 +65,8 @@ static uint32 FileBufferSize(int partitionBufferSizeInKB, uint32 fileCount);
static FileOutputStream * OpenPartitionFiles(StringInfo directoryName, uint32 fileCount);
static void ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount);
static void RenameDirectory(StringInfo oldDirectoryName, StringInfo newDirectoryName);
static void FileOutputStreamWrite(FileOutputStream file, StringInfo dataToWrite);
static void FileOutputStreamFlush(FileOutputStream file);
static void FileOutputStreamWrite(FileOutputStream *file, StringInfo dataToWrite);
static void FileOutputStreamFlush(FileOutputStream *file);
static void FilterAndPartitionTable(const char *filterQuery,
const char *columnName, Oid columnType,
uint32 (*PartitionIdFunction)(Datum, const void *),
@ -221,6 +221,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
partitionContext->hashFunction = hashFunction;
partitionContext->partitionCount = partitionCount;
partitionContext->collation = PG_GET_COLLATION();
/* we'll use binary search, we need the comparison function */
if (!partitionContext->hasUniformHashDistribution)
@ -464,7 +465,7 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount)
FileOutputStream *partitionFileArray = NULL;
File fileDescriptor = 0;
uint32 fileIndex = 0;
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | PG_BINARY);
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR);
partitionFileArray = palloc0(fileCount * sizeof(FileOutputStream));
@ -480,7 +481,8 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount)
errmsg("could not open file \"%s\": %m", filePath->data)));
}
partitionFileArray[fileIndex].fileDescriptor = fileDescriptor;
partitionFileArray[fileIndex].fileCompat = FileCompatFromFileStart(
fileDescriptor);
partitionFileArray[fileIndex].fileBuffer = makeStringInfo();
partitionFileArray[fileIndex].filePath = filePath;
}
@ -500,13 +502,13 @@ ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount)
uint32 fileIndex = 0;
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{
FileOutputStream partitionFile = partitionFileArray[fileIndex];
FileOutputStream *partitionFile = &partitionFileArray[fileIndex];
FileOutputStreamFlush(partitionFile);
FileClose(partitionFile.fileDescriptor);
FreeStringInfo(partitionFile.fileBuffer);
FreeStringInfo(partitionFile.filePath);
FileClose(partitionFile->fileCompat.fd);
FreeStringInfo(partitionFile->fileBuffer);
FreeStringInfo(partitionFile->filePath);
}
pfree(partitionFileArray);
@ -829,9 +831,9 @@ RenameDirectory(StringInfo oldDirectoryName, StringInfo newDirectoryName)
* if so, the function flushes the buffer to the underlying file.
*/
static void
FileOutputStreamWrite(FileOutputStream file, StringInfo dataToWrite)
FileOutputStreamWrite(FileOutputStream *file, StringInfo dataToWrite)
{
StringInfo fileBuffer = file.fileBuffer;
StringInfo fileBuffer = file->fileBuffer;
uint32 newBufferSize = fileBuffer->len + dataToWrite->len;
appendBinaryStringInfo(fileBuffer, dataToWrite->data, dataToWrite->len);
@ -847,19 +849,19 @@ FileOutputStreamWrite(FileOutputStream file, StringInfo dataToWrite)
/* Flushes data buffered in the file stream object to the underlying file. */
static void
FileOutputStreamFlush(FileOutputStream file)
FileOutputStreamFlush(FileOutputStream *file)
{
StringInfo fileBuffer = file.fileBuffer;
StringInfo fileBuffer = file->fileBuffer;
int written = 0;
errno = 0;
written = FileWrite(file.fileDescriptor, fileBuffer->data, fileBuffer->len,
PG_WAIT_IO);
written = FileWriteCompat(&file->fileCompat, fileBuffer->data, fileBuffer->len,
PG_WAIT_IO);
if (written != fileBuffer->len)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not write %d bytes to partition file \"%s\"",
fileBuffer->len, file.filePath->data)));
fileBuffer->len, file->filePath->data)));
}
}
@ -952,7 +954,7 @@ FilterAndPartitionTable(const char *filterQuery,
{
HeapTuple row = SPI_tuptable->vals[rowIndex];
TupleDesc rowDescriptor = SPI_tuptable->tupdesc;
FileOutputStream partitionFile = { 0, 0, 0 };
FileOutputStream *partitionFile = NULL;
StringInfo rowText = NULL;
Datum partitionKey = 0;
bool partitionKeyNull = false;
@ -988,7 +990,7 @@ FilterAndPartitionTable(const char *filterQuery,
rowText = rowOutputState->fe_msgbuf;
partitionFile = partitionFileArray[partitionId];
partitionFile = &partitionFileArray[partitionId];
FileOutputStreamWrite(partitionFile, rowText);
resetStringInfo(rowText);
@ -1136,7 +1138,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{
/* Generate header for a binary copy */
FileOutputStream partitionFile = { 0, 0, 0 };
FileOutputStream partitionFile = { };
CopyOutStateData headerOutputStateData;
CopyOutState headerOutputState = (CopyOutState) & headerOutputStateData;
@ -1146,7 +1148,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
AppendCopyBinaryHeaders(headerOutputState);
partitionFile = partitionFileArray[fileIndex];
FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf);
FileOutputStreamWrite(&partitionFile, headerOutputState->fe_msgbuf);
}
}
@ -1162,7 +1164,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{
/* Generate footer for a binary copy */
FileOutputStream partitionFile = { 0, 0, 0 };
FileOutputStream partitionFile = { };
CopyOutStateData footerOutputStateData;
CopyOutState footerOutputState = (CopyOutState) & footerOutputStateData;
@ -1172,7 +1174,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
AppendCopyBinaryFooters(footerOutputState);
partitionFile = partitionFileArray[fileIndex];
FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf);
FileOutputStreamWrite(&partitionFile, footerOutputState->fe_msgbuf);
}
}
@ -1263,7 +1265,8 @@ HashPartitionId(Datum partitionValue, const void *context)
ShardInterval **syntheticShardIntervalArray =
hashPartitionContext->syntheticShardIntervalArray;
FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction;
Datum hashDatum = FunctionCall1(hashFunction, partitionValue);
Datum hashDatum = FunctionCall1Coll(hashFunction, hashPartitionContext->collation,
partitionValue);
int32 hashResult = 0;
uint32 hashPartitionId = 0;

View File

@ -16,6 +16,7 @@
#include "distributed/commands/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@ -38,7 +39,7 @@ typedef struct TaskFileDestReceiver
/* output file */
char *filePath;
File fileDesc;
FileCompat fileCompat;
bool binaryCopyFormat;
/* state on how to copy out data types */
@ -55,7 +56,7 @@ static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executo
static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
static void WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest);
static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver);
static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver);
@ -183,8 +184,10 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags,
fileMode);
taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(
taskFileDest->filePath,
fileFlags,
fileMode));
if (copyOutState->binary)
{
@ -192,7 +195,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
}
MemoryContextSwitchTo(oldContext);
@ -233,7 +236,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, NULL);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
MemoryContextSwitchTo(oldContext);
@ -249,9 +252,10 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
*/
static void
WriteToLocalFile(StringInfo copyData, File fileDesc)
WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest)
{
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
int bytesWritten = FileWriteCompat(&taskFileDest->fileCompat, copyData->data,
copyData->len, PG_WAIT_IO);
if (bytesWritten < 0)
{
ereport(ERROR, (errcode_for_file_access(),
@ -276,10 +280,10 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
/* write footers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
}
FileClose(taskFileDest->fileDesc);
FileClose(taskFileDest->fileCompat.fd);
}

View File

@ -257,6 +257,48 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla
#define GetSysCacheOid3Compat GetSysCacheOid3
#define GetSysCacheOid4Compat GetSysCacheOid4
typedef struct
{
File fd;
off_t offset;
} FileCompat;
static inline int
FileWriteCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info)
{
int count = FileWrite(file->fd, buffer, amount, file->offset, wait_event_info);
if (count > 0)
{
file->offset += count;
}
return count;
}
static inline int
FileReadCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info)
{
int count = FileRead(file->fd, buffer, amount, file->offset, wait_event_info);
if (count > 0)
{
file->offset += count;
}
return count;
}
static inline FileCompat
FileCompatFromFileStart(File fileDesc)
{
FileCompat fc = {
.fd = fileDesc,
.offset = 0
};
return fc;
}
#else /* pre PG12 */
#define QTW_EXAMINE_RTES_BEFORE QTW_EXAMINE_RTES
#define MakeSingleTupleTableSlotCompat(tupleDesc, tts_opts) \
@ -277,6 +319,36 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla
#define GetSysCacheOid4Compat(cacheId, oidcol, key1, key2, key3, key4) \
GetSysCacheOid4(cacheId, key1, key2, key3, key4)
typedef struct
{
File fd;
} FileCompat;
static inline int
FileWriteCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info)
{
return FileWrite(file->fd, buffer, amount, wait_event_info);
}
static inline int
FileReadCompat(FileCompat *file, char *buffer, int amount, uint32 wait_event_info)
{
return FileRead(file->fd, buffer, amount, wait_event_info);
}
static inline FileCompat
FileCompatFromFileStart(File fileDesc)
{
FileCompat fc = {
.fd = fileDesc,
};
return fc;
}
#endif /* PG12 */
#endif /* VERSION_COMPAT_H */

View File

@ -91,7 +91,7 @@ typedef struct HashPartitionContext
*/
typedef struct FileOutputStream
{
File fileDescriptor;
FileCompat fileCompat;
StringInfo fileBuffer;
StringInfo filePath;
} FileOutputStream;