mirror of https://github.com/citusdata/citus.git
Use better naming
parent
14e71b1b0a
commit
31ab5012fd
|
@ -125,7 +125,7 @@ static void OpenCopyTransactions(CopyStmt *copyStatement,
|
||||||
ShardConnections *shardConnections,
|
ShardConnections *shardConnections,
|
||||||
int64 shardId);
|
int64 shardId);
|
||||||
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
|
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
|
||||||
static void SendCopyDataToPlacements(StringInfo lineBuf,
|
static void SendCopyDataToPlacements(StringInfo dataBuffer,
|
||||||
ShardConnections *shardConnections);
|
ShardConnections *shardConnections);
|
||||||
static List * ConnectionList(HTAB *connectionHash);
|
static List * ConnectionList(HTAB *connectionHash);
|
||||||
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
|
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
|
||||||
|
@ -361,13 +361,13 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
|
|
||||||
OpenCopyTransactions(copyStatement, shardConnections, shardId);
|
OpenCopyTransactions(copyStatement, shardConnections, shardId);
|
||||||
|
|
||||||
CopySendBinaryHeaders(copyOutState);
|
BuildCopyBinaryHeaders(copyOutState);
|
||||||
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Replicate row to all shard placements */
|
/* Replicate row to all shard placements */
|
||||||
CopySendRow(columnValues, columnNulls, tupleDescriptor, copyOutState,
|
BuildCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
||||||
columnOutputFunctions);
|
copyOutState, columnOutputFunctions);
|
||||||
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
||||||
|
|
||||||
processedRowCount += 1;
|
processedRowCount += 1;
|
||||||
|
@ -379,7 +379,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
||||||
while (shardConnections != NULL)
|
while (shardConnections != NULL)
|
||||||
{
|
{
|
||||||
CopySendBinaryFooters(copyOutState);
|
BuildCopyBinaryFooters(copyOutState);
|
||||||
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
||||||
|
|
||||||
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
||||||
|
@ -732,7 +732,7 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
|
||||||
* a shard.
|
* a shard.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
SendCopyDataToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
|
SendCopyDataToPlacements(StringInfo dataBuffer, ShardConnections *shardConnections)
|
||||||
{
|
{
|
||||||
ListCell *connectionCell = NULL;
|
ListCell *connectionCell = NULL;
|
||||||
foreach(connectionCell, shardConnections->connectionList)
|
foreach(connectionCell, shardConnections->connectionList)
|
||||||
|
@ -743,7 +743,7 @@ SendCopyDataToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
|
||||||
int64 shardId = shardConnections->shardId;
|
int64 shardId = shardConnections->shardId;
|
||||||
|
|
||||||
/* copy the line buffer into the placement */
|
/* copy the line buffer into the placement */
|
||||||
int copyResult = PQputCopyData(connection, lineBuf->data, lineBuf->len);
|
int copyResult = PQputCopyData(connection, dataBuffer->data, dataBuffer->len);
|
||||||
if (copyResult != 1)
|
if (copyResult != 1)
|
||||||
{
|
{
|
||||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||||
|
@ -911,7 +911,7 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CopySendRow serializes one row using the column output functions,
|
* BuildCopyRowData serializes one row using the column output functions,
|
||||||
* and appends the data to the row output state object's message buffer.
|
* and appends the data to the row output state object's message buffer.
|
||||||
* This function is modeled after the CopyOneRowTo() function in
|
* This function is modeled after the CopyOneRowTo() function in
|
||||||
* commands/copy.c, but only implements a subset of that functionality.
|
* commands/copy.c, but only implements a subset of that functionality.
|
||||||
|
@ -919,7 +919,7 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
||||||
* to not bloat memory usage.
|
* to not bloat memory usage.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
BuildCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||||
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
|
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
|
||||||
{
|
{
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
|
@ -997,7 +997,7 @@ CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||||
|
|
||||||
/* Append binary headers to the copy buffer in headerOutputState. */
|
/* Append binary headers to the copy buffer in headerOutputState. */
|
||||||
void
|
void
|
||||||
CopySendBinaryHeaders(CopyOutState headerOutputState)
|
BuildCopyBinaryHeaders(CopyOutState headerOutputState)
|
||||||
{
|
{
|
||||||
const int32 zero = 0;
|
const int32 zero = 0;
|
||||||
|
|
||||||
|
@ -1016,7 +1016,7 @@ CopySendBinaryHeaders(CopyOutState headerOutputState)
|
||||||
|
|
||||||
/* Append binary footers to the copy buffer in footerOutputState. */
|
/* Append binary footers to the copy buffer in footerOutputState. */
|
||||||
void
|
void
|
||||||
CopySendBinaryFooters(CopyOutState footerOutputState)
|
BuildCopyBinaryFooters(CopyOutState footerOutputState)
|
||||||
{
|
{
|
||||||
int16 negative = -1;
|
int16 negative = -1;
|
||||||
|
|
||||||
|
|
|
@ -814,8 +814,8 @@ FilterAndPartitionTable(const char *filterQuery,
|
||||||
/* deconstruct the tuple; this is faster than repeated heap_getattr */
|
/* deconstruct the tuple; this is faster than repeated heap_getattr */
|
||||||
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
|
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
|
||||||
|
|
||||||
CopySendRow(valueArray, isNullArray, rowDescriptor, rowOutputState,
|
BuildCopyRowData(valueArray, isNullArray, rowDescriptor,
|
||||||
columnOutputFunctions);
|
rowOutputState, columnOutputFunctions);
|
||||||
rowText = rowOutputState->fe_msgbuf;
|
rowText = rowOutputState->fe_msgbuf;
|
||||||
|
|
||||||
partitionFile = partitionFileArray[partitionId];
|
partitionFile = partitionFileArray[partitionId];
|
||||||
|
@ -973,7 +973,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
|
||||||
memset(headerOutputState, 0, sizeof(CopyOutStateData));
|
memset(headerOutputState, 0, sizeof(CopyOutStateData));
|
||||||
headerOutputState->fe_msgbuf = makeStringInfo();
|
headerOutputState->fe_msgbuf = makeStringInfo();
|
||||||
|
|
||||||
CopySendBinaryHeaders(headerOutputState);
|
BuildCopyBinaryHeaders(headerOutputState);
|
||||||
|
|
||||||
partitionFile = partitionFileArray[fileIndex];
|
partitionFile = partitionFileArray[fileIndex];
|
||||||
FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf);
|
FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf);
|
||||||
|
@ -999,7 +999,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
|
||||||
memset(footerOutputState, 0, sizeof(CopyOutStateData));
|
memset(footerOutputState, 0, sizeof(CopyOutStateData));
|
||||||
footerOutputState->fe_msgbuf = makeStringInfo();
|
footerOutputState->fe_msgbuf = makeStringInfo();
|
||||||
|
|
||||||
CopySendBinaryFooters(footerOutputState);
|
BuildCopyBinaryFooters(footerOutputState);
|
||||||
|
|
||||||
partitionFile = partitionFileArray[fileIndex];
|
partitionFile = partitionFileArray[fileIndex];
|
||||||
FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf);
|
FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf);
|
||||||
|
|
|
@ -46,10 +46,10 @@ typedef struct CopyOutStateData *CopyOutState;
|
||||||
|
|
||||||
/* function declarations for copying into a distributed table */
|
/* function declarations for copying into a distributed table */
|
||||||
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||||
extern void CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
extern void BuildCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||||
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions);
|
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions);
|
||||||
extern void CopySendBinaryHeaders(CopyOutState headerOutputState);
|
extern void BuildCopyBinaryHeaders(CopyOutState headerOutputState);
|
||||||
extern void CopySendBinaryFooters(CopyOutState footerOutputState);
|
extern void BuildCopyBinaryFooters(CopyOutState footerOutputState);
|
||||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue