From cfb633601da7657c1df0e251c3b0af91592f0156 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Jul 2020 11:14:10 +0200 Subject: [PATCH] Minor refactorings in COPY command execution 1) Rename CONNECTION_PER_PLACEMENT to REQUIRE_CLEAN_CONNECTION. This is mostly to make things clear as the new name reveals more. 2) We also make sure that mark all the copy connections critical, even if they are accessed earlier in the transction --- src/backend/distributed/commands/multi_copy.c | 30 +++++++++++++++---- .../connection/placement_connection.c | 2 +- .../distributed/connection_management.h | 4 +-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 86dc282b0..e647f2a43 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3507,12 +3507,18 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, */ ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_DML); - MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(connectionFlags, - list_make1( - placementAccess), - NULL); + MultiConnection *connection = + GetConnectionIfPlacementAccessedInXact(connectionFlags, + list_make1(placementAccess), NULL); if (connection != NULL) { + /* + * Errors are supposed to cause immediate aborts (i.e. we don't + * want to/can't invalidate placements), mark the connection as + * critical so later errors cause failures. + */ + MarkRemoteTransactionCritical(connection); + return connection; } @@ -3547,7 +3553,21 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, if (placement->partitionMethod == DISTRIBUTE_BY_HASH && MultiShardConnectionType != SEQUENTIAL_CONNECTION) { - connectionFlags |= CONNECTION_PER_PLACEMENT; + /* + * Claiming the connection exclusively (done below) would also have the + * effect of opening multiple connections, but claiming the connection + * exclusively prevents GetConnectionIfPlacementAccessedInXact from returning + * the connection if it is needed for a different shard placement. + * + * By setting the REQUIRE_CLEAN_CONNECTION flag we are guaranteed to get + * connection that will not be returned by GetConnectionIfPlacementAccessedInXact + * for the remainder of the COPY, hence it safe to claim the connection + * exclusively. Claiming a connection exclusively prevents it from being + * used in other distributed queries that happen during the COPY (e.g. if + * the copy logic calls a function to calculate a default value, and the + * function does a distributed query). + */ + connectionFlags |= REQUIRE_CLEAN_CONNECTION; } connection = GetPlacementConnection(connectionFlags, placement, nodeUser); diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 756f3089a..5cfd25e39 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -301,7 +301,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, Assert((flags & OPTIONAL_CONNECTION) == 0); Assert(chosenConnection != NULL); - if ((flags & CONNECTION_PER_PLACEMENT) && + if ((flags & REQUIRE_CLEAN_CONNECTION) && ConnectionAccessedDifferentPlacement(chosenConnection, placement)) { /* diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 21043bfb0..875baee96 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -46,8 +46,8 @@ enum MultiConnectionMode FOR_DML = 1 << 2, - /* open a connection per (co-located set of) placement(s) */ - CONNECTION_PER_PLACEMENT = 1 << 3, + /* connection must not have accessed any non-co-located placements */ + REQUIRE_CLEAN_CONNECTION = 1 << 3, OUTSIDE_TRANSACTION = 1 << 4,