mirror of https://github.com/citusdata/citus.git
Send binary footers to all shards
parent
7afac2a377
commit
627b042991
|
@ -154,6 +154,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
char partitionMethod = '\0';
|
||||
Var *partitionColumn = NULL;
|
||||
HTAB *shardConnectionHash = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
ShardConnections *shardConnections = NULL;
|
||||
List *connectionList = NIL;
|
||||
MemoryContext tupleContext = NULL;
|
||||
CopyState copyState = NULL;
|
||||
|
@ -170,7 +172,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
int shardCount = 0;
|
||||
uint64 processedRowCount = 0;
|
||||
ErrorContextCallback errorCallback;
|
||||
ShardConnections *shardConnections = NULL;
|
||||
CopyOutState copyOutState = NULL;
|
||||
FmgrInfo *columnOutputFunctions = NULL;
|
||||
|
||||
|
@ -379,10 +380,16 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
MemoryContextReset(tupleContext);
|
||||
}
|
||||
|
||||
if (shardConnections != NULL)
|
||||
/* send binary footers to all shards */
|
||||
hash_seq_init(&status, shardConnectionHash);
|
||||
|
||||
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
||||
while (shardConnections != NULL)
|
||||
{
|
||||
CopySendBinaryFooters(copyOutState);
|
||||
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
|
||||
|
||||
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
||||
}
|
||||
|
||||
connectionList = ConnectionList(shardConnectionHash);
|
||||
|
|
Loading…
Reference in New Issue