Merge pull request #1120 from citusdata/feature/colocation_mapping

Colocation aware placement connections
pull/1125/head
Andres Freund 2017-01-16 13:54:59 -08:00 committed by GitHub
commit 3ea73b7879
6 changed files with 339 additions and 25 deletions

View File

@ -596,7 +596,7 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
ConnectionHashKey *ca = (ConnectionHashKey *) a;
ConnectionHashKey *cb = (ConnectionHashKey *) b;
if (strncmp(ca->hostname, cb->hostname, NAMEDATALEN) != 0 ||
if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
ca->port != cb->port ||
strncmp(ca->user, cb->user, NAMEDATALEN) != 0 ||
strncmp(ca->database, cb->database, NAMEDATALEN) != 0)

View File

@ -11,6 +11,7 @@
#include "postgres.h"
#include "access/hash.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/metadata_cache.h"
@ -21,14 +22,12 @@
/*
* A connection reference is used to register that a connection has been used
* to read or modify a shard placement as a particular user.
* to read or modify either a) a shard placement as a particular user b) a
* group of colocated placements (which depends on whether the reference is
* from ConnectionPlacementHashEntry or ColocatedPlacementsHashEntry).
*/
typedef struct ConnectionReference
{
/* identity information about the connection */
uint64 shardId;
uint64 placementId;
/*
* The user used to read/modify the placement. We cannot reuse connections
* that were performed using a different role, since it would not have the
@ -48,7 +47,10 @@ typedef struct ConnectionReference
bool hadDML;
bool hadDDL;
/* membership in ConnectionPlacementHashKey->connectionReferences */
/*
* Membership in ConnectionPlacementHashEntry->connectionReferences or
* ColocatedPlacementsHashEntry->connectionReferences.
*/
dlist_node placementNode;
/* membership in MultiConnection->referencedPlacements */
@ -97,6 +99,54 @@ typedef struct ConnectionPlacementHashEntry
static HTAB *ConnectionPlacementHash;
/*
* A hash-table mapping colocated placements to connections. Colocated
* placements being the set of placements on a single node that represent the
* same value range. This is needed because connections for colocated
* placements (i.e. the corresponding placements for different colocated
* distributed tables) need to share connections. Otherwise things like
* foreign keys can very easily lead to unprincipled deadlocks. This means
* that there can only be one DML/DDL connection for a set of colocated
* placements.
*
* A set of colocated placements is identified, besides node identifying
* information, by the associated colocation group id and the placement's
* 'representativeValue' which currently is the lower boundary of its
* hash-range.
*
* Note that this hash-table only contains entries for hash-partitioned
* tables, because others so far don't support colocation.
*/
/*
 * Hash key of ColocatedPlacementsHash: identifies one set of colocated
 * placements on one node. StartColocatedPlacementConnection() fills it from
 * the ShardPlacement fields of the same names.
 */
typedef struct ColocatedPlacementsHashKey
{
/* to identify host - database can't differ */
char nodeName[MAX_NODE_LENGTH];
uint32 nodePort;
/* colocation group, or invalid */
uint32 colocationGroupId;
/*
 * to represent the value range; for hash-partitioned tables this is the
 * lower boundary of the shard's hash range
 */
uint32 representativeValue;
} ColocatedPlacementsHashKey;
/*
 * Hash entry of ColocatedPlacementsHash: tracks the connections that have
 * been handed out for one set of colocated placements.
 */
typedef struct ColocatedPlacementsHashEntry
{
ColocatedPlacementsHashKey key;
/*
 * list of connections to remote nodes; elements are ConnectionReference
 * structs linked through their placementNode member
 */
dlist_head connectionReferences;
/*
 * if non-NULL, connection executing DML/DDL; set by
 * StartColocatedPlacementConnection() when FOR_DDL or FOR_DML is requested
 */
ConnectionReference *modifyingConnection;
} ColocatedPlacementsHashEntry;
static HTAB *ColocatedPlacementsHash;
/*
* Hash table mapping shard ids to placements.
*
@ -122,15 +172,25 @@ typedef struct ConnectionShardHashEntry
static HTAB *ConnectionShardHash;
static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName,
ConnectionPlacementHashEntry *
placementEntry);
static MultiConnection * StartColocatedPlacementConnection(uint32 flags,
ShardPlacement *placement,
const char *userName);
static ConnectionReference * CheckExistingPlacementConnections(uint32 flags,
ConnectionPlacementHashEntry
*placementEntry,
const char *userName);
static ConnectionReference * CheckExistingColocatedConnections(uint32 flags,
ColocatedPlacementsHashEntry
*connectionEntry,
const char *userName);
static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *connectionReference);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit,
bool using2PC);
static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
/*
@ -163,8 +223,9 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user
* - FOR_DDL - signal that connection is going to be used for DDL
*
* Only one connection associated with the placement may have FOR_DML or
* FOR_DDL set. This restriction prevents deadlocks and wrong results due to
* in-progress transactions.
* FOR_DDL set. For hash-partitioned tables only one connection for a set of
* colocated placements may have FOR_DML/DDL set. This restriction prevents
* deadlocks and wrong results due to in-progress transactions.
*/
MultiConnection *
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
@ -195,17 +256,21 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
* Check whether any of the connections already associated with the
* placement can be reused, or violates FOR_DML/FOR_DDL constraints.
*/
returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry);
returnConnectionReference =
CheckExistingPlacementConnections(flags, placementEntry, userName);
/*
* Either no caching desired, or no connection present. Start connection
* establishment. Allocations are performed in transaction context, so we
* don't have to care about freeing in case of an early disconnect.
* Either no caching desired, or no connection already associated with the
* placement present. Check whether there's a usable connection associated
* for the set of colocated placements, or establish a new one.
*
* Allocations are performed in transaction context, so we don't have to
* care about freeing in case of an early disconnect.
*/
if (returnConnectionReference == NULL)
{
MultiConnection *connection = StartNodeConnection(flags, placement->nodeName,
placement->nodePort);
MultiConnection *connection =
StartColocatedPlacementConnection(flags, placement, userName);
returnConnectionReference = (ConnectionReference *)
MemoryContextAlloc(TopTransactionContext,
@ -213,8 +278,6 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
returnConnectionReference->connection = connection;
returnConnectionReference->hadDDL = false;
returnConnectionReference->hadDML = false;
returnConnectionReference->shardId = placement->shardId;
returnConnectionReference->placementId = placement->placementId;
returnConnectionReference->userName =
MemoryContextStrdup(TopTransactionContext, userName);
dlist_push_tail(&placementEntry->connectionReferences,
@ -249,8 +312,8 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
/*
* CheckExistingConnections check whether any of the existing connections is
* usable. If so, return it, otherwise return NULL.
* CheckExistingPlacementConnections checks whether any of the existing
* connections is usable. If so, return it, otherwise return NULL.
*
* A connection is usable if it is not in use, the user matches, DDL/DML usage
* match and cached connection are allowed. If no existing connection
@ -259,8 +322,9 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
* DML/DDL) an error is thrown.
*/
static ConnectionReference *
CheckExistingConnections(uint32 flags, const char *userName,
ConnectionPlacementHashEntry *placementEntry)
CheckExistingPlacementConnections(uint32 flags,
ConnectionPlacementHashEntry *placementEntry,
const char *userName)
{
dlist_iter it;
@ -346,6 +410,95 @@ CheckExistingConnections(uint32 flags, const char *userName,
}
/*
 * CheckExistingColocatedConnections checks whether one of the connections
 * already registered for a set of colocated placements can be handed out
 * again for the given flags and user.
 *
 * Returns the reusable ConnectionReference, or NULL if the caller has to
 * establish a new connection. Raises an ERROR if a new connection would be
 * required but would conflict with DML/DDL usage of the existing ones.
 */
static ConnectionReference *
CheckExistingColocatedConnections(uint32 flags,
								  ColocatedPlacementsHashEntry *connectionsEntry,
								  const char *userName)
{
	ConnectionReference *modifyingReference = connectionsEntry->modifyingConnection;
	dlist_iter referenceIter;

	/*
	 * A connection that already executed DML/DDL is the only one allowed to
	 * execute further DML/DDL, so prefer it whenever it is usable. Looking at
	 * it first also catches attempts to open a second modifying connection.
	 */
	if (modifyingReference != NULL)
	{
		if (CanUseExistingConnection(flags, userName, modifyingReference))
		{
			return modifyingReference;
		}

		/* modifying xact should have executed DML/DDL */
		Assert(modifyingReference->hadDDL || modifyingReference->hadDML);

		if (modifyingReference->hadDDL)
		{
			/* would deadlock otherwise */
			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot establish new placement connection when DDL has been "
							"executed on existing placement connection for a colocated placement")));
		}

		if (modifyingReference->hadDML)
		{
			/* we'd not see the other connection's contents */
			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot establish new placement connection when DML has been "
							"executed on existing placement connection for a colocated placement")));
		}
	}

	/*
	 * Walk the remaining references looking for one we can reuse. Any
	 * reference we have to pass over rules out opening a new DDL/DML
	 * connection.
	 */
	dlist_foreach(referenceIter, &connectionsEntry->connectionReferences)
	{
		ConnectionReference *candidate =
			dlist_container(ConnectionReference, placementNode, referenceIter.cur);

		if (CanUseExistingConnection(flags, userName, candidate))
		{
			return candidate;
		}

		if ((flags & FOR_DDL) != 0)
		{
			/* would deadlock otherwise */
			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot establish new placement connection for DDL when "
							"other connections exist for colocated placements")));
		}

		if ((flags & FOR_DML) != 0)
		{
			/*
			 * We could allow this case (as presumably the SELECT is done),
			 * but it'd be a bit prone to programming errors, so we disallow
			 * it for now.
			 */
			ereport(ERROR,
					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
					 errmsg("cannot establish new placement connection for DML when "
							"other connections exist for colocated placements")));
		}
	}

	/* nothing reusable and no conflict: caller establishes a new connection */
	return NULL;
}
/*
* CanUseExistingConnection is a helper function for
* CheckExistingPlacementConnections() and CheckExistingColocatedConnections()
* that checks whether an existing connection can be reused.
@ -383,6 +536,86 @@ CanUseExistingConnection(uint32 flags, const char *userName,
}
/*
 * StartColocatedPlacementConnection returns a connection that's usable for
 * the passed in flags/placement.
 *
 * It does so by first checking whether any connection is already associated
 * with the relevant entry in the colocated placement hash, or by asking for a
 * formerly unassociated (possibly new) connection. Errors out if there are
 * conflicting connections.
 *
 * flags     - connection flags; FOR_DDL / FOR_DML mark the returned
 *             connection as the modifying connection of the colocated set
 * placement - placement to connect to; its partitionMethod,
 *             colocationGroupId and representativeValue select the set
 * userName  - role the connection is going to be used as
 */
static MultiConnection *
StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement,
								  const char *userName)
{
	ConnectionReference *connectionReference = NULL;
	ColocatedPlacementsHashKey key;
	ColocatedPlacementsHashEntry *entry = NULL;
	bool found = false;

	/*
	 * Currently only hash partitioned tables can be colocated. For everything
	 * else we just return a new connection.
	 */
	if (placement->partitionMethod != DISTRIBUTE_BY_HASH)
	{
		return StartNodeConnection(flags, placement->nodeName, placement->nodePort);
	}

	/*
	 * Start from an all-zero key so that struct padding and the unused tail
	 * of nodeName are deterministic, and copy the node name with an explicit
	 * bound: nodeName is a fixed-size buffer, an unbounded copy could write
	 * past its end.
	 */
	memset(&key, 0, sizeof(key));
	strlcpy(key.nodeName, placement->nodeName, MAX_NODE_LENGTH);
	key.nodePort = placement->nodePort;
	key.colocationGroupId = placement->colocationGroupId;
	key.representativeValue = placement->representativeValue;

	entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found);
	if (!found)
	{
		/* fresh entry: clear everything after the key, then set up the list */
		memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey),
			   0,
			   sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey));
		dlist_init(&entry->connectionReferences);
	}

	/* errors out on conflicts, returns NULL if a new connection is needed */
	connectionReference =
		CheckExistingColocatedConnections(flags, entry, userName);

	if (!connectionReference)
	{
		MultiConnection *connection = NULL;

		connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort);

		connectionReference = (ConnectionReference *)
							  MemoryContextAlloc(TopTransactionContext,
												 sizeof(ConnectionReference));
		connectionReference->connection = connection;
		connectionReference->hadDDL = false;
		connectionReference->hadDML = false;
		connectionReference->userName =
			MemoryContextStrdup(TopTransactionContext, userName);

		/* make the reference discoverable through the colocated set */
		dlist_push_tail(&entry->connectionReferences,
						&connectionReference->placementNode);

		/* record association with connection, to handle connection closure */
		dlist_push_tail(&connection->referencedPlacements,
						&connectionReference->connectionNode);
	}

	/* remember which connection modifies this set of colocated placements */
	if (flags & FOR_DDL)
	{
		entry->modifyingConnection = connectionReference;
		connectionReference->hadDDL = true;
	}
	if (flags & FOR_DML)
	{
		entry->modifyingConnection = connectionReference;
		connectionReference->hadDML = true;
	}

	return connectionReference->connection;
}
/*
* AssociatePlacementWithShard records shard->placement relation in
* ConnectionShardHash.
@ -480,6 +713,7 @@ ResetPlacementConnectionManagement(void)
/* Simply delete all entries */
hash_delete_all(ConnectionPlacementHash);
hash_delete_all(ConnectionShardHash);
hash_delete_all(ColocatedPlacementsHash);
/*
* NB: memory for ConnectionReference structs and subordinate data is
@ -632,6 +866,18 @@ InitPlacementConnectionManagement(void)
ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
64, &info, hashFlags);
/* create (colocated placement identity) -> [ConnectionReference] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ColocatedPlacementsHashKey);
info.entrysize = sizeof(ColocatedPlacementsHashEntry);
info.hash = ColocatedPlacementsHashHash;
info.match = ColocatedPlacementsHashCompare;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
ColocatedPlacementsHash = hash_create("citus connection cache (colocated placements)",
64, &info, hashFlags);
/* create (shardId) -> [ConnectionShardHashEntry] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ConnectionShardHashKey);
@ -643,3 +889,38 @@ InitPlacementConnectionManagement(void)
ConnectionShardHash = hash_create("citus connection cache (shardid)",
64, &info, hashFlags);
}
/*
 * ColocatedPlacementsHashHash is the hash callback for
 * ColocatedPlacementsHash. It combines the hashes of all four key fields.
 * keysize is unused, the key layout is fixed.
 */
static uint32
ColocatedPlacementsHashHash(const void *key, Size keysize)
{
	ColocatedPlacementsHashKey *entry = (ColocatedPlacementsHashKey *) key;
	uint32 hash = 0;

	/*
	 * Bound the node name by the size of the nodeName buffer, matching the
	 * length ColocatedPlacementsHashCompare() compares, so that the hash
	 * takes the same part of the name into account as the match function.
	 */
	hash = string_hash(entry->nodeName, MAX_NODE_LENGTH);
	hash = hash_combine(hash, hash_uint32(entry->nodePort));
	hash = hash_combine(hash, hash_uint32(entry->colocationGroupId));
	hash = hash_combine(hash, hash_uint32(entry->representativeValue));

	return hash;
}
/*
 * ColocatedPlacementsHashCompare is the match callback for
 * ColocatedPlacementsHash. Returns 0 if both keys identify the same set of
 * colocated placements, 1 otherwise. keysize is unused.
 */
static int
ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize)
{
	ColocatedPlacementsHashKey *left = (ColocatedPlacementsHashKey *) a;
	ColocatedPlacementsHashKey *right = (ColocatedPlacementsHashKey *) b;

	/* different node? */
	if (strncmp(left->nodeName, right->nodeName, MAX_NODE_LENGTH) != 0)
	{
		return 1;
	}
	if (left->nodePort != right->nodePort)
	{
		return 1;
	}

	/* different colocation group or value range? */
	if (left->colocationGroupId != right->colocationGroupId)
	{
		return 1;
	}
	if (left->representativeValue != right->representativeValue)
	{
		return 1;
	}

	return 0;
}

