diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 86dc282b0..e647f2a43 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3507,12 +3507,18 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, */ ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_DML); - MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(connectionFlags, - list_make1( - placementAccess), - NULL); + MultiConnection *connection = + GetConnectionIfPlacementAccessedInXact(connectionFlags, + list_make1(placementAccess), NULL); if (connection != NULL) { + /* + * Errors are supposed to cause immediate aborts (i.e. we don't + * want to/can't invalidate placements), mark the connection as + * critical so later errors cause failures. + */ + MarkRemoteTransactionCritical(connection); + return connection; } @@ -3547,7 +3553,21 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, if (placement->partitionMethod == DISTRIBUTE_BY_HASH && MultiShardConnectionType != SEQUENTIAL_CONNECTION) { - connectionFlags |= CONNECTION_PER_PLACEMENT; + /* + * Claiming the connection exclusively (done below) would also have the + * effect of opening multiple connections, but claiming the connection + * exclusively prevents GetConnectionIfPlacementAccessedInXact from returning + * the connection if it is needed for a different shard placement. + * + * By setting the REQUIRE_CLEAN_CONNECTION flag we are guaranteed to get + * connection that will not be returned by GetConnectionIfPlacementAccessedInXact + * for the remainder of the COPY, hence it safe to claim the connection + * exclusively. Claiming a connection exclusively prevents it from being + * used in other distributed queries that happen during the COPY (e.g. if + * the copy logic calls a function to calculate a default value, and the + * function does a distributed query). + */ + connectionFlags |= REQUIRE_CLEAN_CONNECTION; } connection = GetPlacementConnection(connectionFlags, placement, nodeUser); diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 756f3089a..5cfd25e39 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -301,7 +301,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, Assert((flags & OPTIONAL_CONNECTION) == 0); Assert(chosenConnection != NULL); - if ((flags & CONNECTION_PER_PLACEMENT) && + if ((flags & REQUIRE_CLEAN_CONNECTION) && ConnectionAccessedDifferentPlacement(chosenConnection, placement)) { /* diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 21043bfb0..875baee96 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -46,8 +46,8 @@ enum MultiConnectionMode FOR_DML = 1 << 2, - /* open a connection per (co-located set of) placement(s) */ - CONNECTION_PER_PLACEMENT = 1 << 3, + /* connection must not have accessed any non-co-located placements */ + REQUIRE_CLEAN_CONNECTION = 1 << 3, OUTSIDE_TRANSACTION = 1 << 4,