mirror of https://github.com/citusdata/citus.git
Make placement_connection.c colocation aware.
Because of foreign keys and similar concerns, there should only be a single modifying/DDL connection for a set of colocated placements to a node. To enforce this, placement_connection.c now has an additional hash table that keeps track of the connections to a set of colocated placements. In addition to the per-placement restrictions on connections, there are now very similar restrictions for sets of colocated placements.
pull/1120/head
parent
6972186652
commit
b9385700ee
|
@ -11,6 +11,7 @@
|
|||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/hash.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -21,7 +22,9 @@
|
|||
|
||||
/*
|
||||
* A connection reference is used to register that a connection has been used
|
||||
* to read or modify a shard placement as a particular user.
|
||||
* to read or modify either a) a shard placement as a particular user b) a
|
||||
* group of colocated placements (which depend on whether the reference is
|
||||
* from ConnectionPlacementHashEntry or ColocatedPlacementHashEntry).
|
||||
*/
|
||||
typedef struct ConnectionReference
|
||||
{
|
||||
|
@ -44,7 +47,10 @@ typedef struct ConnectionReference
|
|||
bool hadDML;
|
||||
bool hadDDL;
|
||||
|
||||
/* membership in ConnectionPlacementHashKey->connectionReferences */
|
||||
/*
|
||||
* Membership in ConnectionPlacementHashEntry->connectionReferences or
|
||||
* ColocatedPlacementsHashEntry->connectionReferences.
|
||||
*/
|
||||
dlist_node placementNode;
|
||||
|
||||
/* membership in MultiConnection->referencedPlacements */
|
||||
|
@ -93,6 +99,54 @@ typedef struct ConnectionPlacementHashEntry
|
|||
static HTAB *ConnectionPlacementHash;
|
||||
|
||||
|
||||
/*
|
||||
* A hash-table mapping colocated placements to connections. Colocated
|
||||
* placements being the set of placements on a single node that represent the
|
||||
* same value range. This is needed because connections for colocated
|
||||
* placements (i.e. the corresponding placements for different colocated
|
||||
* distributed tables) need to share connections. Otherwise things like
|
||||
* foreign keys can very easily lead to unprincipled deadlocks. This means
|
||||
* that there can only be one DML/DDL connection for a set of colocated
|
||||
* placements.
|
||||
*
|
||||
* A set of colocated placements is identified, besides node identifying
|
||||
* information, by the associated colocation group id and the placement's
|
||||
* 'representativeValue' which currently is the lower boundary of its
|
||||
* hash-range.
|
||||
*
|
||||
* Note that this hash-table only contains entries for hash-partitioned
|
||||
* tables, because others so far don't support colocation.
|
||||
*/
|
||||
|
||||
/* hash key */
|
||||
typedef struct ColocatedPlacementsHashKey
|
||||
{
|
||||
/* to identify host - database can't differ */
|
||||
char nodeName[MAX_NODE_LENGTH];
|
||||
uint32 nodePort;
|
||||
|
||||
/* colocation group, or invalid */
|
||||
uint32 colocationGroupId;
|
||||
|
||||
/* to represent the value range */
|
||||
uint32 representativeValue;
|
||||
} ColocatedPlacementsHashKey;
|
||||
|
||||
/* hash entry */
|
||||
typedef struct ColocatedPlacementsHashEntry
|
||||
{
|
||||
ColocatedPlacementsHashKey key;
|
||||
|
||||
/* list of connections to remote nodes */
|
||||
dlist_head connectionReferences;
|
||||
|
||||
/* if non-NULL, connection executing DML/DDL*/
|
||||
ConnectionReference *modifyingConnection;
|
||||
} ColocatedPlacementsHashEntry;
|
||||
|
||||
static HTAB *ColocatedPlacementsHash;
|
||||
|
||||
|
||||
/*
|
||||
* Hash table mapping shard ids to placements.
|
||||
*
|
||||
|
@ -118,15 +172,25 @@ typedef struct ConnectionShardHashEntry
|
|||
static HTAB *ConnectionShardHash;
|
||||
|
||||
|
||||
static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName,
|
||||
ConnectionPlacementHashEntry *
|
||||
placementEntry);
|
||||
static MultiConnection * StartColocatedPlacementConnection(uint32 flags,
|
||||
ShardPlacement *placement,
|
||||
const char *userName);
|
||||
static ConnectionReference * CheckExistingPlacementConnections(uint32 flags,
|
||||
ConnectionPlacementHashEntry
|
||||
*placementEntry,
|
||||
const char *userName);
|
||||
static ConnectionReference * CheckExistingColocatedConnections(uint32 flags,
|
||||
ColocatedPlacementsHashEntry
|
||||
*connectionEntry,
|
||||
const char *userName);
|
||||
static bool CanUseExistingConnection(uint32 flags, const char *userName,
|
||||
ConnectionReference *connectionReference);
|
||||
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
|
||||
ShardPlacement *placement);
|
||||
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit,
|
||||
bool using2PC);
|
||||
static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
|
||||
static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -159,8 +223,9 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user
|
|||
* - FOR_DDL - signal that connection is going to be used for DDL
|
||||
*
|
||||
* Only one connection associated with the placement may have FOR_DML or
|
||||
* FOR_DDL set. This restriction prevents deadlocks and wrong results due to
|
||||
* in-progress transactions.
|
||||
* FOR_DDL set. For hash-partitioned tables only one connection for a set of
|
||||
* colocated placements may have FOR_DML/DDL set. This restriction prevents
|
||||
* deadlocks and wrong results due to in-progress transactions.
|
||||
*/
|
||||
MultiConnection *
|
||||
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
|
||||
|
@ -191,17 +256,21 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
|
|||
* Check whether any of the connections already associated with the
|
||||
* placement can be reused, or violates FOR_DML/FOR_DDL constraints.
|
||||
*/
|
||||
returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry);
|
||||
returnConnectionReference =
|
||||
CheckExistingPlacementConnections(flags, placementEntry, userName);
|
||||
|
||||
/*
|
||||
* Either no caching desired, or no connection present. Start connection
|
||||
* establishment. Allocations are performed in transaction context, so we
|
||||
* don't have to care about freeing in case of an early disconnect.
|
||||
* Either no caching desired, or no connection already associated with the
|
||||
* placement present. Check whether there's a usable connection associated
|
||||
* with the set of colocated placements, or establish a new one.
|
||||
*
|
||||
* Allocations are performed in transaction context, so we don't have to
|
||||
* care about freeing in case of an early disconnect.
|
||||
*/
|
||||
if (returnConnectionReference == NULL)
|
||||
{
|
||||
MultiConnection *connection = StartNodeConnection(flags, placement->nodeName,
|
||||
placement->nodePort);
|
||||
MultiConnection *connection =
|
||||
StartColocatedPlacementConnection(flags, placement, userName);
|
||||
|
||||
returnConnectionReference = (ConnectionReference *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
|
@ -243,8 +312,8 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
|
|||
|
||||
|
||||
/*
|
||||
* CheckExistingConnections check whether any of the existing connections is
|
||||
* usable. If so, return it, otherwise return NULL.
|
||||
* CheckExistingPlacementConnections checks whether any of the existing
|
||||
* connections is usable. If so, return it, otherwise return NULL.
|
||||
*
|
||||
* A connection is usable if it is not in use, the user matches, DDL/DML usage
|
||||
* match and cached connection are allowed. If no existing connection
|
||||
|
@ -253,8 +322,9 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
|
|||
* DML/DML) an error is thrown.
|
||||
*/
|
||||
static ConnectionReference *
|
||||
CheckExistingConnections(uint32 flags, const char *userName,
|
||||
ConnectionPlacementHashEntry *placementEntry)
|
||||
CheckExistingPlacementConnections(uint32 flags,
|
||||
ConnectionPlacementHashEntry *placementEntry,
|
||||
const char *userName)
|
||||
{
|
||||
dlist_iter it;
|
||||
|
||||
|
@ -340,6 +410,95 @@ CheckExistingConnections(uint32 flags, const char *userName,
|
|||
}
|
||||
|
||||
|
||||
static ConnectionReference *
|
||||
CheckExistingColocatedConnections(uint32 flags,
|
||||
ColocatedPlacementsHashEntry *connectionsEntry,
|
||||
const char *userName)
|
||||
{
|
||||
dlist_iter it;
|
||||
|
||||
/*
|
||||
* If there's a connection that has executed DML/DDL, always return it if
|
||||
* possible. That's because we only can execute DML/DDL over that
|
||||
* connection. Checking this first also allows us to detect conflicts due
|
||||
* to opening a second connection that wants to execute DML/DDL.
|
||||
*/
|
||||
if (connectionsEntry->modifyingConnection)
|
||||
{
|
||||
ConnectionReference *connectionReference = connectionsEntry->modifyingConnection;
|
||||
|
||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
||||
{
|
||||
return connectionReference;
|
||||
}
|
||||
|
||||
if (connectionReference->hadDDL)
|
||||
{
|
||||
/* would deadlock otherwise */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DDL has been "
|
||||
"executed on existing placement connection for a colocated placement")));
|
||||
}
|
||||
else if (connectionReference->hadDML)
|
||||
{
|
||||
/* we'd not see the other connection's contents */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DML has been "
|
||||
"executed on existing placement connection for a colocated placement")));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* modifying xact should have executed DML/DDL */
|
||||
Assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Search existing connections for a reusable connection.
|
||||
*/
|
||||
dlist_foreach(it, &connectionsEntry->connectionReferences)
|
||||
{
|
||||
ConnectionReference *connectionReference =
|
||||
dlist_container(ConnectionReference, placementNode, it.cur);
|
||||
|
||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
||||
{
|
||||
return connectionReference;
|
||||
}
|
||||
|
||||
/*
|
||||
* If not using the connection, verify that FOR_DML/DDL flags are
|
||||
* compatible.
|
||||
*/
|
||||
if (flags & FOR_DDL)
|
||||
{
|
||||
/* would deadlock otherwise */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection for DDL when "
|
||||
"other connections exist for colocated placements")));
|
||||
}
|
||||
else if (flags & FOR_DML)
|
||||
{
|
||||
/*
|
||||
* We could allow this case (as presumably the SELECT is done, but
|
||||
* it'd be a bit prone to programming errors, so we disallow for
|
||||
* now.
|
||||
*/
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection for DML when "
|
||||
"other connections exist for colocated placements")));
|
||||
}
|
||||
}
|
||||
|
||||
/* establish a new connection */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CanUseExistingConnection is a helper function for
* CheckExistingPlacementConnections() and CheckExistingColocatedConnections()
|
||||
* that checks whether an existing connection can be reused.
|
||||
|
@ -377,6 +536,86 @@ CanUseExistingConnection(uint32 flags, const char *userName,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartColocatedPlacementConnection returns a connection that's usable for
|
||||
* the passed in flags/placement.
|
||||
*
|
||||
* It does so by first checking whether any connection is already associated
|
||||
* with the relevant entry in the colocated placement hash, or by asking for a
|
||||
* formerly unassociated (possibly new) connection. Errors out if there's
|
||||
* conflicting connections.
|
||||
*/
|
||||
static MultiConnection *
|
||||
StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement,
|
||||
const char *userName)
|
||||
{
|
||||
ConnectionReference *connectionReference = NULL;
|
||||
ColocatedPlacementsHashKey key;
|
||||
ColocatedPlacementsHashEntry *entry;
|
||||
bool found;
|
||||
|
||||
/*
|
||||
* Currently only hash partitioned tables can be colocated. For everything
|
||||
* else we just return a new connection.
|
||||
*/
|
||||
if (placement->partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
return StartNodeConnection(flags, placement->nodeName, placement->nodePort);
|
||||
}
|
||||
|
||||
strcpy(key.nodeName, placement->nodeName);
|
||||
key.nodePort = placement->nodePort;
|
||||
key.colocationGroupId = placement->colocationGroupId;
|
||||
key.representativeValue = placement->representativeValue;
|
||||
|
||||
entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey),
|
||||
0,
|
||||
sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey));
|
||||
dlist_init(&entry->connectionReferences);
|
||||
}
|
||||
|
||||
connectionReference =
|
||||
CheckExistingColocatedConnections(flags, entry, userName);
|
||||
|
||||
if (!connectionReference)
|
||||
{
|
||||
MultiConnection *connection = NULL;
|
||||
|
||||
connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort);
|
||||
connectionReference = (ConnectionReference *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
sizeof(ConnectionReference));
|
||||
connectionReference->connection = connection;
|
||||
connectionReference->hadDDL = false;
|
||||
connectionReference->hadDML = false;
|
||||
connectionReference->userName =
|
||||
MemoryContextStrdup(TopTransactionContext, userName);
|
||||
dlist_push_tail(&entry->connectionReferences,
|
||||
&connectionReference->placementNode);
|
||||
|
||||
/* record association with connection, to handle connection closure */
|
||||
dlist_push_tail(&connection->referencedPlacements,
|
||||
&connectionReference->connectionNode);
|
||||
}
|
||||
|
||||
if (flags & FOR_DDL)
|
||||
{
|
||||
entry->modifyingConnection = connectionReference;
|
||||
connectionReference->hadDDL = true;
|
||||
}
|
||||
if (flags & FOR_DML)
|
||||
{
|
||||
entry->modifyingConnection = connectionReference;
|
||||
connectionReference->hadDML = true;
|
||||
}
|
||||
|
||||
return connectionReference->connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AssociatePlacementWithShard records shard->placement relation in
|
||||
* ConnectionShardHash.
|
||||
|
@ -474,6 +713,7 @@ ResetPlacementConnectionManagement(void)
|
|||
/* Simply delete all entries */
|
||||
hash_delete_all(ConnectionPlacementHash);
|
||||
hash_delete_all(ConnectionShardHash);
|
||||
hash_delete_all(ColocatedPlacementsHash);
|
||||
|
||||
/*
|
||||
* NB: memory for ConnectionReference structs and subordinate data is
|
||||
|
@ -626,6 +866,18 @@ InitPlacementConnectionManagement(void)
|
|||
ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
|
||||
64, &info, hashFlags);
|
||||
|
||||
/* create (colocated placement identity) -> [ConnectionReference] hash */
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(ColocatedPlacementsHashKey);
|
||||
info.entrysize = sizeof(ColocatedPlacementsHashEntry);
|
||||
info.hash = ColocatedPlacementsHashHash;
|
||||
info.match = ColocatedPlacementsHashCompare;
|
||||
info.hcxt = ConnectionContext;
|
||||
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
|
||||
|
||||
ColocatedPlacementsHash = hash_create("citus connection cache (colocated placements)",
|
||||
64, &info, hashFlags);
|
||||
|
||||
/* create (shardId) -> [ConnectionShardHashEntry] hash */
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(ConnectionShardHashKey);
|
||||
|
@ -637,3 +889,38 @@ InitPlacementConnectionManagement(void)
|
|||
ConnectionShardHash = hash_create("citus connection cache (shardid)",
|
||||
64, &info, hashFlags);
|
||||
}
|
||||
|
||||
|
||||
static uint32
|
||||
ColocatedPlacementsHashHash(const void *key, Size keysize)
|
||||
{
|
||||
ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
|
||||
uint32 hash = 0;
|
||||
|
||||
hash = string_hash(entry->nodeName, NAMEDATALEN);
|
||||
hash = hash_combine(hash, hash_uint32(entry->nodePort));
|
||||
hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
|
||||
hash = hash_combine(hash, hash_uint32(entry->representativeValue));
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
|
||||
{
|
||||
ColocatedPlacementsHashKey *ca = (ColocatedPlacementsHashKey *) a;
|
||||
ColocatedPlacementsHashKey *cb = (ColocatedPlacementsHashKey *) b;
|
||||
|
||||
if (strncmp(ca->nodeName, cb->nodeName, MAX_NODE_LENGTH) != 0 ||
|
||||
ca->nodePort != cb->nodePort ||
|
||||
ca->colocationGroupId != cb->colocationGroupId ||
|
||||
ca->representativeValue != cb->representativeValue)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue