mirror of https://github.com/citusdata/citus.git
Allow choosing a connection for multiple placements with GetPlacementListConnection
parent
0d569b722e
commit
63676f5d65
|
@ -14,7 +14,9 @@
|
|||
#include "access/hash.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
|
@ -47,17 +49,14 @@ typedef struct ConnectionReference
|
|||
bool hadDML;
|
||||
bool hadDDL;
|
||||
|
||||
/*
|
||||
* Membership in ConnectionPlacementHashEntry->connectionReferences or
|
||||
* ColocatedPlacementsHashEntry->connectionReferences.
|
||||
*/
|
||||
dlist_node placementNode;
|
||||
|
||||
/* membership in MultiConnection->referencedPlacements */
|
||||
dlist_node connectionNode;
|
||||
} ConnectionReference;
|
||||
|
||||
|
||||
struct ColocatedPlacementsHashEntry;
|
||||
|
||||
|
||||
/*
|
||||
* Hash table mapping placements to a list of connections.
|
||||
*
|
||||
|
@ -85,11 +84,14 @@ typedef struct ConnectionPlacementHashEntry
|
|||
/* did any remote transactions fail? */
|
||||
bool failed;
|
||||
|
||||
/* list of connections to remote nodes */
|
||||
dlist_head connectionReferences;
|
||||
/* primary connection used to access the placement */
|
||||
ConnectionReference *primaryConnection;
|
||||
|
||||
/* if non-NULL, connection executing DML/DDL*/
|
||||
ConnectionReference *modifyingConnection;
|
||||
/* are any other connections reading from the placements? */
|
||||
bool hasSecondaryConnections;
|
||||
|
||||
/* entry for the set of co-located placements */
|
||||
struct ColocatedPlacementsHashEntry *colocatedEntry;
|
||||
|
||||
/* membership in ConnectionShardHashEntry->placementConnections */
|
||||
dlist_node shardNode;
|
||||
|
@ -137,11 +139,11 @@ typedef struct ColocatedPlacementsHashEntry
|
|||
{
|
||||
ColocatedPlacementsHashKey key;
|
||||
|
||||
/* list of connections to remote nodes */
|
||||
dlist_head connectionReferences;
|
||||
/* primary connection used to access the co-located placements */
|
||||
ConnectionReference *primaryConnection;
|
||||
|
||||
/* if non-NULL, connection executing DML/DDL*/
|
||||
ConnectionReference *modifyingConnection;
|
||||
/* are any other connections reading from the placements? */
|
||||
bool hasSecondaryConnections;
|
||||
} ColocatedPlacementsHashEntry;
|
||||
|
||||
static HTAB *ColocatedPlacementsHash;
|
||||
|
@ -172,19 +174,10 @@ typedef struct ConnectionShardHashEntry
|
|||
static HTAB *ConnectionShardHash;
|
||||
|
||||
|
||||
static MultiConnection * StartColocatedPlacementConnection(uint32 flags,
|
||||
ShardPlacement *placement,
|
||||
const char *userName);
|
||||
static ConnectionReference * CheckExistingPlacementConnections(uint32 flags,
|
||||
ConnectionPlacementHashEntry
|
||||
*placementEntry,
|
||||
const char *userName);
|
||||
static ConnectionReference * CheckExistingColocatedConnections(uint32 flags,
|
||||
ColocatedPlacementsHashEntry
|
||||
*connectionEntry,
|
||||
const char *userName);
|
||||
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
|
||||
ShardPlacement *placement);
|
||||
static bool CanUseExistingConnection(uint32 flags, const char *userName,
|
||||
ConnectionReference *connectionReference);
|
||||
ConnectionReference *placementConnection);
|
||||
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
|
||||
ShardPlacement *placement);
|
||||
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
|
||||
|
@ -229,76 +222,350 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user
|
|||
MultiConnection *
|
||||
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
|
||||
{
|
||||
ConnectionPlacementHashKey key;
|
||||
ConnectionPlacementHashEntry *placementEntry = NULL;
|
||||
ConnectionReference *returnConnectionReference = NULL;
|
||||
ShardPlacementAccess *placementAccess =
|
||||
(ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
|
||||
|
||||
placementAccess->placement = placement;
|
||||
|
||||
if (flags & FOR_DDL)
|
||||
{
|
||||
placementAccess->accessType = PLACEMENT_ACCESS_DDL;
|
||||
}
|
||||
else if (flags & FOR_DML)
|
||||
{
|
||||
placementAccess->accessType = PLACEMENT_ACCESS_DML;
|
||||
}
|
||||
else
|
||||
{
|
||||
placementAccess->accessType = PLACEMENT_ACCESS_SELECT;
|
||||
}
|
||||
|
||||
return StartPlacementListConnection(flags, list_make1(placementAccess), userName);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetPlacementListConnection establishes a connection for a set of placement
|
||||
* accesses.
|
||||
*
|
||||
* See StartPlacementListConnection for details.
|
||||
*/
|
||||
MultiConnection *
|
||||
GetPlacementListConnection(uint32 flags, List *placementAccessList, const char *userName)
|
||||
{
|
||||
MultiConnection *connection = StartPlacementListConnection(flags, placementAccessList,
|
||||
userName);
|
||||
|
||||
FinishConnectionEstablishment(connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartPlacementListConnection returns a connection to a remote node suitable for
|
||||
* a set of placement accesses (SELECT, DML, DDL), or throws an error if no
|
||||
* suitable connection can be established or if using one would cause a
|
||||
* self-deadlock or consistency violation.
|
||||
*/
|
||||
MultiConnection *
|
||||
StartPlacementListConnection(uint32 flags, List *placementAccessList,
|
||||
const char *userName)
|
||||
{
|
||||
char *freeUserName = NULL;
|
||||
bool found = false;
|
||||
bool foundModifyingConnection = false;
|
||||
ListCell *placementAccessCell = NULL;
|
||||
List *placementEntryList = NIL;
|
||||
ListCell *placementEntryCell = NULL;
|
||||
MultiConnection *chosenConnection = NULL;
|
||||
|
||||
if (userName == NULL)
|
||||
{
|
||||
userName = freeUserName = CurrentUserName();
|
||||
}
|
||||
|
||||
key.placementId = placement->placementId;
|
||||
|
||||
/* lookup relevant hash entry */
|
||||
placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
dlist_init(&placementEntry->connectionReferences);
|
||||
placementEntry->failed = false;
|
||||
placementEntry->modifyingConnection = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check whether any of the connections already associated with the
|
||||
* placement can be reused, or violates FOR_DML/FOR_DDL constraints.
|
||||
*/
|
||||
returnConnectionReference =
|
||||
CheckExistingPlacementConnections(flags, placementEntry, userName);
|
||||
|
||||
/*
|
||||
* Either no caching desired, or no connection already associated with the
|
||||
* placement present. Check whether there's a usable connection associated
|
||||
* for the set of colocated placements, or establish a new one.
|
||||
* Go through all placement accesses to find a suitable connection.
|
||||
*
|
||||
* Allocations are performed in transaction context, so we don't have to
|
||||
* care about freeing in case of an early disconnect.
|
||||
* If none of the placements have been accessed in this transaction, connection
|
||||
* remains NULL.
|
||||
*
|
||||
* If one or more of the placements have been modified in this transaction, then
|
||||
* use the connection that performed the write. If placements have been written
|
||||
* over multiple connections or the connection is not available, error out.
|
||||
*
|
||||
* If placements have only been read in this transaction, then use the last
|
||||
* suitable connection found for a placement in the placementAccessList.
|
||||
*/
|
||||
if (returnConnectionReference == NULL)
|
||||
foreach(placementAccessCell, placementAccessList)
|
||||
{
|
||||
MultiConnection *connection =
|
||||
StartColocatedPlacementConnection(flags, placement, userName);
|
||||
ShardPlacementAccess *placementAccess =
|
||||
(ShardPlacementAccess *) lfirst(placementAccessCell);
|
||||
ShardPlacement *placement = placementAccess->placement;
|
||||
ShardPlacementAccessType accessType = placementAccess->accessType;
|
||||
|
||||
returnConnectionReference = (ConnectionReference *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
sizeof(ConnectionReference));
|
||||
returnConnectionReference->connection = connection;
|
||||
returnConnectionReference->hadDDL = false;
|
||||
returnConnectionReference->hadDML = false;
|
||||
returnConnectionReference->userName =
|
||||
MemoryContextStrdup(TopTransactionContext, userName);
|
||||
dlist_push_tail(&placementEntry->connectionReferences,
|
||||
&returnConnectionReference->placementNode);
|
||||
ConnectionPlacementHashEntry *placementEntry = NULL;
|
||||
ColocatedPlacementsHashEntry *colocatedEntry = NULL;
|
||||
ConnectionReference *placementConnection = NULL;
|
||||
|
||||
/* record association with shard, for invalidation */
|
||||
AssociatePlacementWithShard(placementEntry, placement);
|
||||
if (placement->shardId == INVALID_SHARD_ID)
|
||||
{
|
||||
/*
|
||||
* When a SELECT prunes down to 0 shard, we use a dummy placement.
|
||||
* In that case, we can fall back to the default connection.
|
||||
*
|
||||
* FIXME: this can be removed if we evaluate empty SELECTs locally.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
/* record association with connection, to handle connection closure */
|
||||
dlist_push_tail(&connection->referencedPlacements,
|
||||
&returnConnectionReference->connectionNode);
|
||||
placementEntry = FindOrCreatePlacementEntry(placement);
|
||||
colocatedEntry = placementEntry->colocatedEntry;
|
||||
placementConnection = placementEntry->primaryConnection;
|
||||
|
||||
/* note: the Asserts below are primarily for clarifying the conditions */
|
||||
|
||||
if (placementConnection->connection == NULL)
|
||||
{
|
||||
/* no connection has been chosen for the placement */
|
||||
}
|
||||
else if (accessType == PLACEMENT_ACCESS_DDL &&
|
||||
placementEntry->hasSecondaryConnections)
|
||||
{
|
||||
/*
|
||||
* If a placement has been read over multiple connections (typically as
|
||||
* a result of a reference table join) then a DDL command on the placement
|
||||
* would create a self-deadlock.
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot perform DDL on a placement which has been read over "
|
||||
"multiple connections")));
|
||||
}
|
||||
else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL &&
|
||||
colocatedEntry->hasSecondaryConnections)
|
||||
{
|
||||
/*
|
||||
* If a placement has been read over multiple (uncommitted) connections
|
||||
* then a DDL command on a co-located placement may create a self-deadlock
|
||||
* if there exists some relationship between the co-located placements
|
||||
* (e.g. foreign key, partitioning).
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot perform DDL on a placement if a co-located placement "
|
||||
"has been read over multiple connections")));
|
||||
}
|
||||
else if (foundModifyingConnection)
|
||||
{
|
||||
/*
|
||||
* We already found a connection that performed writes on one of the placements
|
||||
* and must use it.
|
||||
*/
|
||||
|
||||
if ((placementConnection->hadDDL || placementConnection->hadDML) &&
|
||||
placementConnection->connection != chosenConnection)
|
||||
{
|
||||
/*
|
||||
* The current placement may have been modified over a different
|
||||
* connection. Neither connection is guaranteed to see all uncommitted
|
||||
* writes and therefore we cannot proceed.
|
||||
*/
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot perform query with placements that were "
|
||||
"modified over multiple connections")));
|
||||
}
|
||||
}
|
||||
else if (CanUseExistingConnection(flags, userName, placementConnection))
|
||||
{
|
||||
/*
|
||||
* There is an existing connection for the placement and we can use it.
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
|
||||
chosenConnection = placementConnection->connection;
|
||||
|
||||
if (placementConnection->hadDDL || placementConnection->hadDML)
|
||||
{
|
||||
/* this connection performed writes, we must use it */
|
||||
foundModifyingConnection = true;
|
||||
}
|
||||
}
|
||||
else if (placementConnection->hadDDL)
|
||||
{
|
||||
/*
|
||||
* There is an existing connection, but we cannot use it and it executed
|
||||
* DDL. Any subsequent operation needs to be able to see the results of
|
||||
* the DDL command and thus cannot proceed if it cannot use the connection.
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DDL has "
|
||||
"been executed on existing placement connection")));
|
||||
}
|
||||
else if (placementConnection->hadDML)
|
||||
{
|
||||
/*
|
||||
* There is an existing connection, but we cannot use it and it executed
|
||||
* DML. Any subsequent operation needs to be able to see the results of
|
||||
* the DML command and thus cannot proceed if it cannot use the connection.
|
||||
*
|
||||
* Note that this is not meaningfully different from the previous case. We
|
||||
* just produce a different error message based on whether DDL was or only
|
||||
* DML was executed.
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||
Assert(!placementConnection->hadDDL);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DML has "
|
||||
"been executed on existing placement connection")));
|
||||
}
|
||||
else if (accessType == PLACEMENT_ACCESS_DDL)
|
||||
{
|
||||
/*
|
||||
* There is an existing connection, but we cannot use it and we want to
|
||||
* execute DDL. The operation on the existing connection might conflict
|
||||
* with the DDL statement.
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||
Assert(!placementConnection->hadDDL);
|
||||
Assert(!placementConnection->hadDML);
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot perform parallel DDL command because multiple "
|
||||
"placements have been accessed over the same connection")));
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* The placement has a connection assigned to it, but it cannot be used,
|
||||
* most likely because it has been claimed exclusively. Fortunately, it
|
||||
* has only been used for reads and we're not performing a DDL command.
|
||||
* We can therefore use a different connection for this placement.
|
||||
*/
|
||||
|
||||
Assert(placementConnection != NULL);
|
||||
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
|
||||
Assert(!placementConnection->hadDDL);
|
||||
Assert(!placementConnection->hadDML);
|
||||
Assert(accessType != PLACEMENT_ACCESS_DDL);
|
||||
}
|
||||
|
||||
placementEntryList = lappend(placementEntryList, placementEntry);
|
||||
}
|
||||
|
||||
if (flags & FOR_DDL)
|
||||
if (chosenConnection == NULL)
|
||||
{
|
||||
placementEntry->modifyingConnection = returnConnectionReference;
|
||||
returnConnectionReference->hadDDL = true;
|
||||
/* use the first placement from the list to extract nodename and nodeport */
|
||||
ShardPlacementAccess *placementAccess =
|
||||
(ShardPlacementAccess *) linitial(placementAccessList);
|
||||
ShardPlacement *placement = placementAccess->placement;
|
||||
|
||||
/*
|
||||
* No suitable connection in the placement->connection mapping, get one from
|
||||
* the node->connection pool.
|
||||
*/
|
||||
chosenConnection = StartNodeConnection(flags, placement->nodeName,
|
||||
placement->nodePort);
|
||||
}
|
||||
if (flags & FOR_DML)
|
||||
|
||||
/*
|
||||
* Now that a connection has been chosen, initialise or update the connection
|
||||
* references for all placements.
|
||||
*/
|
||||
forboth(placementAccessCell, placementAccessList,
|
||||
placementEntryCell, placementEntryList)
|
||||
{
|
||||
placementEntry->modifyingConnection = returnConnectionReference;
|
||||
returnConnectionReference->hadDML = true;
|
||||
ShardPlacementAccess *placementAccess =
|
||||
(ShardPlacementAccess *) lfirst(placementAccessCell);
|
||||
ShardPlacementAccessType accessType = placementAccess->accessType;
|
||||
ConnectionPlacementHashEntry *placementEntry =
|
||||
(ConnectionPlacementHashEntry *) lfirst(placementEntryCell);
|
||||
ConnectionReference *placementConnection = placementEntry->primaryConnection;
|
||||
|
||||
if (placementConnection->connection == chosenConnection)
|
||||
{
|
||||
/* using the connection that was already assigned to the placement */
|
||||
}
|
||||
else if (placementConnection->connection == NULL)
|
||||
{
|
||||
/* placement does not have a connection assigned yet */
|
||||
placementConnection->connection = chosenConnection;
|
||||
placementConnection->hadDDL = false;
|
||||
placementConnection->hadDML = false;
|
||||
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
|
||||
userName);
|
||||
|
||||
/* record association with connection, to handle connection closure */
|
||||
dlist_push_tail(&chosenConnection->referencedPlacements,
|
||||
&placementConnection->connectionNode);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* using a different connection than the one assigned to the placement */
|
||||
|
||||
if (accessType != PLACEMENT_ACCESS_SELECT)
|
||||
{
|
||||
/*
|
||||
* We previously read from the placement, but now we're writing to
|
||||
* it (if we had written to the placement, we would have either chosen
|
||||
* the same connection, or errored out). Update the connection reference
|
||||
* to point to the connection used for writing. We don't need to remember
|
||||
* the existing connection since we won't be able to reuse it for
|
||||
* accessing the placement. However, we do register that it exists in
|
||||
* hasSecondaryConnections.
|
||||
*/
|
||||
placementConnection->connection = chosenConnection;
|
||||
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
|
||||
userName);
|
||||
|
||||
Assert(!placementConnection->hadDDL);
|
||||
Assert(!placementConnection->hadDML);
|
||||
}
|
||||
|
||||
/*
|
||||
* There are now multiple connections that read from the placement
|
||||
* and DDL commands are forbidden.
|
||||
*/
|
||||
placementEntry->hasSecondaryConnections = true;
|
||||
|
||||
if (placementEntry->colocatedEntry != NULL)
|
||||
{
|
||||
/* we also remember this for co-located placements */
|
||||
placementEntry->colocatedEntry->hasSecondaryConnections = true;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Remember that we used the current connection for writes.
|
||||
*/
|
||||
if (accessType == PLACEMENT_ACCESS_DDL)
|
||||
{
|
||||
placementConnection->hadDDL = true;
|
||||
}
|
||||
|
||||
if (accessType == PLACEMENT_ACCESS_DML)
|
||||
{
|
||||
placementConnection->hadDML = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (freeUserName)
|
||||
|
@ -306,195 +573,82 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
|
|||
pfree(freeUserName);
|
||||
}
|
||||
|
||||
return returnConnectionReference->connection;
|
||||
return chosenConnection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CheckExistingPlacementConnections check whether any of the existing
|
||||
* connections is usable. If so, return it, otherwise return NULL.
|
||||
*
|
||||
* A connection is usable if it is not in use, the user matches, DDL/DML usage
|
||||
* match and cached connection are allowed. If no existing connection
|
||||
* matches, but a new connection would conflict (e.g. a connection already
|
||||
* exists but isn't usable, and the desired connection needs to execute
|
||||
* DML/DML) an error is thrown.
|
||||
* FindOrCreatePlacementEntry finds a placement entry in either the
|
||||
* placement->connection hash or the co-located placements->connection hash,
|
||||
* or adds a new entry if the placement has not yet been accessed in the
|
||||
* current transaction.
|
||||
*/
|
||||
static ConnectionReference *
|
||||
CheckExistingPlacementConnections(uint32 flags,
|
||||
ConnectionPlacementHashEntry *placementEntry,
|
||||
const char *userName)
|
||||
static ConnectionPlacementHashEntry *
|
||||
FindOrCreatePlacementEntry(ShardPlacement *placement)
|
||||
{
|
||||
dlist_iter it;
|
||||
ConnectionPlacementHashKey key;
|
||||
ConnectionPlacementHashEntry *placementEntry = NULL;
|
||||
bool found = false;
|
||||
|
||||
/*
|
||||
* If there's a connection that has executed DML/DDL, always return it if
|
||||
* possible. That's because we only can execute DML/DDL over that
|
||||
* connection. Checking this first also allows us to detect conflicts due
|
||||
* to opening a second connection that wants to execute DML/DDL.
|
||||
*/
|
||||
if (placementEntry->modifyingConnection)
|
||||
key.placementId = placement->placementId;
|
||||
|
||||
placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
ConnectionReference *connectionReference = placementEntry->modifyingConnection;
|
||||
/* no connection has been chosen for this placement */
|
||||
placementEntry->failed = false;
|
||||
placementEntry->primaryConnection = NULL;
|
||||
placementEntry->hasSecondaryConnections = false;
|
||||
placementEntry->colocatedEntry = NULL;
|
||||
|
||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
||||
if (placement->partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||
placement->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
return connectionReference;
|
||||
}
|
||||
ColocatedPlacementsHashKey key;
|
||||
ColocatedPlacementsHashEntry *colocatedEntry = NULL;
|
||||
|
||||
if (connectionReference->hadDDL)
|
||||
{
|
||||
/* would deadlock otherwise */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DDL has been "
|
||||
"executed on existing placement connection")));
|
||||
}
|
||||
else if (connectionReference->hadDML)
|
||||
{
|
||||
/* we'd not see the other connection's contents */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DML has been "
|
||||
"executed on existing placement connection")));
|
||||
strcpy(key.nodeName, placement->nodeName);
|
||||
key.nodePort = placement->nodePort;
|
||||
key.colocationGroupId = placement->colocationGroupId;
|
||||
key.representativeValue = placement->representativeValue;
|
||||
|
||||
/* look for a connection assigned to co-located placements */
|
||||
colocatedEntry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER,
|
||||
&found);
|
||||
if (!found)
|
||||
{
|
||||
void *conRef = MemoryContextAllocZero(TopTransactionContext,
|
||||
sizeof(ConnectionReference));
|
||||
|
||||
/*
|
||||
* Create a connection reference that can be used for the entire
|
||||
* set of co-located placements.
|
||||
*/
|
||||
colocatedEntry->primaryConnection = (ConnectionReference *) conRef;
|
||||
|
||||
colocatedEntry->hasSecondaryConnections = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Assign the connection reference for the set of co-located placements
|
||||
* to the current placement.
|
||||
*/
|
||||
placementEntry->primaryConnection = colocatedEntry->primaryConnection;
|
||||
placementEntry->colocatedEntry = colocatedEntry;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* modifying xact should have executed DML/DDL */
|
||||
Assert(false);
|
||||
void *conRef = MemoryContextAllocZero(TopTransactionContext,
|
||||
sizeof(ConnectionReference));
|
||||
|
||||
placementEntry->primaryConnection = (ConnectionReference *) conRef;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Search existing connections for a reusable connection.
|
||||
*/
|
||||
dlist_foreach(it, &placementEntry->connectionReferences)
|
||||
{
|
||||
ConnectionReference *connectionReference =
|
||||
dlist_container(ConnectionReference, placementNode, it.cur);
|
||||
/* record association with shard, for invalidation */
|
||||
AssociatePlacementWithShard(placementEntry, placement);
|
||||
|
||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
||||
{
|
||||
return connectionReference;
|
||||
}
|
||||
|
||||
/*
|
||||
* If not using the connection, verify that FOR_DML/DDL flags are
|
||||
* compatible.
|
||||
*/
|
||||
if (flags & FOR_DDL)
|
||||
{
|
||||
/* would deadlock otherwise */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection for DDL when "
|
||||
"other connections exist")));
|
||||
}
|
||||
else if (flags & FOR_DML)
|
||||
{
|
||||
/*
|
||||
* We could allow this case (as presumably the SELECT is done, but
|
||||
* it'd be a bit prone to programming errors, so we disallow for
|
||||
* now.
|
||||
*/
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection for DML when "
|
||||
"other connections exist")));
|
||||
}
|
||||
}
|
||||
|
||||
/* establish a new connection */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static ConnectionReference *
|
||||
CheckExistingColocatedConnections(uint32 flags,
|
||||
ColocatedPlacementsHashEntry *connectionsEntry,
|
||||
const char *userName)
|
||||
{
|
||||
dlist_iter it;
|
||||
|
||||
/*
|
||||
* If there's a connection that has executed DML/DDL, always return it if
|
||||
* possible. That's because we only can execute DML/DDL over that
|
||||
* connection. Checking this first also allows us to detect conflicts due
|
||||
* to opening a second connection that wants to execute DML/DDL.
|
||||
*/
|
||||
if (connectionsEntry->modifyingConnection)
|
||||
{
|
||||
ConnectionReference *connectionReference = connectionsEntry->modifyingConnection;
|
||||
|
||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
||||
{
|
||||
return connectionReference;
|
||||
}
|
||||
|
||||
if (connectionReference->hadDDL)
|
||||
{
|
||||
/* would deadlock otherwise */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DDL has been "
|
||||
"executed on existing placement connection for a colocated placement")));
|
||||
}
|
||||
else if (connectionReference->hadDML)
|
||||
{
|
||||
/* we'd not see the other connection's contents */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection when DML has been "
|
||||
"executed on existing placement connection for a colocated placement")));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* modifying xact should have executed DML/DDL */
|
||||
Assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Search existing connections for a reusable connection.
|
||||
*/
|
||||
dlist_foreach(it, &connectionsEntry->connectionReferences)
|
||||
{
|
||||
ConnectionReference *connectionReference =
|
||||
dlist_container(ConnectionReference, placementNode, it.cur);
|
||||
|
||||
if (CanUseExistingConnection(flags, userName, connectionReference))
|
||||
{
|
||||
return connectionReference;
|
||||
}
|
||||
|
||||
/*
|
||||
* If not using the connection, verify that FOR_DML/DDL flags are
|
||||
* compatible.
|
||||
*/
|
||||
if (flags & FOR_DDL)
|
||||
{
|
||||
/* would deadlock otherwise */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection for DDL when "
|
||||
"other connections exist for colocated placements")));
|
||||
}
|
||||
else if (flags & FOR_DML)
|
||||
{
|
||||
/*
|
||||
* We could allow this case (as presumably the SELECT is done, but
|
||||
* it'd be a bit prone to programming errors, so we disallow for
|
||||
* now.
|
||||
*/
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot establish new placement connection for DML when "
|
||||
"other connections exist for colocated placements")));
|
||||
}
|
||||
}
|
||||
|
||||
/* establish a new connection */
|
||||
return NULL;
|
||||
return placementEntry;
|
||||
}
|
||||
|
||||
|
||||
|
@ -535,86 +689,6 @@ CanUseExistingConnection(uint32 flags, const char *userName,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartColocatedPlacementConnection returns a connection that's usable for
|
||||
* the passed in flags/placement.
|
||||
*
|
||||
* It does so by first checking whether any connection is already associated
|
||||
* with the relevant entry in the colocated placement hash, or by asking for a
|
||||
* formerly unassociated (possibly new) connection. Errors out if there's
|
||||
* conflicting connections.
|
||||
*/
|
||||
static MultiConnection *
|
||||
StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement,
|
||||
const char *userName)
|
||||
{
|
||||
ConnectionReference *connectionReference = NULL;
|
||||
ColocatedPlacementsHashKey key;
|
||||
ColocatedPlacementsHashEntry *entry;
|
||||
bool found;
|
||||
|
||||
/*
|
||||
* Currently only hash partitioned tables can be colocated. For everything
|
||||
* else we just return a new connection.
|
||||
*/
|
||||
if (placement->partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
return StartNodeConnection(flags, placement->nodeName, placement->nodePort);
|
||||
}
|
||||
|
||||
strcpy(key.nodeName, placement->nodeName);
|
||||
key.nodePort = placement->nodePort;
|
||||
key.colocationGroupId = placement->colocationGroupId;
|
||||
key.representativeValue = placement->representativeValue;
|
||||
|
||||
entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey),
|
||||
0,
|
||||
sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey));
|
||||
dlist_init(&entry->connectionReferences);
|
||||
}
|
||||
|
||||
connectionReference =
|
||||
CheckExistingColocatedConnections(flags, entry, userName);
|
||||
|
||||
if (!connectionReference)
|
||||
{
|
||||
MultiConnection *connection = NULL;
|
||||
|
||||
connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort);
|
||||
connectionReference = (ConnectionReference *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
sizeof(ConnectionReference));
|
||||
connectionReference->connection = connection;
|
||||
connectionReference->hadDDL = false;
|
||||
connectionReference->hadDML = false;
|
||||
connectionReference->userName =
|
||||
MemoryContextStrdup(TopTransactionContext, userName);
|
||||
dlist_push_tail(&entry->connectionReferences,
|
||||
&connectionReference->placementNode);
|
||||
|
||||
/* record association with connection, to handle connection closure */
|
||||
dlist_push_tail(&connection->referencedPlacements,
|
||||
&connectionReference->connectionNode);
|
||||
}
|
||||
|
||||
if (flags & FOR_DDL)
|
||||
{
|
||||
entry->modifyingConnection = connectionReference;
|
||||
connectionReference->hadDDL = true;
|
||||
}
|
||||
if (flags & FOR_DML)
|
||||
{
|
||||
entry->modifyingConnection = connectionReference;
|
||||
connectionReference->hadDML = true;
|
||||
}
|
||||
|
||||
return connectionReference->connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AssociatePlacementWithShard records shard->placement relation in
|
||||
* ConnectionShardHash.
|
||||
|
@ -680,7 +754,7 @@ CloseShardPlacementAssociation(struct MultiConnection *connection)
|
|||
|
||||
/*
|
||||
* Note that we don't reset ConnectionPlacementHashEntry's
|
||||
* modifyingConnection here, that'd more complicated than it seems
|
||||
* primaryConnection here, that'd be more complicated than it seems
|
||||
* worth. That means we'll error out spuriously if a DML/DDL
|
||||
* executing connection is closed earlier in a transaction.
|
||||
*/
|
||||
|
@ -823,16 +897,17 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
|
|||
{
|
||||
ConnectionPlacementHashEntry *placementEntry =
|
||||
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
|
||||
ConnectionReference *modifyingConnection = placementEntry->modifyingConnection;
|
||||
ConnectionReference *primaryConnection = placementEntry->primaryConnection;
|
||||
MultiConnection *connection = NULL;
|
||||
|
||||
/* we only consider shards that are modified */
|
||||
if (modifyingConnection == NULL)
|
||||
if (primaryConnection == NULL ||
|
||||
!(primaryConnection->hadDDL || primaryConnection->hadDML))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
connection = modifyingConnection->connection;
|
||||
connection = primaryConnection->connection;
|
||||
|
||||
if (!connection || connection->remoteTransaction.transactionFailed)
|
||||
{
|
||||
|
|
|
@ -15,6 +15,30 @@
|
|||
/* forward declare, to avoid dependency on ShardPlacement definition */
|
||||
struct ShardPlacement;
|
||||
|
||||
/* represents the way in which a placement is accessed */
typedef enum ShardPlacementAccessType
{
	/* read from placement */
	PLACEMENT_ACCESS_SELECT,

	/* modify rows in placement */
	PLACEMENT_ACCESS_DML,

	/* modify placement schema */
	PLACEMENT_ACCESS_DDL
} ShardPlacementAccessType;
|
||||
|
||||
/* represents access to a placement */
|
||||
typedef struct ShardPlacementAccess
|
||||
{
|
||||
/* placement that is accessed */
|
||||
struct ShardPlacement *placement;
|
||||
|
||||
/* the way in which the placement is accessed */
|
||||
ShardPlacementAccessType accessType;
|
||||
} ShardPlacementAccess;
|
||||
|
||||
|
||||
extern MultiConnection * GetPlacementConnection(uint32 flags,
|
||||
struct ShardPlacement *placement,
|
||||
const char *userName);
|
||||
|
@ -22,6 +46,13 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
|
|||
struct ShardPlacement *placement,
|
||||
const char *userName);
|
||||
|
||||
extern MultiConnection * GetPlacementListConnection(uint32 flags,
|
||||
List *placementAccessList,
|
||||
const char *userName);
|
||||
extern MultiConnection * StartPlacementListConnection(uint32 flags,
|
||||
List *placementAccessList,
|
||||
const char *userName);
|
||||
|
||||
extern void ResetPlacementConnectionManagement(void);
|
||||
extern void MarkFailedShardPlacements(void);
|
||||
extern void PostCommitMarkFailedShardPlacements(bool using2PC);
|
||||
|
|
Loading…
Reference in New Issue