Send binary footers to all shards

pull/390/head
Metin Doslu 2016-03-21 10:05:59 -07:00
parent 87fd1355a7
commit 566a9462b8
1 changed files with 9 additions and 2 deletions

View File

@ -154,6 +154,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
char partitionMethod = '\0'; char partitionMethod = '\0';
Var *partitionColumn = NULL; Var *partitionColumn = NULL;
HTAB *shardConnectionHash = NULL; HTAB *shardConnectionHash = NULL;
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL; List *connectionList = NIL;
MemoryContext tupleContext = NULL; MemoryContext tupleContext = NULL;
CopyState copyState = NULL; CopyState copyState = NULL;
@ -170,7 +172,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
int shardCount = 0; int shardCount = 0;
uint64 processedRowCount = 0; uint64 processedRowCount = 0;
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
ShardConnections *shardConnections = NULL;
CopyOutState copyOutState = NULL; CopyOutState copyOutState = NULL;
FmgrInfo *columnOutputFunctions = NULL; FmgrInfo *columnOutputFunctions = NULL;
@ -379,10 +380,16 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
MemoryContextReset(tupleContext); MemoryContextReset(tupleContext);
} }
if (shardConnections != NULL) /* send binary footers to all shards */
hash_seq_init(&status, shardConnectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{ {
CopySendBinaryFooters(copyOutState); CopySendBinaryFooters(copyOutState);
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections); SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
shardConnections = (ShardConnections *) hash_seq_search(&status);
} }
connectionList = ConnectionList(shardConnectionHash); connectionList = ConnectionList(shardConnectionHash);