diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 2cbe3cce9..c1c05d4d0 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -251,7 +251,7 @@ static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction); -static void OpenShardConnections(CopyStmt *copyStatement, +static void OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, int64 shardId); static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); @@ -481,7 +481,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) &found); if (!found) { - OpenShardConnections(copyStatement, shardConnections, shardId); + /* intialize COPY transactions on shard placements */ + shardConnections->shardId = shardId; + shardConnections->connectionList = NIL; + + OpenCopyTransactions(copyStatement, shardConnections, shardId); } /* get the (truncated) line buffer */ @@ -508,7 +512,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) PG_CATCH(); { EndCopyFrom(copyState); - heap_close(rel, AccessShareLock); /* roll back all transactions */ connectionList = ConnectionList(shardConnectionHash); @@ -521,21 +524,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) PG_END_TRY(); EndCopyFrom(copyState); - heap_close(rel, AccessShareLock); + heap_close(rel, NoLock); error_context_stack = errorCallback.previous; - if (QueryCancelPending) - { - AbortTransactions(connectionList); - ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), - errmsg("canceling statement due to user request"))); - } - else - { - CommitTransactions(connectionList); - } - + CommitTransactions(connectionList); CloseConnections(connectionList); if (completionTag != NULL) @@ -561,7 +554,7 @@ CreateShardConnectionHash(void) info.entrysize = sizeof(ShardConnections); info.hash = tag_hash; - shardConnectionsHash = hash_create("shardConnectionHash", + shardConnectionsHash = hash_create("Shard Connections Hash", INITIAL_CONNECTION_CACHE_SIZE, &info, HASH_ELEM | HASH_FUNCTION); @@ -739,12 +732,13 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter /* - * OpenShardConnections opens a connection for each placement of a shard. If a - * connection cannot be opened, the shard placement is marked as inactive and - * the COPY continues with the remaining shard placements. + * OpenCopyTransactions opens a connection for each placement of a shard and + * starts a COPY transaction. If a connection cannot be opened, then the shard + * placement is marked as inactive and the COPY continues with the remaining + * shard placements. */ static void -OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, +OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, int64 shardId) { List *finalizedPlacementList = NIL; @@ -816,7 +810,6 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections failedPlacement->nodeName, failedPlacement->nodePort); } - shardConnections->shardId = shardId; shardConnections->connectionList = connectionList; } @@ -966,14 +959,8 @@ ConnectionList(HTAB *connectionHash) shardConnections = (ShardConnections *) hash_seq_search(&status); while (shardConnections != NULL) { - ListCell *connectionCell = NULL; - foreach(connectionCell, shardConnections->connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - - connectionList = lappend(connectionList, transactionConnection); - } + List *shardConnectionsList = list_copy(shardConnections->connectionList); + connectionList = list_concat(connectionList, shardConnectionsList); shardConnections = (ShardConnections *) hash_seq_search(&status); } @@ -985,7 +972,7 @@ ConnectionList(HTAB *connectionHash) /* * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure * is true, then EndRemoteCopy reports an error on failure, otherwise it - * reports a warning. + * reports a warning or continues. */ static void EndRemoteCopy(List *connectionList, bool stopOnFailure)