mirror of https://github.com/citusdata/citus.git
Move the logic that initilize connections/local files into a function
parent
9b29a32d7a
commit
721daec9a5
|
@ -82,6 +82,7 @@ typedef struct RemoteFileDestReceiver
|
||||||
|
|
||||||
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
TupleDesc inputTupleDescriptor);
|
TupleDesc inputTupleDescriptor);
|
||||||
|
static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest);
|
||||||
static StringInfo ConstructCopyResultStatement(const char *resultId);
|
static StringInfo ConstructCopyResultStatement(const char *resultId);
|
||||||
static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
|
static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
|
||||||
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
|
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
|
||||||
|
@ -233,14 +234,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
{
|
{
|
||||||
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
|
||||||
|
|
||||||
const char *resultId = resultDest->resultId;
|
|
||||||
|
|
||||||
const char *delimiterCharacter = "\t";
|
const char *delimiterCharacter = "\t";
|
||||||
const char *nullPrintCharacter = "\\N";
|
const char *nullPrintCharacter = "\\N";
|
||||||
|
|
||||||
List *initialNodeList = resultDest->initialNodeList;
|
|
||||||
List *connectionList = NIL;
|
|
||||||
|
|
||||||
resultDest->tupleDescriptor = inputTupleDescriptor;
|
resultDest->tupleDescriptor = inputTupleDescriptor;
|
||||||
|
|
||||||
/* define how tuples will be serialised */
|
/* define how tuples will be serialised */
|
||||||
|
@ -256,6 +252,23 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
||||||
copyOutState->binary);
|
copyOutState->binary);
|
||||||
|
|
||||||
|
PrepareIntermediateResultBroadcast(resultDest);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PrepareIntermediateResultBroadcast gets a RemoteFileDestReceiver and does
|
||||||
|
* the necessary initilizations including initiating the remote connnections
|
||||||
|
* and creating the local file, which is necessary (it might be both).
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest)
|
||||||
|
{
|
||||||
|
List *initialNodeList = resultDest->initialNodeList;
|
||||||
|
const char *resultId = resultDest->resultId;
|
||||||
|
List *connectionList = NIL;
|
||||||
|
CopyOutState copyOutState = resultDest->copyOutState;
|
||||||
|
|
||||||
if (resultDest->writeLocalFile)
|
if (resultDest->writeLocalFile)
|
||||||
{
|
{
|
||||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
|
|
Loading…
Reference in New Issue