mirror of https://github.com/citusdata/citus.git
read_write_etc
parent
15ade3b715
commit
3c902fd67f
|
@ -48,7 +48,7 @@ RedirectCopyDataToRegularFile(const char *filename)
|
||||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
const int fileMode = (S_IRUSR | S_IWUSR);
|
||||||
off_t offset = 0;
|
off_t offset = 0;
|
||||||
|
|
||||||
fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
|
fileDesc = open(filename, fileFlags, fileMode);
|
||||||
|
|
||||||
SendCopyInStart();
|
SendCopyInStart();
|
||||||
|
|
||||||
|
@ -58,8 +58,7 @@ RedirectCopyDataToRegularFile(const char *filename)
|
||||||
/* if received data has contents, append to regular file */
|
/* if received data has contents, append to regular file */
|
||||||
if (copyData->len > 0)
|
if (copyData->len > 0)
|
||||||
{
|
{
|
||||||
int appended = FileWriteCompat(fileDesc, copyData->data, copyData->len,
|
int appended = write(fileDesc, copyData->data, copyData->len);
|
||||||
offset, PG_WAIT_IO);
|
|
||||||
|
|
||||||
if (appended != copyData->len)
|
if (appended != copyData->len)
|
||||||
{
|
{
|
||||||
|
@ -75,7 +74,7 @@ RedirectCopyDataToRegularFile(const char *filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
FreeStringInfo(copyData);
|
FreeStringInfo(copyData);
|
||||||
FileClose(fileDesc);
|
close(fileDesc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -96,7 +95,7 @@ SendRegularFile(const char *filename)
|
||||||
off_t offset = 0;
|
off_t offset = 0;
|
||||||
|
|
||||||
/* we currently do not check if the caller has permissions for this file */
|
/* we currently do not check if the caller has permissions for this file */
|
||||||
fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
|
fileDesc = open(filename, fileFlags, fileMode);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We read file's contents into buffers of 32 KB. This buffer size is twice
|
* We read file's contents into buffers of 32 KB. This buffer size is twice
|
||||||
|
@ -107,8 +106,7 @@ SendRegularFile(const char *filename)
|
||||||
|
|
||||||
SendCopyOutStart();
|
SendCopyOutStart();
|
||||||
|
|
||||||
readBytes = FileReadCompat(fileDesc, fileBuffer->data, fileBufferSize, offset,
|
readBytes = read(fileDesc, fileBuffer->data, fileBufferSize);
|
||||||
PG_WAIT_IO);
|
|
||||||
while (readBytes > 0)
|
while (readBytes > 0)
|
||||||
{
|
{
|
||||||
fileBuffer->len = readBytes;
|
fileBuffer->len = readBytes;
|
||||||
|
@ -116,15 +114,14 @@ SendRegularFile(const char *filename)
|
||||||
SendCopyData(fileBuffer);
|
SendCopyData(fileBuffer);
|
||||||
|
|
||||||
resetStringInfo(fileBuffer);
|
resetStringInfo(fileBuffer);
|
||||||
readBytes = FileReadCompat(fileDesc, fileBuffer->data, fileBufferSize,
|
readBytes = read(fileDesc, fileBuffer->data, fileBufferSize);
|
||||||
offset, PG_WAIT_IO);
|
|
||||||
offset += readBytes;
|
offset += readBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
SendCopyDone();
|
SendCopyDone();
|
||||||
|
|
||||||
FreeStringInfo(fileBuffer);
|
FreeStringInfo(fileBuffer);
|
||||||
FileClose(fileDesc);
|
close(fileDesc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -265,7 +265,7 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
|
||||||
elog(DEBUG1, "writing to local file \"%s\"", fileName);
|
elog(DEBUG1, "writing to local file \"%s\"", fileName);
|
||||||
|
|
||||||
resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode);
|
resultDest->fileDesc = open(fileName, fileFlags, fileMode);
|
||||||
resultDest->offset = 0;
|
resultDest->offset = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,9 +416,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
static void
|
static void
|
||||||
WriteToLocalFile(StringInfo copyData, RemoteFileDestReceiver *fileDest)
|
WriteToLocalFile(StringInfo copyData, RemoteFileDestReceiver *fileDest)
|
||||||
{
|
{
|
||||||
int bytesWritten = FileWriteCompat(fileDest->fileDesc, copyData->data, copyData->len,
|
int bytesWritten = write(fileDest->fileDesc, copyData->data, copyData->len);
|
||||||
fileDest->offset,
|
|
||||||
PG_WAIT_IO);
|
|
||||||
if (bytesWritten < 0)
|
if (bytesWritten < 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
@ -460,7 +458,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
|
|
||||||
if (resultDest->writeLocalFile)
|
if (resultDest->writeLocalFile)
|
||||||
{
|
{
|
||||||
FileClose(resultDest->fileDesc);
|
close(resultDest->fileDesc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -474,7 +474,7 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount)
|
||||||
{
|
{
|
||||||
StringInfo filePath = UserPartitionFilename(directoryName, fileIndex);
|
StringInfo filePath = UserPartitionFilename(directoryName, fileIndex);
|
||||||
|
|
||||||
fileDescriptor = PathNameOpenFilePerm(filePath->data, fileFlags, fileMode);
|
fileDescriptor = open(filePath->data, fileFlags, fileMode);
|
||||||
if (fileDescriptor < 0)
|
if (fileDescriptor < 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
@ -505,7 +505,7 @@ ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount)
|
||||||
|
|
||||||
FileOutputStreamFlush(partitionFile);
|
FileOutputStreamFlush(partitionFile);
|
||||||
|
|
||||||
FileClose(partitionFile->fileDescriptor);
|
close(partitionFile->fileDescriptor);
|
||||||
FreeStringInfo(partitionFile->fileBuffer);
|
FreeStringInfo(partitionFile->fileBuffer);
|
||||||
FreeStringInfo(partitionFile->filePath);
|
FreeStringInfo(partitionFile->filePath);
|
||||||
}
|
}
|
||||||
|
@ -854,8 +854,7 @@ FileOutputStreamFlush(FileOutputStream *file)
|
||||||
int written = 0;
|
int written = 0;
|
||||||
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
written = FileWriteCompat(file->fileDescriptor, fileBuffer->data, fileBuffer->len,
|
written = write(file->fileDescriptor, fileBuffer->data, fileBuffer->len);
|
||||||
file->offset, PG_WAIT_IO);
|
|
||||||
if (written != fileBuffer->len)
|
if (written != fileBuffer->len)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
|
|
@ -185,8 +185,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
||||||
copyOutState->binary);
|
copyOutState->binary);
|
||||||
|
|
||||||
taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags,
|
taskFileDest->fileDesc = open(taskFileDest->filePath, fileFlags, fileMode);
|
||||||
fileMode);
|
|
||||||
taskFileDest->offset = 0;
|
taskFileDest->offset = 0;
|
||||||
|
|
||||||
if (copyOutState->binary)
|
if (copyOutState->binary)
|
||||||
|
@ -254,9 +253,8 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
static void
|
static void
|
||||||
WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest)
|
WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest)
|
||||||
{
|
{
|
||||||
int bytesWritten = FileWriteCompat(taskFileDest->fileDesc, copyData->data,
|
int bytesWritten = write(taskFileDest->fileDesc, copyData->data,
|
||||||
copyData->len, taskFileDest->offset,
|
copyData->len);
|
||||||
PG_WAIT_IO);
|
|
||||||
if (bytesWritten < 0)
|
if (bytesWritten < 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
@ -286,7 +284,7 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
|
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
|
||||||
}
|
}
|
||||||
|
|
||||||
FileClose(taskFileDest->fileDesc);
|
close(taskFileDest->fileDesc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue