mirror of https://github.com/citusdata/citus.git
Fix undefined connectionList bug and code style improvements
parent
24cad431f4
commit
168be97c36
|
@ -251,7 +251,7 @@ static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
|
|||
ShardInterval **shardIntervalCache,
|
||||
int shardCount,
|
||||
FmgrInfo *compareFunction);
|
||||
static void OpenShardConnections(CopyStmt *copyStatement,
|
||||
static void OpenCopyTransactions(CopyStmt *copyStatement,
|
||||
ShardConnections *shardConnections,
|
||||
int64 shardId);
|
||||
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
|
||||
|
@ -481,7 +481,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
&found);
|
||||
if (!found)
|
||||
{
|
||||
OpenShardConnections(copyStatement, shardConnections, shardId);
|
||||
/* intialize COPY transactions on shard placements */
|
||||
shardConnections->shardId = shardId;
|
||||
shardConnections->connectionList = NIL;
|
||||
|
||||
OpenCopyTransactions(copyStatement, shardConnections, shardId);
|
||||
}
|
||||
|
||||
/* get the (truncated) line buffer */
|
||||
|
@ -508,7 +512,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
PG_CATCH();
|
||||
{
|
||||
EndCopyFrom(copyState);
|
||||
heap_close(rel, AccessShareLock);
|
||||
|
||||
/* roll back all transactions */
|
||||
connectionList = ConnectionList(shardConnectionHash);
|
||||
|
@ -521,21 +524,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
PG_END_TRY();
|
||||
|
||||
EndCopyFrom(copyState);
|
||||
heap_close(rel, AccessShareLock);
|
||||
heap_close(rel, NoLock);
|
||||
|
||||
error_context_stack = errorCallback.previous;
|
||||
|
||||
if (QueryCancelPending)
|
||||
{
|
||||
AbortTransactions(connectionList);
|
||||
ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
|
||||
errmsg("canceling statement due to user request")));
|
||||
}
|
||||
else
|
||||
{
|
||||
CommitTransactions(connectionList);
|
||||
}
|
||||
|
||||
CommitTransactions(connectionList);
|
||||
CloseConnections(connectionList);
|
||||
|
||||
if (completionTag != NULL)
|
||||
|
@ -561,7 +554,7 @@ CreateShardConnectionHash(void)
|
|||
info.entrysize = sizeof(ShardConnections);
|
||||
info.hash = tag_hash;
|
||||
|
||||
shardConnectionsHash = hash_create("shardConnectionHash",
|
||||
shardConnectionsHash = hash_create("Shard Connections Hash",
|
||||
INITIAL_CONNECTION_CACHE_SIZE, &info,
|
||||
HASH_ELEM | HASH_FUNCTION);
|
||||
|
||||
|
@ -739,12 +732,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
|
|||
|
||||
|
||||
/*
|
||||
* OpenShardConnections opens a connection for each placement of a shard. If a
|
||||
* connection cannot be opened, the shard placement is marked as inactive and
|
||||
* the COPY continues with the remaining shard placements.
|
||||
* OpenCopyTransactions opens a connection for each placement of a shard and
|
||||
* starts a COPY transaction. If a connection cannot be opened, then the shard
|
||||
* placement is marked as inactive and the COPY continues with the remaining
|
||||
* shard placements.
|
||||
*/
|
||||
static void
|
||||
OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||
int64 shardId)
|
||||
{
|
||||
List *finalizedPlacementList = NIL;
|
||||
|
@ -816,7 +810,6 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections
|
|||
failedPlacement->nodeName, failedPlacement->nodePort);
|
||||
}
|
||||
|
||||
shardConnections->shardId = shardId;
|
||||
shardConnections->connectionList = connectionList;
|
||||
}
|
||||
|
||||
|
@ -966,14 +959,8 @@ ConnectionList(HTAB *connectionHash)
|
|||
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
||||
while (shardConnections != NULL)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
foreach(connectionCell, shardConnections->connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
|
||||
connectionList = lappend(connectionList, transactionConnection);
|
||||
}
|
||||
List *shardConnectionsList = list_copy(shardConnections->connectionList);
|
||||
connectionList = list_concat(connectionList, shardConnectionsList);
|
||||
|
||||
shardConnections = (ShardConnections *) hash_seq_search(&status);
|
||||
}
|
||||
|
@ -985,7 +972,7 @@ ConnectionList(HTAB *connectionHash)
|
|||
/*
|
||||
* EndRemoteCopy ends the COPY input on all connections. If stopOnFailure
|
||||
* is true, then EndRemoteCopy reports an error on failure, otherwise it
|
||||
* reports a warning.
|
||||
* reports a warning or continues.
|
||||
*/
|
||||
static void
|
||||
EndRemoteCopy(List *connectionList, bool stopOnFailure)
|
||||
|
|
Loading…
Reference in New Issue