From 63676f5d65439670cc28d4141b089eecacbb1f46 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 12:38:19 +0200 Subject: [PATCH] Allow choosing a connection for multiple placements with GetPlacementListConnection --- .../connection/placement_connection.c | 739 ++++++++++-------- .../distributed/placement_connection.h | 31 + 2 files changed, 438 insertions(+), 332 deletions(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 2cf2d0f83..047fe041b 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -14,7 +14,9 @@ #include "access/hash.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_planner.h" #include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -47,17 +49,14 @@ typedef struct ConnectionReference bool hadDML; bool hadDDL; - /* - * Membership in ConnectionPlacementHashEntry->connectionReferences or - * ColocatedPlacementsHashEntry->connectionReferences. - */ - dlist_node placementNode; - /* membership in MultiConnection->referencedPlacements */ dlist_node connectionNode; } ConnectionReference; +struct ColocatedPlacementsHashEntry; + + /* * Hash table mapping placements to a list of connections. * @@ -85,11 +84,14 @@ typedef struct ConnectionPlacementHashEntry /* did any remote transactions fail? */ bool failed; - /* list of connections to remote nodes */ - dlist_head connectionReferences; + /* primary connection used to access the placement */ + ConnectionReference *primaryConnection; - /* if non-NULL, connection executing DML/DDL*/ - ConnectionReference *modifyingConnection; + /* are any other connections reading from the placements? */ + bool hasSecondaryConnections; + + /* entry for the set of co-located placements */ + struct ColocatedPlacementsHashEntry *colocatedEntry; /* membership in ConnectionShardHashEntry->placementConnections */ dlist_node shardNode; @@ -137,11 +139,11 @@ typedef struct ColocatedPlacementsHashEntry { ColocatedPlacementsHashKey key; - /* list of connections to remote nodes */ - dlist_head connectionReferences; + /* primary connection used to access the co-located placements */ + ConnectionReference *primaryConnection; - /* if non-NULL, connection executing DML/DDL*/ - ConnectionReference *modifyingConnection; + /* are any other connections reading from the placements? */ + bool hasSecondaryConnections; } ColocatedPlacementsHashEntry; static HTAB *ColocatedPlacementsHash; @@ -172,19 +174,10 @@ typedef struct ConnectionShardHashEntry static HTAB *ConnectionShardHash; -static MultiConnection * StartColocatedPlacementConnection(uint32 flags, - ShardPlacement *placement, - const char *userName); -static ConnectionReference * CheckExistingPlacementConnections(uint32 flags, - ConnectionPlacementHashEntry - *placementEntry, - const char *userName); -static ConnectionReference * CheckExistingColocatedConnections(uint32 flags, - ColocatedPlacementsHashEntry - *connectionEntry, - const char *userName); +static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry( + ShardPlacement *placement); static bool CanUseExistingConnection(uint32 flags, const char *userName, - ConnectionReference *connectionReference); + ConnectionReference *placementConnection); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry); @@ -229,76 +222,350 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user MultiConnection * StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName) { - ConnectionPlacementHashKey key; - ConnectionPlacementHashEntry *placementEntry = NULL; - ConnectionReference *returnConnectionReference = NULL; + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess)); + + placementAccess->placement = placement; + + if (flags & FOR_DDL) + { + placementAccess->accessType = PLACEMENT_ACCESS_DDL; + } + else if (flags & FOR_DML) + { + placementAccess->accessType = PLACEMENT_ACCESS_DML; + } + else + { + placementAccess->accessType = PLACEMENT_ACCESS_SELECT; + } + + return StartPlacementListConnection(flags, list_make1(placementAccess), userName); +} + + +/* + * GetPlacementListConnection establishes a connection for a set of placement + * accesses. + * + * See StartPlacementListConnection for details. + */ +MultiConnection * +GetPlacementListConnection(uint32 flags, List *placementAccessList, const char *userName) +{ + MultiConnection *connection = StartPlacementListConnection(flags, placementAccessList, + userName); + + FinishConnectionEstablishment(connection); + return connection; +} + + +/* + * StartPlacementListConnection returns a connection to a remote node suitable for + * a placement accesses (SELECT, DML, DDL) or throws an error if no suitable + * connection can be established if would cause a self-deadlock or consistency + * violation. + */ +MultiConnection * +StartPlacementListConnection(uint32 flags, List *placementAccessList, + const char *userName) +{ char *freeUserName = NULL; - bool found = false; + bool foundModifyingConnection = false; + ListCell *placementAccessCell = NULL; + List *placementEntryList = NIL; + ListCell *placementEntryCell = NULL; + MultiConnection *chosenConnection = NULL; if (userName == NULL) { userName = freeUserName = CurrentUserName(); } - key.placementId = placement->placementId; - - /* lookup relevant hash entry */ - placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found); - if (!found) - { - dlist_init(&placementEntry->connectionReferences); - placementEntry->failed = false; - placementEntry->modifyingConnection = NULL; - } - /* - * Check whether any of the connections already associated with the - * placement can be reused, or violates FOR_DML/FOR_DDL constraints. - */ - returnConnectionReference = - CheckExistingPlacementConnections(flags, placementEntry, userName); - - /* - * Either no caching desired, or no connection already associated with the - * placement present. Check whether there's a usable connection associated - * for the set of colocated placements, or establish a new one. + * Go through all placement accesses to find a suitable connection. * - * Allocations are performed in transaction context, so we don't have to - * care about freeing in case of an early disconnect. + * If none of the placements have been accessed in this transaction, connection + * remains NULL. + * + * If one or more of the placements have been modified in this transaction, then + * use the connection that performed the write. If placements have been written + * over multiple connections or the connection is not available, error out. + * + * If placements have only been read in this transaction, then use the last + * suitable connection found for a placement in the placementAccessList. */ - if (returnConnectionReference == NULL) + foreach(placementAccessCell, placementAccessList) { - MultiConnection *connection = - StartColocatedPlacementConnection(flags, placement, userName); + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) lfirst(placementAccessCell); + ShardPlacement *placement = placementAccess->placement; + ShardPlacementAccessType accessType = placementAccess->accessType; - returnConnectionReference = (ConnectionReference *) - MemoryContextAlloc(TopTransactionContext, - sizeof(ConnectionReference)); - returnConnectionReference->connection = connection; - returnConnectionReference->hadDDL = false; - returnConnectionReference->hadDML = false; - returnConnectionReference->userName = - MemoryContextStrdup(TopTransactionContext, userName); - dlist_push_tail(&placementEntry->connectionReferences, - &returnConnectionReference->placementNode); + ConnectionPlacementHashEntry *placementEntry = NULL; + ColocatedPlacementsHashEntry *colocatedEntry = NULL; + ConnectionReference *placementConnection = NULL; - /* record association with shard, for invalidation */ - AssociatePlacementWithShard(placementEntry, placement); + if (placement->shardId == INVALID_SHARD_ID) + { + /* + * When a SELECT prunes down to 0 shard, we use a dummy placement. + * In that case, we can fall back to the default connection. + * + * FIXME: this can be removed if we evaluate empty SELECTs locally. + */ + continue; + } - /* record association with connection, to handle connection closure */ - dlist_push_tail(&connection->referencedPlacements, - &returnConnectionReference->connectionNode); + placementEntry = FindOrCreatePlacementEntry(placement); + colocatedEntry = placementEntry->colocatedEntry; + placementConnection = placementEntry->primaryConnection; + + /* note: the Asserts below are primarily for clarifying the conditions */ + + if (placementConnection->connection == NULL) + { + /* no connection has been chosen for the placement */ + } + else if (accessType == PLACEMENT_ACCESS_DDL && + placementEntry->hasSecondaryConnections) + { + /* + * If a placement has been read over multiple connections (typically as + * a result of a reference table join) then a DDL command on the placement + * would create a self-deadlock. + */ + + Assert(placementConnection != NULL); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform DDL on a placement which has been read over " + "multiple connections"))); + } + else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL && + colocatedEntry->hasSecondaryConnections) + { + /* + * If a placement has been read over multiple (uncommitted) connections + * then a DDL command on a co-located placement may create a self-deadlock + * if there exist some relationship between the co-located placements + * (e.g. foreign key, partitioning). + */ + + Assert(placementConnection != NULL); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform DDL on a placement if a co-located placement " + "has been read over multiple connections"))); + } + else if (foundModifyingConnection) + { + /* + * We already found a connection that performed writes on of the placements + * and must use it. + */ + + if ((placementConnection->hadDDL || placementConnection->hadDML) && + placementConnection->connection != chosenConnection) + { + /* + * The current placement may have been modified over a different + * connection. Neither connection is guaranteed to see all uncomitted + * writes and therefore we cannot proceed. + */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform query with placements that were " + "modified over multiple connections"))); + } + } + else if (CanUseExistingConnection(flags, userName, placementConnection)) + { + /* + * There is an existing connection for the placement and we can use it. + */ + + Assert(placementConnection != NULL); + + chosenConnection = placementConnection->connection; + + if (placementConnection->hadDDL || placementConnection->hadDML) + { + /* this connection performed writes, we must use it */ + foundModifyingConnection = true; + } + } + else if (placementConnection->hadDDL) + { + /* + * There is an existing connection, but we cannot use it and it executed + * DDL. Any subsequent operation needs to be able to see the results of + * the DDL command and thus cannot proceed if it cannot use the connection. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DDL has " + "been executed on existing placement connection"))); + } + else if (placementConnection->hadDML) + { + /* + * There is an existing connection, but we cannot use it and it executed + * DML. Any subsequent operation needs to be able to see the results of + * the DML command and thus cannot proceed if it cannot use the connection. + * + * Note that this is not meaningfully different from the previous case. We + * just produce a different error message based on whether DDL was or only + * DML was executed. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + Assert(!placementConnection->hadDDL); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DML has " + "been executed on existing placement connection"))); + } + else if (accessType == PLACEMENT_ACCESS_DDL) + { + /* + * There is an existing connection, but we cannot use it and we want to + * execute DDL. The operation on the existing connection might conflict + * with the DDL statement. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform parallel DDL command because multiple " + "placements have been accessed over the same connection"))); + } + else + { + /* + * The placement has a connection assigned to it, but it cannot be used, + * most likely because it has been claimed exclusively. Fortunately, it + * has only been used for reads and we're not performing a DDL command. + * We can therefore use a different connection for this placement. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + Assert(accessType != PLACEMENT_ACCESS_DDL); + } + + placementEntryList = lappend(placementEntryList, placementEntry); } - if (flags & FOR_DDL) + if (chosenConnection == NULL) { - placementEntry->modifyingConnection = returnConnectionReference; - returnConnectionReference->hadDDL = true; + /* use the first placement from the list to extract nodename and nodeport */ + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) linitial(placementAccessList); + ShardPlacement *placement = placementAccess->placement; + + /* + * No suitable connection in the placement->connection mapping, get one from + * the node->connection pool. + */ + chosenConnection = StartNodeConnection(flags, placement->nodeName, + placement->nodePort); } - if (flags & FOR_DML) + + /* + * Now that a connection has been chosen, initialise or update the connection + * references for all placements. + */ + forboth(placementAccessCell, placementAccessList, + placementEntryCell, placementEntryList) { - placementEntry->modifyingConnection = returnConnectionReference; - returnConnectionReference->hadDML = true; + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) lfirst(placementAccessCell); + ShardPlacementAccessType accessType = placementAccess->accessType; + ConnectionPlacementHashEntry *placementEntry = + (ConnectionPlacementHashEntry *) lfirst(placementEntryCell); + ConnectionReference *placementConnection = placementEntry->primaryConnection; + + if (placementConnection->connection == chosenConnection) + { + /* using the connection that was already assigned to the placement */ + } + else if (placementConnection->connection == NULL) + { + /* placement does not have a connection assigned yet */ + placementConnection->connection = chosenConnection; + placementConnection->hadDDL = false; + placementConnection->hadDML = false; + placementConnection->userName = MemoryContextStrdup(TopTransactionContext, + userName); + + /* record association with connection, to handle connection closure */ + dlist_push_tail(&chosenConnection->referencedPlacements, + &placementConnection->connectionNode); + } + else + { + /* using a different connection than the one assigned to the placement */ + + if (accessType != PLACEMENT_ACCESS_SELECT) + { + /* + * We previously read from the placement, but now we're writing to + * it (if we had written to the placement, we would have either chosen + * the same connection, or errored out). Update the connection reference + * to point to the connection used for writing. We don't need to remember + * the existing connection since we won't be able to reuse it for + * accessing the placement. However, we do register that it exists in + * hasSecondaryConnections. + */ + placementConnection->connection = chosenConnection; + placementConnection->userName = MemoryContextStrdup(TopTransactionContext, + userName); + + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + } + + /* + * There are now multiple connections that read from the placement + * and DDL commands are forbidden. + */ + placementEntry->hasSecondaryConnections = true; + + if (placementEntry->colocatedEntry != NULL) + { + /* we also remember this for co-located placements */ + placementEntry->colocatedEntry->hasSecondaryConnections = true; + } + } + + /* + * Remember that we used the current connection for writes. + */ + if (accessType == PLACEMENT_ACCESS_DDL) + { + placementConnection->hadDDL = true; + } + + if (accessType == PLACEMENT_ACCESS_DML) + { + placementConnection->hadDML = true; + } } if (freeUserName) @@ -306,195 +573,82 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us pfree(freeUserName); } - return returnConnectionReference->connection; + return chosenConnection; } /* - * CheckExistingPlacementConnections check whether any of the existing - * connections is usable. If so, return it, otherwise return NULL. - * - * A connection is usable if it is not in use, the user matches, DDL/DML usage - * match and cached connection are allowed. If no existing connection - * matches, but a new connection would conflict (e.g. a connection already - * exists but isn't usable, and the desired connection needs to execute - * DML/DML) an error is thrown. + * FindOrCreatePlacementEntry finds a placement entry in either the + * placement->connection hash or the co-located placements->connection hash, + * or adds a new entry if the placement has not yet been accessed in the + * current transaction. */ -static ConnectionReference * -CheckExistingPlacementConnections(uint32 flags, - ConnectionPlacementHashEntry *placementEntry, - const char *userName) +static ConnectionPlacementHashEntry * +FindOrCreatePlacementEntry(ShardPlacement *placement) { - dlist_iter it; + ConnectionPlacementHashKey key; + ConnectionPlacementHashEntry *placementEntry = NULL; + bool found = false; - /* - * If there's a connection that has executed DML/DDL, always return it if - * possible. That's because we only can execute DML/DDL over that - * connection. Checking this first also allows us to detect conflicts due - * to opening a second connection that wants to execute DML/DDL. - */ - if (placementEntry->modifyingConnection) + key.placementId = placement->placementId; + + placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found); + if (!found) { - ConnectionReference *connectionReference = placementEntry->modifyingConnection; + /* no connection has been chosen for this placement */ + placementEntry->failed = false; + placementEntry->primaryConnection = NULL; + placementEntry->hasSecondaryConnections = false; + placementEntry->colocatedEntry = NULL; - if (CanUseExistingConnection(flags, userName, connectionReference)) + if (placement->partitionMethod == DISTRIBUTE_BY_HASH || + placement->partitionMethod == DISTRIBUTE_BY_NONE) { - return connectionReference; - } + ColocatedPlacementsHashKey key; + ColocatedPlacementsHashEntry *colocatedEntry = NULL; - if (connectionReference->hadDDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DDL has been " - "executed on existing placement connection"))); - } - else if (connectionReference->hadDML) - { - /* we'd not see the other connection's contents */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DML has been " - "executed on existing placement connection"))); + strcpy(key.nodeName, placement->nodeName); + key.nodePort = placement->nodePort; + key.colocationGroupId = placement->colocationGroupId; + key.representativeValue = placement->representativeValue; + + /* look for a connection assigned to co-located placements */ + colocatedEntry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, + &found); + if (!found) + { + void *conRef = MemoryContextAllocZero(TopTransactionContext, + sizeof(ConnectionReference)); + + /* + * Create a connection reference that can be used for the entire + * set of co-located placements. + */ + colocatedEntry->primaryConnection = (ConnectionReference *) conRef; + + colocatedEntry->hasSecondaryConnections = false; + } + + /* + * Assign the connection reference for the set of co-located placements + * to the current placement. + */ + placementEntry->primaryConnection = colocatedEntry->primaryConnection; + placementEntry->colocatedEntry = colocatedEntry; } else { - /* modifying xact should have executed DML/DDL */ - Assert(false); + void *conRef = MemoryContextAllocZero(TopTransactionContext, + sizeof(ConnectionReference)); + + placementEntry->primaryConnection = (ConnectionReference *) conRef; } } - /* - * Search existing connections for a reusable connection. - */ - dlist_foreach(it, &placementEntry->connectionReferences) - { - ConnectionReference *connectionReference = - dlist_container(ConnectionReference, placementNode, it.cur); + /* record association with shard, for invalidation */ + AssociatePlacementWithShard(placementEntry, placement); - if (CanUseExistingConnection(flags, userName, connectionReference)) - { - return connectionReference; - } - - /* - * If not using the connection, verify that FOR_DML/DDL flags are - * compatible. - */ - if (flags & FOR_DDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DDL when " - "other connections exist"))); - } - else if (flags & FOR_DML) - { - /* - * We could allow this case (as presumably the SELECT is done, but - * it'd be a bit prone to programming errors, so we disallow for - * now. - */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DML when " - "other connections exist"))); - } - } - - /* establish a new connection */ - return NULL; -} - - -static ConnectionReference * -CheckExistingColocatedConnections(uint32 flags, - ColocatedPlacementsHashEntry *connectionsEntry, - const char *userName) -{ - dlist_iter it; - - /* - * If there's a connection that has executed DML/DDL, always return it if - * possible. That's because we only can execute DML/DDL over that - * connection. Checking this first also allows us to detect conflicts due - * to opening a second connection that wants to execute DML/DDL. - */ - if (connectionsEntry->modifyingConnection) - { - ConnectionReference *connectionReference = connectionsEntry->modifyingConnection; - - if (CanUseExistingConnection(flags, userName, connectionReference)) - { - return connectionReference; - } - - if (connectionReference->hadDDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DDL has been " - "executed on existing placement connection for a colocated placement"))); - } - else if (connectionReference->hadDML) - { - /* we'd not see the other connection's contents */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DML has been " - "executed on existing placement connection for a colocated placement"))); - } - else - { - /* modifying xact should have executed DML/DDL */ - Assert(false); - } - } - - /* - * Search existing connections for a reusable connection. - */ - dlist_foreach(it, &connectionsEntry->connectionReferences) - { - ConnectionReference *connectionReference = - dlist_container(ConnectionReference, placementNode, it.cur); - - if (CanUseExistingConnection(flags, userName, connectionReference)) - { - return connectionReference; - } - - /* - * If not using the connection, verify that FOR_DML/DDL flags are - * compatible. - */ - if (flags & FOR_DDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DDL when " - "other connections exist for colocated placements"))); - } - else if (flags & FOR_DML) - { - /* - * We could allow this case (as presumably the SELECT is done, but - * it'd be a bit prone to programming errors, so we disallow for - * now. - */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DML when " - "other connections exist for colocated placements"))); - } - } - - /* establish a new connection */ - return NULL; + return placementEntry; } @@ -535,86 +689,6 @@ CanUseExistingConnection(uint32 flags, const char *userName, } -/* - * StartColocatedPlacementConnection returns a connection that's usable for - * the passed in flags/placement. - * - * It does so by first checking whether any connection is already associated - * with the relevant entry in the colocated placement hash, or by asking for a - * formerly unassociated (possibly new) connection. Errors out if there's - * conflicting connections. - */ -static MultiConnection * -StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement, - const char *userName) -{ - ConnectionReference *connectionReference = NULL; - ColocatedPlacementsHashKey key; - ColocatedPlacementsHashEntry *entry; - bool found; - - /* - * Currently only hash partitioned tables can be colocated. For everything - * else we just return a new connection. - */ - if (placement->partitionMethod != DISTRIBUTE_BY_HASH) - { - return StartNodeConnection(flags, placement->nodeName, placement->nodePort); - } - - strcpy(key.nodeName, placement->nodeName); - key.nodePort = placement->nodePort; - key.colocationGroupId = placement->colocationGroupId; - key.representativeValue = placement->representativeValue; - - entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found); - if (!found) - { - memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey), - 0, - sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey)); - dlist_init(&entry->connectionReferences); - } - - connectionReference = - CheckExistingColocatedConnections(flags, entry, userName); - - if (!connectionReference) - { - MultiConnection *connection = NULL; - - connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort); - connectionReference = (ConnectionReference *) - MemoryContextAlloc(TopTransactionContext, - sizeof(ConnectionReference)); - connectionReference->connection = connection; - connectionReference->hadDDL = false; - connectionReference->hadDML = false; - connectionReference->userName = - MemoryContextStrdup(TopTransactionContext, userName); - dlist_push_tail(&entry->connectionReferences, - &connectionReference->placementNode); - - /* record association with connection, to handle connection closure */ - dlist_push_tail(&connection->referencedPlacements, - &connectionReference->connectionNode); - } - - if (flags & FOR_DDL) - { - entry->modifyingConnection = connectionReference; - connectionReference->hadDDL = true; - } - if (flags & FOR_DML) - { - entry->modifyingConnection = connectionReference; - connectionReference->hadDML = true; - } - - return connectionReference->connection; -} - - /* * AssociatePlacementWithShard records shard->placement relation in * ConnectionShardHash. @@ -680,7 +754,7 @@ CloseShardPlacementAssociation(struct MultiConnection *connection) /* * Note that we don't reset ConnectionPlacementHashEntry's - * modifyingConnection here, that'd more complicated than it seems + * primaryConnection here, that'd more complicated than it seems * worth. That means we'll error out spuriously if a DML/DDL * executing connection is closed earlier in a transaction. */ @@ -823,16 +897,17 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry) { ConnectionPlacementHashEntry *placementEntry = dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); - ConnectionReference *modifyingConnection = placementEntry->modifyingConnection; + ConnectionReference *primaryConnection = placementEntry->primaryConnection; MultiConnection *connection = NULL; /* we only consider shards that are modified */ - if (modifyingConnection == NULL) + if (primaryConnection == NULL || + !(primaryConnection->hadDDL || primaryConnection->hadDML)) { continue; } - connection = modifyingConnection->connection; + connection = primaryConnection->connection; if (!connection || connection->remoteTransaction.transactionFailed) { diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index 493e74c52..71e7a1358 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -15,6 +15,30 @@ /* forward declare, to avoid dependency on ShardPlacement definition */ struct ShardPlacement; +/* represents the way in which a placement is accessed */ +typedef enum ShardPlacementAccessType +{ + /* read from placement */ + PLACEMENT_ACCESS_SELECT, + + /* modify rows in placement */ + PLACEMENT_ACCESS_DML, + + /* modify placement schema */ + PLACEMENT_ACCESS_DDL +} ShardPlacementAccessType; + +/* represents access to a placement */ +typedef struct ShardPlacementAccess +{ + /* placement that is accessed */ + struct ShardPlacement *placement; + + /* the way in which the placement is accessed */ + ShardPlacementAccessType accessType; +} ShardPlacementAccess; + + extern MultiConnection * GetPlacementConnection(uint32 flags, struct ShardPlacement *placement, const char *userName); @@ -22,6 +46,13 @@ extern MultiConnection * StartPlacementConnection(uint32 flags, struct ShardPlacement *placement, const char *userName); +extern MultiConnection * GetPlacementListConnection(uint32 flags, + List *placementAccessList, + const char *userName); +extern MultiConnection * StartPlacementListConnection(uint32 flags, + List *placementAccessList, + const char *userName); + extern void ResetPlacementConnectionManagement(void); extern void MarkFailedShardPlacements(void); extern void PostCommitMarkFailedShardPlacements(bool using2PC);