Fix undefined connectionList bug and code style improvements

pull/366/head
Marco Slot 2016-03-10 21:23:01 +01:00 committed by Metin Doslu
parent ae7a42d957
commit f7dfc7df1b
1 changed files with 17 additions and 30 deletions

View File

@ -251,7 +251,7 @@ static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount,
FmgrInfo *compareFunction);
static void OpenShardConnections(CopyStmt *copyStatement,
static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections,
int64 shardId);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
@ -481,7 +481,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
&found);
if (!found)
{
OpenShardConnections(copyStatement, shardConnections, shardId);
/* intialize COPY transactions on shard placements */
shardConnections->shardId = shardId;
shardConnections->connectionList = NIL;
OpenCopyTransactions(copyStatement, shardConnections, shardId);
}
/* get the (truncated) line buffer */
@ -508,7 +512,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
PG_CATCH();
{
EndCopyFrom(copyState);
heap_close(rel, AccessShareLock);
/* roll back all transactions */
connectionList = ConnectionList(shardConnectionHash);
@ -521,21 +524,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
PG_END_TRY();
EndCopyFrom(copyState);
heap_close(rel, AccessShareLock);
heap_close(rel, NoLock);
error_context_stack = errorCallback.previous;
if (QueryCancelPending)
{
AbortTransactions(connectionList);
ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling statement due to user request")));
}
else
{
CommitTransactions(connectionList);
}
CloseConnections(connectionList);
if (completionTag != NULL)
@ -561,7 +554,7 @@ CreateShardConnectionHash(void)
info.entrysize = sizeof(ShardConnections);
info.hash = tag_hash;
shardConnectionsHash = hash_create("shardConnectionHash",
shardConnectionsHash = hash_create("Shard Connections Hash",
INITIAL_CONNECTION_CACHE_SIZE, &info,
HASH_ELEM | HASH_FUNCTION);
@ -739,12 +732,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
/*
* OpenShardConnections opens a connection for each placement of a shard. If a
* connection cannot be opened, the shard placement is marked as inactive and
* the COPY continues with the remaining shard placements.
* OpenCopyTransactions opens a connection for each placement of a shard and
* starts a COPY transaction. If a connection cannot be opened, then the shard
* placement is marked as inactive and the COPY continues with the remaining
* shard placements.
*/
static void
OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
int64 shardId)
{
List *finalizedPlacementList = NIL;
@ -816,7 +810,6 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections
failedPlacement->nodeName, failedPlacement->nodePort);
}
shardConnections->shardId = shardId;
shardConnections->connectionList = connectionList;
}
@ -966,14 +959,8 @@ ConnectionList(HTAB *connectionHash)
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, shardConnections->connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
connectionList = lappend(connectionList, transactionConnection);
}
List *shardConnectionsList = list_copy(shardConnections->connectionList);
connectionList = list_concat(connectionList, shardConnectionsList);
shardConnections = (ShardConnections *) hash_seq_search(&status);
}
@ -985,7 +972,7 @@ ConnectionList(HTAB *connectionHash)
/*
* EndRemoteCopy ends the COPY input on all connections. If stopOnFailure
* is true, then EndRemoteCopy reports an error on failure, otherwise it
* reports a warning.
* reports a warning or continues.
*/
static void
EndRemoteCopy(List *connectionList, bool stopOnFailure)