diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 22e391b4f..724a6ec39 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -596,7 +596,7 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) ConnectionHashKey *ca = (ConnectionHashKey *) a; ConnectionHashKey *cb = (ConnectionHashKey *) b; - if (strncmp(ca->hostname, cb->hostname, NAMEDATALEN) != 0 || + if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || ca->port != cb->port || strncmp(ca->user, cb->user, NAMEDATALEN) != 0 || strncmp(ca->database, cb->database, NAMEDATALEN) != 0) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 4ad5bb947..5dcbab9c3 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -11,6 +11,7 @@ #include "postgres.h" +#include "access/hash.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/metadata_cache.h" @@ -21,14 +22,12 @@ /* * A connection reference is used to register that a connection has been used - * to read or modify a shard placement as a particular user. + * to read or modify either a) a shard placement as a particular user b) a + * group of colocated placements (which depend on whether the reference is + * from ConnectionPlacementHashEntry or ColocatedPlacementHashEntry). */ typedef struct ConnectionReference { - /* identity information about the connection */ - uint64 shardId; - uint64 placementId; - /* * The user used to read/modify the placement. We cannot reuse connections * that were performed using a different role, since it would not have the @@ -48,7 +47,10 @@ typedef struct ConnectionReference bool hadDML; bool hadDDL; - /* membership in ConnectionPlacementHashKey->connectionReferences */ + /* + * Membership in ConnectionPlacementHashEntry->connectionReferences or + * ColocatedPlacementsHashEntry->connectionReferences. + */ dlist_node placementNode; /* membership in MultiConnection->referencedPlacements */ @@ -97,6 +99,54 @@ typedef struct ConnectionPlacementHashEntry static HTAB *ConnectionPlacementHash; +/* + * A hash-table mapping colocated placements to connections. Colocated + * placements being the set of placements on a single node that represent the + * same value range. This is needed because connections for colocated + * placements (i.e. the corresponding placements for different colocated + * distributed tables) need to share connections. Otherwise things like + * foreign keys can very easily lead to unprincipled deadlocks. This means + * that there can only one DML/DDL connection for a set of colocated + * placements. + * + * A set of colocated placements is identified, besides node identifying + * information, by the associated colocation group id and the placement's + * 'representativeValue' which currently is the lower boundary of it's + * hash-range. + * + * Note that this hash-table only contains entries for hash-partitioned + * tables, because others so far don't support colocation. + */ + +/* hash key */ +typedef struct ColocatedPlacementsHashKey +{ + /* to identify host - database can't differ */ + char nodeName[MAX_NODE_LENGTH]; + uint32 nodePort; + + /* colocation group, or invalid */ + uint32 colocationGroupId; + + /* to represent the value range */ + uint32 representativeValue; +} ColocatedPlacementsHashKey; + +/* hash entry */ +typedef struct ColocatedPlacementsHashEntry +{ + ColocatedPlacementsHashKey key; + + /* list of connections to remote nodes */ + dlist_head connectionReferences; + + /* if non-NULL, connection executing DML/DDL*/ + ConnectionReference *modifyingConnection; +} ColocatedPlacementsHashEntry; + +static HTAB *ColocatedPlacementsHash; + + /* * Hash table mapping shard ids to placements. * @@ -122,15 +172,25 @@ typedef struct ConnectionShardHashEntry static HTAB *ConnectionShardHash; -static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName, - ConnectionPlacementHashEntry * - placementEntry); +static MultiConnection * StartColocatedPlacementConnection(uint32 flags, + ShardPlacement *placement, + const char *userName); +static ConnectionReference * CheckExistingPlacementConnections(uint32 flags, + ConnectionPlacementHashEntry + *placementEntry, + const char *userName); +static ConnectionReference * CheckExistingColocatedConnections(uint32 flags, + ColocatedPlacementsHashEntry + *connectionEntry, + const char *userName); static bool CanUseExistingConnection(uint32 flags, const char *userName, ConnectionReference *connectionReference); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit, bool using2PC); +static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize); +static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize); /* @@ -163,8 +223,9 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user * - FOR_DDL - signal that connection is going to be used for DDL * * Only one connection associated with the placement may have FOR_DML or - * FOR_DDL set. This restriction prevents deadlocks and wrong results due to - * in-progress transactions. + * FOR_DDL set. For hash-partitioned tables only one connection for a set of + * colocated placements may have FOR_DML/DDL set. This restriction prevents + * deadlocks and wrong results due to in-progress transactions. */ MultiConnection * StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName) @@ -195,17 +256,21 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us * Check whether any of the connections already associated with the * placement can be reused, or violates FOR_DML/FOR_DDL constraints. */ - returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry); + returnConnectionReference = + CheckExistingPlacementConnections(flags, placementEntry, userName); /* - * Either no caching desired, or no connection present. Start connection - * establishment. Allocations are performed in transaction context, so we - * don't have to care about freeing in case of an early disconnect. + * Either no caching desired, or no connection already associated with the + * placement present. Check whether there's a usable connection associated + * for the set of colocated placements, or establish a new one. + * + * Allocations are performed in transaction context, so we don't have to + * care about freeing in case of an early disconnect. */ if (returnConnectionReference == NULL) { - MultiConnection *connection = StartNodeConnection(flags, placement->nodeName, - placement->nodePort); + MultiConnection *connection = + StartColocatedPlacementConnection(flags, placement, userName); returnConnectionReference = (ConnectionReference *) MemoryContextAlloc(TopTransactionContext, @@ -213,8 +278,6 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us returnConnectionReference->connection = connection; returnConnectionReference->hadDDL = false; returnConnectionReference->hadDML = false; - returnConnectionReference->shardId = placement->shardId; - returnConnectionReference->placementId = placement->placementId; returnConnectionReference->userName = MemoryContextStrdup(TopTransactionContext, userName); dlist_push_tail(&placementEntry->connectionReferences, @@ -249,8 +312,8 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us /* - * CheckExistingConnections check whether any of the existing connections is - * usable. If so, return it, otherwise return NULL. + * CheckExistingPlacementConnections check whether any of the existing + * connections is usable. If so, return it, otherwise return NULL. * * A connection is usable if it is not in use, the user matches, DDL/DML usage * match and cached connection are allowed. If no existing connection @@ -259,8 +322,9 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us * DML/DML) an error is thrown. */ static ConnectionReference * -CheckExistingConnections(uint32 flags, const char *userName, - ConnectionPlacementHashEntry *placementEntry) +CheckExistingPlacementConnections(uint32 flags, + ConnectionPlacementHashEntry *placementEntry, + const char *userName) { dlist_iter it; @@ -346,6 +410,95 @@ CheckExistingConnections(uint32 flags, const char *userName, } +static ConnectionReference * +CheckExistingColocatedConnections(uint32 flags, + ColocatedPlacementsHashEntry *connectionsEntry, + const char *userName) +{ + dlist_iter it; + + /* + * If there's a connection that has executed DML/DDL, always return it if + * possible. That's because we only can execute DML/DDL over that + * connection. Checking this first also allows us to detect conflicts due + * to opening a second connection that wants to execute DML/DDL. + */ + if (connectionsEntry->modifyingConnection) + { + ConnectionReference *connectionReference = connectionsEntry->modifyingConnection; + + if (CanUseExistingConnection(flags, userName, connectionReference)) + { + return connectionReference; + } + + if (connectionReference->hadDDL) + { + /* would deadlock otherwise */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DDL has been " + "executed on existing placement connection for a colocated placement"))); + } + else if (connectionReference->hadDML) + { + /* we'd not see the other connection's contents */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DML has been " + "executed on existing placement connection for a colocated placement"))); + } + else + { + /* modifying xact should have executed DML/DDL */ + Assert(false); + } + } + + /* + * Search existing connections for a reusable connection. + */ + dlist_foreach(it, &connectionsEntry->connectionReferences) + { + ConnectionReference *connectionReference = + dlist_container(ConnectionReference, placementNode, it.cur); + + if (CanUseExistingConnection(flags, userName, connectionReference)) + { + return connectionReference; + } + + /* + * If not using the connection, verify that FOR_DML/DDL flags are + * compatible. + */ + if (flags & FOR_DDL) + { + /* would deadlock otherwise */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection for DDL when " + "other connections exist for colocated placements"))); + } + else if (flags & FOR_DML) + { + /* + * We could allow this case (as presumably the SELECT is done, but + * it'd be a bit prone to programming errors, so we disallow for + * now. + */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection for DML when " + "other connections exist for colocated placements"))); + } + } + + /* establish a new connection */ + return NULL; +} + + /* * CanUseExistingConnection is a helper function for CheckExistingConnections() * that checks whether an existing connection can be reused. @@ -383,6 +536,86 @@ CanUseExistingConnection(uint32 flags, const char *userName, } +/* + * StartColocatedPlacementConnection returns a connection that's usable for + * the passed in flags/placement. + * + * It does so by first checking whether any connection is already associated + * with the relevant entry in the colocated placement hash, or by asking for a + * formerly unassociated (possibly new) connection. Errors out if there's + * conflicting connections. + */ +static MultiConnection * +StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement, + const char *userName) +{ + ConnectionReference *connectionReference = NULL; + ColocatedPlacementsHashKey key; + ColocatedPlacementsHashEntry *entry; + bool found; + + /* + * Currently only hash partitioned tables can be colocated. For everything + * else we just return a new connection. + */ + if (placement->partitionMethod != DISTRIBUTE_BY_HASH) + { + return StartNodeConnection(flags, placement->nodeName, placement->nodePort); + } + + strcpy(key.nodeName, placement->nodeName); + key.nodePort = placement->nodePort; + key.colocationGroupId = placement->colocationGroupId; + key.representativeValue = placement->representativeValue; + + entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found); + if (!found) + { + memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey), + 0, + sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey)); + dlist_init(&entry->connectionReferences); + } + + connectionReference = + CheckExistingColocatedConnections(flags, entry, userName); + + if (!connectionReference) + { + MultiConnection *connection = NULL; + + connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort); + connectionReference = (ConnectionReference *) + MemoryContextAlloc(TopTransactionContext, + sizeof(ConnectionReference)); + connectionReference->connection = connection; + connectionReference->hadDDL = false; + connectionReference->hadDML = false; + connectionReference->userName = + MemoryContextStrdup(TopTransactionContext, userName); + dlist_push_tail(&entry->connectionReferences, + &connectionReference->placementNode); + + /* record association with connection, to handle connection closure */ + dlist_push_tail(&connection->referencedPlacements, + &connectionReference->connectionNode); + } + + if (flags & FOR_DDL) + { + entry->modifyingConnection = connectionReference; + connectionReference->hadDDL = true; + } + if (flags & FOR_DML) + { + entry->modifyingConnection = connectionReference; + connectionReference->hadDML = true; + } + + return connectionReference->connection; +} + + /* * AssociatePlacementWithShard records shard->placement relation in * ConnectionShardHash. @@ -480,6 +713,7 @@ ResetPlacementConnectionManagement(void) /* Simply delete all entries */ hash_delete_all(ConnectionPlacementHash); hash_delete_all(ConnectionShardHash); + hash_delete_all(ColocatedPlacementsHash); /* * NB: memory for ConnectionReference structs and subordinate data is @@ -632,6 +866,18 @@ InitPlacementConnectionManagement(void) ConnectionPlacementHash = hash_create("citus connection cache (placementid)", 64, &info, hashFlags); + /* create (colocated placement identity) -> [ConnectionReference] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ColocatedPlacementsHashKey); + info.entrysize = sizeof(ColocatedPlacementsHashEntry); + info.hash = ColocatedPlacementsHashHash; + info.match = ColocatedPlacementsHashCompare; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + ColocatedPlacementsHash = hash_create("citus connection cache (colocated placements)", + 64, &info, hashFlags); + /* create (shardId) -> [ConnectionShardHashEntry] hash */ memset(&info, 0, sizeof(info)); info.keysize = sizeof(ConnectionShardHashKey); @@ -643,3 +889,38 @@ InitPlacementConnectionManagement(void) ConnectionShardHash = hash_create("citus connection cache (shardid)", 64, &info, hashFlags); } + + +static uint32 +ColocatedPlacementsHashHash(const void *key, Size keysize) +{ + ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key; + uint32 hash = 0; + + hash = string_hash(entry->nodeName, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->nodePort)); + hash = hash_combine(hash, hash_uint32(entry->colocationGroupId)); + hash = hash_combine(hash, hash_uint32(entry->representativeValue)); + + return hash; +} + + +static int +ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize) +{ + ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a; + ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b; + + if (strncmp(ca->nodeName, cb->nodeName, MAX_NODE_LENGTH) != 0 || + ca->nodePort != cb->nodePort || + ca->colocationGroupId != cb->colocationGroupId || + ca->representativeValue != cb->representativeValue) + { + return 1; + } + else + { + return 0; + } +} diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 06fb2ef72..ecb4150ba 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -473,6 +473,10 @@ OutShardPlacement(OUTFUNC_ARGS) WRITE_ENUM_FIELD(shardState, RelayFileState); WRITE_STRING_FIELD(nodeName); WRITE_UINT_FIELD(nodePort); + /* so we can deal with 0 */ + WRITE_INT_FIELD(partitionMethod); + WRITE_UINT_FIELD(colocationGroupId); + WRITE_UINT_FIELD(representativeValue); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index e7dc31e60..c430df0cb 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -268,6 +268,10 @@ ReadShardPlacement(READFUNC_ARGS) READ_ENUM_FIELD(shardState, RelayFileState); READ_STRING_FIELD(nodeName); READ_UINT_FIELD(nodePort); + /* so we can deal with 0 */ + READ_INT_FIELD(partitionMethod); + READ_UINT_FIELD(colocationGroupId); + READ_UINT_FIELD(representativeValue); READ_DONE(); } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 98623fc5d..21ff34447 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -703,9 +703,31 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) foreach(placementCell, placementList) { ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell); + ShardPlacement *dstPlacement = &placementArray[placementOffset]; - CopyShardPlacement(srcPlacement, &placementArray[placementOffset]); + CopyShardPlacement(srcPlacement, dstPlacement); + /* fill in remaining fields */ + Assert(cacheEntry->partitionMethod != 0); + dstPlacement->partitionMethod = cacheEntry->partitionMethod; + dstPlacement->colocationGroupId = cacheEntry->colocationId; + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + { + Assert(shardInterval->minValueExists); + Assert(shardInterval->valueTypeId == INT4OID); + + /* + * Use the lower boundary of the interval's range to identify + * it for colocation purposes. That remains meaningful even if + * a concurrent session splits a shard. + */ + dstPlacement->representativeValue = + DatumGetInt32(shardInterval->minValue); + } + else + { + dstPlacement->representativeValue = 0; + } placementOffset++; } MemoryContextSwitchTo(oldContext); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 696016f78..8198e677c 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -53,6 +53,9 @@ typedef struct ShardPlacement RelayFileState shardState; char *nodeName; uint32 nodePort; + char partitionMethod; + uint32 colocationGroupId; + uint32 representativeValue; } ShardPlacement;