From b9385700eec8881084276655b6d1e95dcc89e0a8 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 11 Jan 2017 16:36:56 -0800 Subject: [PATCH] Make placement_connection.c colocation aware. Because of foreign keys and similar concerns there should only be a single modifying/DDL connection for a set of colocated placements to a node. To enforce placement_connection.c now has an additional hash-table keeping track of the connections to a set of colocated placements. In addition to enforcing per placement restrictions on connections, there's now very similar restrictions for sets of colocated placements. --- .../connection/placement_connection.c | 321 +++++++++++++++++- 1 file changed, 304 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 308466b27..5dcbab9c3 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -11,6 +11,7 @@ #include "postgres.h" +#include "access/hash.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/metadata_cache.h" @@ -21,7 +22,9 @@ /* * A connection reference is used to register that a connection has been used - * to read or modify a shard placement as a particular user. + * to read or modify either a) a shard placement as a particular user b) a + * group of colocated placements (which depend on whether the reference is + * from ConnectionPlacementHashEntry or ColocatedPlacementHashEntry). */ typedef struct ConnectionReference { @@ -44,7 +47,10 @@ typedef struct ConnectionReference bool hadDML; bool hadDDL; - /* membership in ConnectionPlacementHashKey->connectionReferences */ + /* + * Membership in ConnectionPlacementHashEntry->connectionReferences or + * ColocatedPlacementsHashEntry->connectionReferences. + */ dlist_node placementNode; /* membership in MultiConnection->referencedPlacements */ @@ -93,6 +99,54 @@ typedef struct ConnectionPlacementHashEntry static HTAB *ConnectionPlacementHash; +/* + * A hash-table mapping colocated placements to connections. Colocated + * placements being the set of placements on a single node that represent the + * same value range. This is needed because connections for colocated + * placements (i.e. the corresponding placements for different colocated + * distributed tables) need to share connections. Otherwise things like + * foreign keys can very easily lead to unprincipled deadlocks. This means + * that there can only one DML/DDL connection for a set of colocated + * placements. + * + * A set of colocated placements is identified, besides node identifying + * information, by the associated colocation group id and the placement's + * 'representativeValue' which currently is the lower boundary of it's + * hash-range. + * + * Note that this hash-table only contains entries for hash-partitioned + * tables, because others so far don't support colocation. + */ + +/* hash key */ +typedef struct ColocatedPlacementsHashKey +{ + /* to identify host - database can't differ */ + char nodeName[MAX_NODE_LENGTH]; + uint32 nodePort; + + /* colocation group, or invalid */ + uint32 colocationGroupId; + + /* to represent the value range */ + uint32 representativeValue; +} ColocatedPlacementsHashKey; + +/* hash entry */ +typedef struct ColocatedPlacementsHashEntry +{ + ColocatedPlacementsHashKey key; + + /* list of connections to remote nodes */ + dlist_head connectionReferences; + + /* if non-NULL, connection executing DML/DDL*/ + ConnectionReference *modifyingConnection; +} ColocatedPlacementsHashEntry; + +static HTAB *ColocatedPlacementsHash; + + /* * Hash table mapping shard ids to placements. * @@ -118,15 +172,25 @@ typedef struct ConnectionShardHashEntry static HTAB *ConnectionShardHash; -static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName, - ConnectionPlacementHashEntry * - placementEntry); +static MultiConnection * StartColocatedPlacementConnection(uint32 flags, + ShardPlacement *placement, + const char *userName); +static ConnectionReference * CheckExistingPlacementConnections(uint32 flags, + ConnectionPlacementHashEntry + *placementEntry, + const char *userName); +static ConnectionReference * CheckExistingColocatedConnections(uint32 flags, + ColocatedPlacementsHashEntry + *connectionEntry, + const char *userName); static bool CanUseExistingConnection(uint32 flags, const char *userName, ConnectionReference *connectionReference); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit, bool using2PC); +static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize); +static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize); /* @@ -159,8 +223,9 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user * - FOR_DDL - signal that connection is going to be used for DDL * * Only one connection associated with the placement may have FOR_DML or - * FOR_DDL set. This restriction prevents deadlocks and wrong results due to - * in-progress transactions. + * FOR_DDL set. For hash-partitioned tables only one connection for a set of + * colocated placements may have FOR_DML/DDL set. This restriction prevents + * deadlocks and wrong results due to in-progress transactions. */ MultiConnection * StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName) @@ -191,17 +256,21 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us * Check whether any of the connections already associated with the * placement can be reused, or violates FOR_DML/FOR_DDL constraints. */ - returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry); + returnConnectionReference = + CheckExistingPlacementConnections(flags, placementEntry, userName); /* - * Either no caching desired, or no connection present. Start connection - * establishment. Allocations are performed in transaction context, so we - * don't have to care about freeing in case of an early disconnect. + * Either no caching desired, or no connection already associated with the + * placement present. Check whether there's a usable connection associated + * for the set of colocated placements, or establish a new one. + * + * Allocations are performed in transaction context, so we don't have to + * care about freeing in case of an early disconnect. */ if (returnConnectionReference == NULL) { - MultiConnection *connection = StartNodeConnection(flags, placement->nodeName, - placement->nodePort); + MultiConnection *connection = + StartColocatedPlacementConnection(flags, placement, userName); returnConnectionReference = (ConnectionReference *) MemoryContextAlloc(TopTransactionContext, @@ -243,8 +312,8 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us /* - * CheckExistingConnections check whether any of the existing connections is - * usable. If so, return it, otherwise return NULL. + * CheckExistingPlacementConnections check whether any of the existing + * connections is usable. If so, return it, otherwise return NULL. * * A connection is usable if it is not in use, the user matches, DDL/DML usage * match and cached connection are allowed. If no existing connection @@ -253,8 +322,9 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us * DML/DML) an error is thrown. */ static ConnectionReference * -CheckExistingConnections(uint32 flags, const char *userName, - ConnectionPlacementHashEntry *placementEntry) +CheckExistingPlacementConnections(uint32 flags, + ConnectionPlacementHashEntry *placementEntry, + const char *userName) { dlist_iter it; @@ -340,6 +410,95 @@ CheckExistingConnections(uint32 flags, const char *userName, } +static ConnectionReference * +CheckExistingColocatedConnections(uint32 flags, + ColocatedPlacementsHashEntry *connectionsEntry, + const char *userName) +{ + dlist_iter it; + + /* + * If there's a connection that has executed DML/DDL, always return it if + * possible. That's because we only can execute DML/DDL over that + * connection. Checking this first also allows us to detect conflicts due + * to opening a second connection that wants to execute DML/DDL. + */ + if (connectionsEntry->modifyingConnection) + { + ConnectionReference *connectionReference = connectionsEntry->modifyingConnection; + + if (CanUseExistingConnection(flags, userName, connectionReference)) + { + return connectionReference; + } + + if (connectionReference->hadDDL) + { + /* would deadlock otherwise */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DDL has been " + "executed on existing placement connection for a colocated placement"))); + } + else if (connectionReference->hadDML) + { + /* we'd not see the other connection's contents */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DML has been " + "executed on existing placement connection for a colocated placement"))); + } + else + { + /* modifying xact should have executed DML/DDL */ + Assert(false); + } + } + + /* + * Search existing connections for a reusable connection. + */ + dlist_foreach(it, &connectionsEntry->connectionReferences) + { + ConnectionReference *connectionReference = + dlist_container(ConnectionReference, placementNode, it.cur); + + if (CanUseExistingConnection(flags, userName, connectionReference)) + { + return connectionReference; + } + + /* + * If not using the connection, verify that FOR_DML/DDL flags are + * compatible. + */ + if (flags & FOR_DDL) + { + /* would deadlock otherwise */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection for DDL when " + "other connections exist for colocated placements"))); + } + else if (flags & FOR_DML) + { + /* + * We could allow this case (as presumably the SELECT is done, but + * it'd be a bit prone to programming errors, so we disallow for + * now. + */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection for DML when " + "other connections exist for colocated placements"))); + } + } + + /* establish a new connection */ + return NULL; +} + + /* * CanUseExistingConnection is a helper function for CheckExistingConnections() * that checks whether an existing connection can be reused. @@ -377,6 +536,86 @@ CanUseExistingConnection(uint32 flags, const char *userName, } +/* + * StartColocatedPlacementConnection returns a connection that's usable for + * the passed in flags/placement. + * + * It does so by first checking whether any connection is already associated + * with the relevant entry in the colocated placement hash, or by asking for a + * formerly unassociated (possibly new) connection. Errors out if there's + * conflicting connections. + */ +static MultiConnection * +StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement, + const char *userName) +{ + ConnectionReference *connectionReference = NULL; + ColocatedPlacementsHashKey key; + ColocatedPlacementsHashEntry *entry; + bool found; + + /* + * Currently only hash partitioned tables can be colocated. For everything + * else we just return a new connection. + */ + if (placement->partitionMethod != DISTRIBUTE_BY_HASH) + { + return StartNodeConnection(flags, placement->nodeName, placement->nodePort); + } + + strcpy(key.nodeName, placement->nodeName); + key.nodePort = placement->nodePort; + key.colocationGroupId = placement->colocationGroupId; + key.representativeValue = placement->representativeValue; + + entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found); + if (!found) + { + memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey), + 0, + sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey)); + dlist_init(&entry->connectionReferences); + } + + connectionReference = + CheckExistingColocatedConnections(flags, entry, userName); + + if (!connectionReference) + { + MultiConnection *connection = NULL; + + connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort); + connectionReference = (ConnectionReference *) + MemoryContextAlloc(TopTransactionContext, + sizeof(ConnectionReference)); + connectionReference->connection = connection; + connectionReference->hadDDL = false; + connectionReference->hadDML = false; + connectionReference->userName = + MemoryContextStrdup(TopTransactionContext, userName); + dlist_push_tail(&entry->connectionReferences, + &connectionReference->placementNode); + + /* record association with connection, to handle connection closure */ + dlist_push_tail(&connection->referencedPlacements, + &connectionReference->connectionNode); + } + + if (flags & FOR_DDL) + { + entry->modifyingConnection = connectionReference; + connectionReference->hadDDL = true; + } + if (flags & FOR_DML) + { + entry->modifyingConnection = connectionReference; + connectionReference->hadDML = true; + } + + return connectionReference->connection; +} + + /* * AssociatePlacementWithShard records shard->placement relation in * ConnectionShardHash. @@ -474,6 +713,7 @@ ResetPlacementConnectionManagement(void) /* Simply delete all entries */ hash_delete_all(ConnectionPlacementHash); hash_delete_all(ConnectionShardHash); + hash_delete_all(ColocatedPlacementsHash); /* * NB: memory for ConnectionReference structs and subordinate data is @@ -626,6 +866,18 @@ InitPlacementConnectionManagement(void) ConnectionPlacementHash = hash_create("citus connection cache (placementid)", 64, &info, hashFlags); + /* create (colocated placement identity) -> [ConnectionReference] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ColocatedPlacementsHashKey); + info.entrysize = sizeof(ColocatedPlacementsHashEntry); + info.hash = ColocatedPlacementsHashHash; + info.match = ColocatedPlacementsHashCompare; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + ColocatedPlacementsHash = hash_create("citus connection cache (colocated placements)", + 64, &info, hashFlags); + /* create (shardId) -> [ConnectionShardHashEntry] hash */ memset(&info, 0, sizeof(info)); info.keysize = sizeof(ConnectionShardHashKey); @@ -637,3 +889,38 @@ InitPlacementConnectionManagement(void) ConnectionShardHash = hash_create("citus connection cache (shardid)", 64, &info, hashFlags); } + + +static uint32 +ColocatedPlacementsHashHash(const void *key, Size keysize) +{ + ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key; + uint32 hash = 0; + + hash = string_hash(entry->nodeName, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->nodePort)); + hash = hash_combine(hash, hash_uint32(entry->colocationGroupId)); + hash = hash_combine(hash, hash_uint32(entry->representativeValue)); + + return hash; +} + + +static int +ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize) +{ + ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a; + ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b; + + if (strncmp(ca->nodeName, cb->nodeName, MAX_NODE_LENGTH) != 0 || + ca->nodePort != cb->nodePort || + ca->colocationGroupId != cb->colocationGroupId || + ca->representativeValue != cb->representativeValue) + { + return 1; + } + else + { + return 0; + } +}