Merge pull request #4054 from citusdata/rename_connection_flag

Minor refactorings in COPY command execution
pull/4055/head
Önder Kalacı 2020-07-23 15:58:16 +02:00 committed by GitHub
commit 20a46f8f57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 8 deletions

View File

@ -3507,12 +3507,18 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
*/
ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement,
PLACEMENT_ACCESS_DML);
MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(connectionFlags,
list_make1(
placementAccess),
NULL);
MultiConnection *connection =
GetConnectionIfPlacementAccessedInXact(connectionFlags,
list_make1(placementAccess), NULL);
if (connection != NULL)
{
/*
* Errors are supposed to cause immediate aborts (i.e. we don't
* want to/can't invalidate placements), mark the connection as
* critical so later errors cause failures.
*/
MarkRemoteTransactionCritical(connection);
return connection;
}
@ -3547,7 +3553,21 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
if (placement->partitionMethod == DISTRIBUTE_BY_HASH &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
/*
* Claiming the connection exclusively (done below) would also have the
* effect of opening multiple connections, but claiming the connection
* exclusively prevents GetConnectionIfPlacementAccessedInXact from returning
* the connection if it is needed for a different shard placement.
*
* By setting the REQUIRE_CLEAN_CONNECTION flag we are guaranteed to get
* connection that will not be returned by GetConnectionIfPlacementAccessedInXact
* for the remainder of the COPY, hence it safe to claim the connection
* exclusively. Claiming a connection exclusively prevents it from being
* used in other distributed queries that happen during the COPY (e.g. if
* the copy logic calls a function to calculate a default value, and the
* function does a distributed query).
*/
connectionFlags |= REQUIRE_CLEAN_CONNECTION;
}
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);

View File

@ -301,7 +301,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
Assert((flags & OPTIONAL_CONNECTION) == 0);
Assert(chosenConnection != NULL);
if ((flags & CONNECTION_PER_PLACEMENT) &&
if ((flags & REQUIRE_CLEAN_CONNECTION) &&
ConnectionAccessedDifferentPlacement(chosenConnection, placement))
{
/*

View File

@ -46,8 +46,8 @@ enum MultiConnectionMode
FOR_DML = 1 << 2,
/* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3,
/* connection must not have accessed any non-co-located placements */
REQUIRE_CLEAN_CONNECTION = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4,