Avoid connections that accessed non-colocated placements in multi-shard commands

pull/1519/head
Marco Slot 2017-07-19 14:42:54 +02:00
parent 7060ade6fe
commit d3e9746236
9 changed files with 226 additions and 117 deletions

View File

@ -763,7 +763,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeUser = CurrentUserName();
MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DML;
uint32 connectionFlags = FOR_DML | CONNECTION_PER_PLACEMENT;
StringInfo copyCommand = NULL;
PGresult *result = NULL;

View File

@ -12,6 +12,7 @@
#include "postgres.h"
#include "access/hash.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/master_protocol.h"
@ -49,6 +50,10 @@ typedef struct ConnectionReference
bool hadDML;
bool hadDDL;
/* colocation group of the placement, if any */
uint32 colocationGroupId;
uint32 representativeValue;
/* membership in MultiConnection->referencedPlacements */
dlist_node connectionNode;
} ConnectionReference;
@ -174,10 +179,15 @@ typedef struct ConnectionShardHashEntry
static HTAB *ConnectionShardHash;
static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList,
const char *userName,
List **placementEntryList);
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
ShardPlacement *placement);
static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *placementConnection);
static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,
ShardPlacement *placement);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
@ -272,7 +282,6 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
const char *userName)
{
char *freeUserName = NULL;
bool foundModifyingConnection = false;
ListCell *placementAccessCell = NULL;
List *placementEntryList = NIL;
ListCell *placementEntryCell = NULL;
@ -283,6 +292,163 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
userName = freeUserName = CurrentUserName();
}
chosenConnection = FindPlacementListConnection(flags, placementAccessList, userName,
&placementEntryList);
if (chosenConnection == NULL)
{
/* use the first placement from the list to extract nodename and nodeport */
ShardPlacementAccess *placementAccess =
(ShardPlacementAccess *) linitial(placementAccessList);
ShardPlacement *placement = placementAccess->placement;
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
/*
* No suitable connection in the placement->connection mapping, get one from
* the node->connection pool.
*/
chosenConnection = StartNodeConnection(flags, nodeName, nodePort);
if (flags & CONNECTION_PER_PLACEMENT &&
ConnectionAccessedDifferentPlacement(chosenConnection, placement))
{
/*
* Cached connection accessed a non-co-located placement in the same
* table or co-location group, while the caller asked for a connection
* per placement. Open a new connection instead.
*
* We use this for situations in which we want to use a different
* connection for every placement, such as COPY. If we blindly returned
* a cached conection that already modified a different, non-co-located
* placement B in the same table or in a table with the same co-location
* ID as the current placement, then we'd no longer able to write to
* placement B later in the COPY.
*/
chosenConnection = StartNodeConnection(flags | FORCE_NEW_CONNECTION, nodeName,
nodePort);
Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement));
}
}
/*
* Now that a connection has been chosen, initialise or update the connection
* references for all placements.
*/
forboth(placementAccessCell, placementAccessList,
placementEntryCell, placementEntryList)
{
ShardPlacementAccess *placementAccess =
(ShardPlacementAccess *) lfirst(placementAccessCell);
ShardPlacementAccessType accessType = placementAccess->accessType;
ConnectionPlacementHashEntry *placementEntry =
(ConnectionPlacementHashEntry *) lfirst(placementEntryCell);
ConnectionReference *placementConnection = placementEntry->primaryConnection;
if (placementConnection->connection == chosenConnection)
{
/* using the connection that was already assigned to the placement */
}
else if (placementConnection->connection == NULL)
{
/* placement does not have a connection assigned yet */
placementConnection->connection = chosenConnection;
placementConnection->hadDDL = false;
placementConnection->hadDML = false;
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
/* record association with connection */
dlist_push_tail(&chosenConnection->referencedPlacements,
&placementConnection->connectionNode);
}
else
{
/* using a different connection than the one assigned to the placement */
if (accessType != PLACEMENT_ACCESS_SELECT)
{
/*
* We previously read from the placement, but now we're writing to
* it (if we had written to the placement, we would have either chosen
* the same connection, or errored out). Update the connection reference
* to point to the connection used for writing. We don't need to remember
* the existing connection since we won't be able to reuse it for
* accessing the placement. However, we do register that it exists in
* hasSecondaryConnections.
*/
placementConnection->connection = chosenConnection;
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
Assert(!placementConnection->hadDDL);
Assert(!placementConnection->hadDML);
/* record association with connection */
dlist_push_tail(&chosenConnection->referencedPlacements,
&placementConnection->connectionNode);
}
/*
* There are now multiple connections that read from the placement
* and DDL commands are forbidden.
*/
placementEntry->hasSecondaryConnections = true;
if (placementEntry->colocatedEntry != NULL)
{
/* we also remember this for co-located placements */
placementEntry->colocatedEntry->hasSecondaryConnections = true;
}
}
/*
* Remember that we used the current connection for writes.
*/
if (accessType == PLACEMENT_ACCESS_DDL)
{
placementConnection->hadDDL = true;
}
if (accessType == PLACEMENT_ACCESS_DML)
{
placementConnection->hadDML = true;
}
}
if (freeUserName)
{
pfree(freeUserName);
}
return chosenConnection;
}
/*
* FindPlacementListConnection determines whether there is a connection that must
* be used to perform the given placement accesses.
*
* If a placement was only read in this transaction, then the same connection must
* be used for DDL to prevent self-deadlock. If a placement was modified in this
* transaction, then the same connection must be used for all subsequent accesses
* to ensure read-your-writes consistency and prevent self-deadlock. If those
* conditions cannot be met, because a connection is in use or the placements in
* the placement access list were modified over multiple connections, then this
* function throws an error.
*
* The function returns the connection that needs to be used, if such a connection
* exists, and the current placement entries for all placements in the placement
* access list.
*/
static MultiConnection *
FindPlacementListConnection(int flags, List *placementAccessList, const char *userName,
List **placementEntryList)
{
bool foundModifyingConnection = false;
ListCell *placementAccessCell = NULL;
MultiConnection *chosenConnection = NULL;
/*
* Go through all placement accesses to find a suitable connection.
*
@ -474,108 +640,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
Assert(accessType != PLACEMENT_ACCESS_DDL);
}
placementEntryList = lappend(placementEntryList, placementEntry);
}
if (chosenConnection == NULL)
{
/* use the first placement from the list to extract nodename and nodeport */
ShardPlacementAccess *placementAccess =
(ShardPlacementAccess *) linitial(placementAccessList);
ShardPlacement *placement = placementAccess->placement;
/*
* No suitable connection in the placement->connection mapping, get one from
* the node->connection pool.
*/
chosenConnection = StartNodeConnection(flags, placement->nodeName,
placement->nodePort);
}
/*
* Now that a connection has been chosen, initialise or update the connection
* references for all placements.
*/
forboth(placementAccessCell, placementAccessList,
placementEntryCell, placementEntryList)
{
ShardPlacementAccess *placementAccess =
(ShardPlacementAccess *) lfirst(placementAccessCell);
ShardPlacementAccessType accessType = placementAccess->accessType;
ConnectionPlacementHashEntry *placementEntry =
(ConnectionPlacementHashEntry *) lfirst(placementEntryCell);
ConnectionReference *placementConnection = placementEntry->primaryConnection;
if (placementConnection->connection == chosenConnection)
{
/* using the connection that was already assigned to the placement */
}
else if (placementConnection->connection == NULL)
{
/* placement does not have a connection assigned yet */
placementConnection->connection = chosenConnection;
placementConnection->hadDDL = false;
placementConnection->hadDML = false;
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
/* record association with connection, to handle connection closure */
dlist_push_tail(&chosenConnection->referencedPlacements,
&placementConnection->connectionNode);
}
else
{
/* using a different connection than the one assigned to the placement */
if (accessType != PLACEMENT_ACCESS_SELECT)
{
/*
* We previously read from the placement, but now we're writing to
* it (if we had written to the placement, we would have either chosen
* the same connection, or errored out). Update the connection reference
* to point to the connection used for writing. We don't need to remember
* the existing connection since we won't be able to reuse it for
* accessing the placement. However, we do register that it exists in
* hasSecondaryConnections.
*/
placementConnection->connection = chosenConnection;
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
Assert(!placementConnection->hadDDL);
Assert(!placementConnection->hadDML);
}
/*
* There are now multiple connections that read from the placement
* and DDL commands are forbidden.
*/
placementEntry->hasSecondaryConnections = true;
if (placementEntry->colocatedEntry != NULL)
{
/* we also remember this for co-located placements */
placementEntry->colocatedEntry->hasSecondaryConnections = true;
}
}
/*
* Remember that we used the current connection for writes.
*/
if (accessType == PLACEMENT_ACCESS_DDL)
{
placementConnection->hadDDL = true;
}
if (accessType == PLACEMENT_ACCESS_DML)
{
placementConnection->hadDML = true;
}
}
if (freeUserName)
{
pfree(freeUserName);
*placementEntryList = lappend(*placementEntryList, placementEntry);
}
return chosenConnection;
@ -625,11 +690,21 @@ FindOrCreatePlacementEntry(ShardPlacement *placement)
void *conRef = MemoryContextAllocZero(TopTransactionContext,
sizeof(ConnectionReference));
ConnectionReference *connectionReference = (ConnectionReference *) conRef;
/*
* Store the co-location group information such that we can later
* determine whether a connection accessed different placements
* of the same co-location group.
*/
connectionReference->colocationGroupId = placement->colocationGroupId;
connectionReference->representativeValue = placement->representativeValue;
/*
* Create a connection reference that can be used for the entire
* set of co-located placements.
*/
colocatedEntry->primaryConnection = (ConnectionReference *) conRef;
colocatedEntry->primaryConnection = connectionReference;
colocatedEntry->hasSecondaryConnections = false;
}
@ -694,6 +769,35 @@ CanUseExistingConnection(uint32 flags, const char *userName,
}
/*
* ConnectionAccessedDifferentPlacement returns true if the connection accessed another
* placement in the same colocation group with a different representative value,
* meaning it's not strictly colocated.
*/
static bool
ConnectionAccessedDifferentPlacement(MultiConnection *connection,
ShardPlacement *placement)
{
dlist_iter placementIter;
dlist_foreach(placementIter, &connection->referencedPlacements)
{
ConnectionReference *connectionReference =
dlist_container(ConnectionReference, connectionNode, placementIter.cur);
if (placement->colocationGroupId != INVALID_COLOCATION_ID &&
placement->colocationGroupId == connectionReference->colocationGroupId &&
placement->representativeValue != connectionReference->representativeValue)
{
/* non-co-located placements from the same co-location group */
return true;
}
}
return false;
}
/*
* AssociatePlacementWithShard records shard->placement relation in
* ConnectionShardHash.

View File

@ -482,6 +482,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
List *claimedConnectionList = NIL;
ListCell *connectionCell = NULL;
ListCell *shardPlacementCell = NULL;
int connectionFlags = FOR_DDL;
if (useExclusiveConnection)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
}
BeginOrContinueCoordinatedTransaction();
@ -498,7 +504,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
shardIndex = ShardIndex(shardInterval);
}
connection = GetPlacementConnection(FOR_DDL, shardPlacement,
connection = GetPlacementConnection(connectionFlags, shardPlacement,
placementOwner);
if (useExclusiveConnection)
{

View File

@ -46,6 +46,8 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext);
connectionFlags |= CONNECTION_PER_PLACEMENT;
/* open connections to shards which don't have connections yet */
foreach(taskCell, taskList)
{

View File

@ -37,7 +37,10 @@ enum MultiConnectionMode
FOR_DDL = 1 << 2,
FOR_DML = 1 << 3
FOR_DML = 1 << 3,
/* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 4
};

View File

@ -441,9 +441,8 @@ SELECT create_distributed_table('products', 'product_no');
BEGIN;
INSERT INTO products VALUES(1,'product_1', 5);
-- DDL may error out after an INSERT because it might pick the wrong connection
-- DDL should pick the right connections after a single INSERT
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
ERROR: cannot establish a new connection for placement 1450407, since DML has been executed on a connection that is in use
ROLLBACK;
BEGIN;
-- Add constraints

View File

@ -1648,13 +1648,10 @@ INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ERROR: cannot establish a new connection for placement 13300024, since DDL has been executed on a connection that is in use
ROLLBACK;
-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
-- to use a connection for one shard, while the connection already modified
-- another shard.
-- Insert after copy is allowed
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ERROR: cannot establish a new connection for placement 13300005, since DML has been executed on a connection that is in use
ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation.
-- Both insert and copy are rolled back successfully.

View File

@ -382,7 +382,7 @@ SELECT create_distributed_table('products', 'product_no');
BEGIN;
INSERT INTO products VALUES(1,'product_1', 5);
-- DDL may error out after an INSERT because it might pick the wrong connection
-- DDL should pick the right connections after a single INSERT
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
ROLLBACK;

View File

@ -1343,9 +1343,7 @@ INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ROLLBACK;
-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
-- to use a connection for one shard, while the connection already modified
-- another shard.
-- Insert after copy is allowed
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
100,100