View File

@ -473,6 +473,10 @@ OutShardPlacement(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(shardState, RelayFileState);
WRITE_STRING_FIELD(nodeName);
WRITE_UINT_FIELD(nodePort);
/* so we can deal with 0 */
WRITE_INT_FIELD(partitionMethod);
WRITE_UINT_FIELD(colocationGroupId);
WRITE_UINT_FIELD(representativeValue);
}

View File

@ -268,6 +268,10 @@ ReadShardPlacement(READFUNC_ARGS)
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
/* so we can deal with 0 */
READ_INT_FIELD(partitionMethod);
READ_UINT_FIELD(colocationGroupId);
READ_UINT_FIELD(representativeValue);
READ_DONE();
}

View File

@ -703,9 +703,31 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
foreach(placementCell, placementList)
{
ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell);
ShardPlacement *dstPlacement = &placementArray[placementOffset];
CopyShardPlacement(srcPlacement, &placementArray[placementOffset]);
CopyShardPlacement(srcPlacement, dstPlacement);
/* fill in remaining fields */
Assert(cacheEntry->partitionMethod != 0);
dstPlacement->partitionMethod = cacheEntry->partitionMethod;
dstPlacement->colocationGroupId = cacheEntry->colocationId;
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
{
Assert(shardInterval->minValueExists);
Assert(shardInterval->valueTypeId == INT4OID);
/*
* Use the lower boundary of the interval's range to identify
* it for colocation purposes. That remains meaningful even if
* a concurrent session splits a shard.
*/
dstPlacement->representativeValue =
DatumGetInt32(shardInterval->minValue);
}
else
{
dstPlacement->representativeValue = 0;
}
placementOffset++;
}
MemoryContextSwitchTo(oldContext);

View File

@ -53,6 +53,9 @@ typedef struct ShardPlacement
RelayFileState shardState;
char *nodeName;
uint32 nodePort;
char partitionMethod;
uint32 colocationGroupId;
uint32 representativeValue;
} ShardPlacement;