WriteToLocalFile: track offset

Remove InitFunctionCallInfoDataCompat
%m
read_write_etc
Philip Dubé 2019-07-15 23:09:59 +00:00
parent e3b3b85e35
commit 0528bba907
5 changed files with 31 additions and 26 deletions

View File

@ -67,6 +67,7 @@ typedef struct RemoteFileDestReceiver
/* whether to write to a local file */ /* whether to write to a local file */
bool writeLocalFile; bool writeLocalFile;
File fileDesc; File fileDesc;
off_t offset;
/* state on how to copy out data types */ /* state on how to copy out data types */
CopyOutState copyOutState; CopyOutState copyOutState;
@ -80,7 +81,7 @@ typedef struct RemoteFileDestReceiver
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor); TupleDesc inputTupleDescriptor);
static StringInfo ConstructCopyResultStatement(const char *resultId); static StringInfo ConstructCopyResultStatement(const char *resultId);
static void WriteToLocalFile(StringInfo copyData, File fileDesc); static void WriteToLocalFile(StringInfo copyData, RemoteFileDestReceiver *fileDest);
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList); static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataOverConnection(StringInfo dataBuffer, static void SendCopyDataOverConnection(StringInfo dataBuffer,
@ -265,6 +266,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
elog(DEBUG1, "writing to local file \"%s\"", fileName); elog(DEBUG1, "writing to local file \"%s\"", fileName);
resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode); resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode);
resultDest->offset = 0;
} }
foreach(initialNodeCell, initialNodeList) foreach(initialNodeCell, initialNodeList)
@ -330,7 +332,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); WriteToLocalFile(copyOutState->fe_msgbuf, resultDest);
} }
} }
@ -395,7 +397,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
/* write to local file (if applicable) */ /* write to local file (if applicable) */
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); WriteToLocalFile(copyOutState->fe_msgbuf, resultDest);
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -412,15 +414,18 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
*/ */
static void static void
WriteToLocalFile(StringInfo copyData, File fileDesc) WriteToLocalFile(StringInfo copyData, RemoteFileDestReceiver *fileDest)
{ {
int bytesWritten = FileWriteCompat(fileDesc, copyData->data, copyData->len, 0, int bytesWritten = FileWriteCompat(fileDest->fileDesc, copyData->data, copyData->len,
fileDest->offset,
PG_WAIT_IO); PG_WAIT_IO);
if (bytesWritten < 0) if (bytesWritten < 0)
{ {
ereport(ERROR, (errcode_for_file_access(), ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m"))); errmsg("could not append to file: %m")));
} }
fileDest->offset += bytesWritten;
} }
@ -446,7 +451,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); WriteToLocalFile(copyOutState->fe_msgbuf, resultDest);
} }
} }

View File

@ -273,7 +273,7 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
if (cacheEntry->shardIntervalCompareFunction) if (cacheEntry->shardIntervalCompareFunction)
{ {
/* initiate function call info once (allows comparators to cache metadata) */ /* initiate function call info once (allows comparators to cache metadata) */
InitFunctionCallInfoDataCompat(*(FunctionCallInfo) & InitFunctionCallInfoData(*(FunctionCallInfo) &
context.compareIntervalFunctionCall, context.compareIntervalFunctionCall,
cacheEntry->shardIntervalCompareFunction, cacheEntry->shardIntervalCompareFunction,
2, DEFAULT_COLLATION_OID, NULL, NULL); 2, DEFAULT_COLLATION_OID, NULL, NULL);
@ -287,7 +287,7 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
if (cacheEntry->shardColumnCompareFunction) if (cacheEntry->shardColumnCompareFunction)
{ {
/* initiate function call info once (allows comparators to cache metadata) */ /* initiate function call info once (allows comparators to cache metadata) */
InitFunctionCallInfoDataCompat(*(FunctionCallInfo) & InitFunctionCallInfoData(*(FunctionCallInfo) &
context.compareValueFunctionCall, context.compareValueFunctionCall,
cacheEntry->shardColumnCompareFunction, cacheEntry->shardColumnCompareFunction,
2, DEFAULT_COLLATION_OID, NULL, NULL); 2, DEFAULT_COLLATION_OID, NULL, NULL);

View File

@ -756,10 +756,8 @@ AppendShardIdToName(char **name, uint64 shardId)
neededBytes = snprintf((*name), NAMEDATALEN, "%s", extendedName); neededBytes = snprintf((*name), NAMEDATALEN, "%s", extendedName);
if (neededBytes < 0) if (neededBytes < 0)
{ {
char *strerrno = strerror(errno);
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory: %s", strerrno))); errmsg("out of memory: %m")));
} }
else if (neededBytes >= NAMEDATALEN) else if (neededBytes >= NAMEDATALEN)
{ {

View File

@ -41,6 +41,7 @@ typedef struct TaskFileDestReceiver
char *filePath; char *filePath;
File fileDesc; File fileDesc;
bool binaryCopyFormat; bool binaryCopyFormat;
off_t offset;
/* state on how to copy out data types */ /* state on how to copy out data types */
CopyOutState copyOutState; CopyOutState copyOutState;
@ -56,7 +57,7 @@ static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executo
static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation, static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor); TupleDesc inputTupleDescriptor);
static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void WriteToLocalFile(StringInfo copyData, File fileDesc); static void WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest);
static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver);
static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver);
@ -186,6 +187,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags, taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags,
fileMode); fileMode);
taskFileDest->offset = 0;
if (copyOutState->binary) if (copyOutState->binary)
{ {
@ -193,7 +195,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState); AppendCopyBinaryHeaders(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -234,7 +236,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, NULL); copyOutState, columnOutputFunctions, NULL);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -250,15 +252,18 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
*/ */
static void static void
WriteToLocalFile(StringInfo copyData, File fileDesc) WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest)
{ {
int bytesWritten = FileWriteCompat(fileDesc, copyData->data, copyData->len, 0, int bytesWritten = FileWriteCompat(taskFileDest->fileDesc, copyData->data,
copyData->len, taskFileDest->offset,
PG_WAIT_IO); PG_WAIT_IO);
if (bytesWritten < 0) if (bytesWritten < 0)
{ {
ereport(ERROR, (errcode_for_file_access(), ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m"))); errmsg("could not append to file: %m")));
} }
taskFileDest->offset += bytesWritten;
} }
@ -278,7 +283,7 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
/* write footers when using binary encoding */ /* write footers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState); AppendCopyBinaryFooters(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
} }
FileClose(taskFileDest->fileDesc); FileClose(taskFileDest->fileDesc);

View File

@ -267,7 +267,6 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla
((fc)->args[n].isnull = false, (fc)->args[n].value = (argval)) ((fc)->args[n].isnull = false, (fc)->args[n].value = (argval))
#define fcSetArgNull(fc, n) \ #define fcSetArgNull(fc, n) \
((fc)->args[n].isnull = true, (fc)->args[n].value = (Datum) 0) ((fc)->args[n].isnull = true, (fc)->args[n].value = (Datum) 0)
#define InitFunctionCallInfoDataCompat InitFunctionCallInfoData
#else /* pre PG12 */ #else /* pre PG12 */
#define QTW_EXAMINE_RTES_BEFORE QTW_EXAMINE_RTES #define QTW_EXAMINE_RTES_BEFORE QTW_EXAMINE_RTES
@ -297,8 +296,6 @@ RangeVarGetRelidInternal(const RangeVar *relation, LOCKMODE lockmode, uint32 fla
(((fc)->argnull[n] = false), ((fc)->arg[n] = (value))) (((fc)->argnull[n] = false), ((fc)->arg[n] = (value)))
#define fcSetArgNull(fc, n) \ #define fcSetArgNull(fc, n) \
(((fc)->argnull[n] = true), ((fc)->arg[n] = (Datum) 0)) (((fc)->argnull[n] = true), ((fc)->arg[n] = (Datum) 0))
#define InitFunctionCallInfoDataCompat(fc, fn, nargs, collation, ctx, result) \
InitFunctionCallInfoData(fc, fn, nargs, collation, ctx, result)
#define FileReadCompat(file, buffer, amount, offset, wait_event_info) \ #define FileReadCompat(file, buffer, amount, offset, wait_event_info) \
FileRead(file, buffer, amount, wait_event_info) FileRead(file, buffer, amount, wait_event_info)