Minor refactorings in COPY command execution

1) Rename CONNECTION_PER_PLACEMENT to REQUIRE_CLEAN_CONNECTION. This is
mostly to make things clear as the new name reveals more.

2) We also make sure that mark all the copy connections critical,
even if they are accessed earlier in the transction
pull/4054/head
Onder Kalaci 2020-07-22 11:14:10 +02:00
parent 64469708af
commit cfb633601d
3 changed files with 28 additions and 8 deletions

View File

@ -3507,12 +3507,18 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
*/
ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement,
PLACEMENT_ACCESS_DML);
MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(connectionFlags,
list_make1(
placementAccess),
NULL);
MultiConnection *connection =
GetConnectionIfPlacementAccessedInXact(connectionFlags,
list_make1(placementAccess), NULL);
if (connection != NULL)
{
/*
* Errors are supposed to cause immediate aborts (i.e. we don't
* want to/can't invalidate placements), mark the connection as
* critical so later errors cause failures.
*/
MarkRemoteTransactionCritical(connection);
return connection;
}
@ -3547,7 +3553,21 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
if (placement->partitionMethod == DISTRIBUTE_BY_HASH &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
/*
* Claiming the connection exclusively (done below) would also have the
* effect of opening multiple connections, but claiming the connection
* exclusively prevents GetConnectionIfPlacementAccessedInXact from returning
* the connection if it is needed for a different shard placement.
*
* By setting the REQUIRE_CLEAN_CONNECTION flag we are guaranteed to get
* connection that will not be returned by GetConnectionIfPlacementAccessedInXact
* for the remainder of the COPY, hence it safe to claim the connection
* exclusively. Claiming a connection exclusively prevents it from being
* used in other distributed queries that happen during the COPY (e.g. if
* the copy logic calls a function to calculate a default value, and the
* function does a distributed query).
*/
connectionFlags |= REQUIRE_CLEAN_CONNECTION;
}
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);

View File

@ -301,7 +301,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
Assert((flags & OPTIONAL_CONNECTION) == 0);
Assert(chosenConnection != NULL);
if ((flags & CONNECTION_PER_PLACEMENT) &&
if ((flags & REQUIRE_CLEAN_CONNECTION) &&
ConnectionAccessedDifferentPlacement(chosenConnection, placement))
{
/*

View File

@ -46,8 +46,8 @@ enum MultiConnectionMode
FOR_DML = 1 << 2,
/* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3,
/* connection must not have accessed any non-co-located placements */
REQUIRE_CLEAN_CONNECTION = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4,