diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 9f186146a..3806fa0fe 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -763,7 +763,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); char *nodeUser = CurrentUserName(); MultiConnection *connection = NULL; - uint32 connectionFlags = FOR_DML; + uint32 connectionFlags = FOR_DML | CONNECTION_PER_PLACEMENT; StringInfo copyCommand = NULL; PGresult *result = NULL; diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index ce802c7e5..1d3ab9008 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -12,6 +12,7 @@ #include "postgres.h" #include "access/hash.h" +#include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/master_protocol.h" @@ -49,6 +50,10 @@ typedef struct ConnectionReference bool hadDML; bool hadDDL; + /* colocation group of the placement, if any */ + uint32 colocationGroupId; + uint32 representativeValue; + /* membership in MultiConnection->referencedPlacements */ dlist_node connectionNode; } ConnectionReference; @@ -174,10 +179,15 @@ typedef struct ConnectionShardHashEntry static HTAB *ConnectionShardHash; +static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList, + const char *userName, + List **placementEntryList); static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry( ShardPlacement *placement); static bool CanUseExistingConnection(uint32 flags, const char *userName, ConnectionReference *placementConnection); +static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection, + ShardPlacement *placement); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry); @@ -272,7 +282,6 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, const char *userName) { char *freeUserName = NULL; - bool foundModifyingConnection = false; ListCell *placementAccessCell = NULL; List *placementEntryList = NIL; ListCell *placementEntryCell = NULL; @@ -283,6 +292,163 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, userName = freeUserName = CurrentUserName(); } + chosenConnection = FindPlacementListConnection(flags, placementAccessList, userName, + &placementEntryList); + if (chosenConnection == NULL) + { + /* use the first placement from the list to extract nodename and nodeport */ + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) linitial(placementAccessList); + ShardPlacement *placement = placementAccess->placement; + char *nodeName = placement->nodeName; + int nodePort = placement->nodePort; + + /* + * No suitable connection in the placement->connection mapping, get one from + * the node->connection pool. + */ + chosenConnection = StartNodeConnection(flags, nodeName, nodePort); + + if (flags & CONNECTION_PER_PLACEMENT && + ConnectionAccessedDifferentPlacement(chosenConnection, placement)) + { + /* + * Cached connection accessed a non-co-located placement in the same + * table or co-location group, while the caller asked for a connection + * per placement. Open a new connection instead. + * + * We use this for situations in which we want to use a different + * connection for every placement, such as COPY. If we blindly returned + * a cached conection that already modified a different, non-co-located + * placement B in the same table or in a table with the same co-location + * ID as the current placement, then we'd no longer able to write to + * placement B later in the COPY. + */ + chosenConnection = StartNodeConnection(flags | FORCE_NEW_CONNECTION, nodeName, + nodePort); + + Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement)); + } + } + + /* + * Now that a connection has been chosen, initialise or update the connection + * references for all placements. + */ + forboth(placementAccessCell, placementAccessList, + placementEntryCell, placementEntryList) + { + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) lfirst(placementAccessCell); + ShardPlacementAccessType accessType = placementAccess->accessType; + ConnectionPlacementHashEntry *placementEntry = + (ConnectionPlacementHashEntry *) lfirst(placementEntryCell); + ConnectionReference *placementConnection = placementEntry->primaryConnection; + + if (placementConnection->connection == chosenConnection) + { + /* using the connection that was already assigned to the placement */ + } + else if (placementConnection->connection == NULL) + { + /* placement does not have a connection assigned yet */ + placementConnection->connection = chosenConnection; + placementConnection->hadDDL = false; + placementConnection->hadDML = false; + placementConnection->userName = MemoryContextStrdup(TopTransactionContext, + userName); + + /* record association with connection */ + dlist_push_tail(&chosenConnection->referencedPlacements, + &placementConnection->connectionNode); + } + else + { + /* using a different connection than the one assigned to the placement */ + + if (accessType != PLACEMENT_ACCESS_SELECT) + { + /* + * We previously read from the placement, but now we're writing to + * it (if we had written to the placement, we would have either chosen + * the same connection, or errored out). Update the connection reference + * to point to the connection used for writing. We don't need to remember + * the existing connection since we won't be able to reuse it for + * accessing the placement. However, we do register that it exists in + * hasSecondaryConnections. + */ + placementConnection->connection = chosenConnection; + placementConnection->userName = MemoryContextStrdup(TopTransactionContext, + userName); + + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + + /* record association with connection */ + dlist_push_tail(&chosenConnection->referencedPlacements, + &placementConnection->connectionNode); + } + + /* + * There are now multiple connections that read from the placement + * and DDL commands are forbidden. + */ + placementEntry->hasSecondaryConnections = true; + + if (placementEntry->colocatedEntry != NULL) + { + /* we also remember this for co-located placements */ + placementEntry->colocatedEntry->hasSecondaryConnections = true; + } + } + + /* + * Remember that we used the current connection for writes. + */ + if (accessType == PLACEMENT_ACCESS_DDL) + { + placementConnection->hadDDL = true; + } + + if (accessType == PLACEMENT_ACCESS_DML) + { + placementConnection->hadDML = true; + } + } + + if (freeUserName) + { + pfree(freeUserName); + } + + return chosenConnection; +} + + +/* + * FindPlacementListConnection determines whether there is a connection that must + * be used to perform the given placement accesses. + * + * If a placement was only read in this transaction, then the same connection must + * be used for DDL to prevent self-deadlock. If a placement was modified in this + * transaction, then the same connection must be used for all subsequent accesses + * to ensure read-your-writes consistency and prevent self-deadlock. If those + * conditions cannot be met, because a connection is in use or the placements in + * the placement access list were modified over multiple connections, then this + * function throws an error. + * + * The function returns the connection that needs to be used, if such a connection + * exists, and the current placement entries for all placements in the placement + * access list. + */ +static MultiConnection * +FindPlacementListConnection(int flags, List *placementAccessList, const char *userName, + List **placementEntryList) +{ + bool foundModifyingConnection = false; + ListCell *placementAccessCell = NULL; + MultiConnection *chosenConnection = NULL; + /* * Go through all placement accesses to find a suitable connection. * @@ -474,108 +640,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, Assert(accessType != PLACEMENT_ACCESS_DDL); } - placementEntryList = lappend(placementEntryList, placementEntry); - } - - if (chosenConnection == NULL) - { - /* use the first placement from the list to extract nodename and nodeport */ - ShardPlacementAccess *placementAccess = - (ShardPlacementAccess *) linitial(placementAccessList); - ShardPlacement *placement = placementAccess->placement; - - /* - * No suitable connection in the placement->connection mapping, get one from - * the node->connection pool. - */ - chosenConnection = StartNodeConnection(flags, placement->nodeName, - placement->nodePort); - } - - /* - * Now that a connection has been chosen, initialise or update the connection - * references for all placements. - */ - forboth(placementAccessCell, placementAccessList, - placementEntryCell, placementEntryList) - { - ShardPlacementAccess *placementAccess = - (ShardPlacementAccess *) lfirst(placementAccessCell); - ShardPlacementAccessType accessType = placementAccess->accessType; - ConnectionPlacementHashEntry *placementEntry = - (ConnectionPlacementHashEntry *) lfirst(placementEntryCell); - ConnectionReference *placementConnection = placementEntry->primaryConnection; - - if (placementConnection->connection == chosenConnection) - { - /* using the connection that was already assigned to the placement */ - } - else if (placementConnection->connection == NULL) - { - /* placement does not have a connection assigned yet */ - placementConnection->connection = chosenConnection; - placementConnection->hadDDL = false; - placementConnection->hadDML = false; - placementConnection->userName = MemoryContextStrdup(TopTransactionContext, - userName); - - /* record association with connection, to handle connection closure */ - dlist_push_tail(&chosenConnection->referencedPlacements, - &placementConnection->connectionNode); - } - else - { - /* using a different connection than the one assigned to the placement */ - - if (accessType != PLACEMENT_ACCESS_SELECT) - { - /* - * We previously read from the placement, but now we're writing to - * it (if we had written to the placement, we would have either chosen - * the same connection, or errored out). Update the connection reference - * to point to the connection used for writing. We don't need to remember - * the existing connection since we won't be able to reuse it for - * accessing the placement. However, we do register that it exists in - * hasSecondaryConnections. - */ - placementConnection->connection = chosenConnection; - placementConnection->userName = MemoryContextStrdup(TopTransactionContext, - userName); - - Assert(!placementConnection->hadDDL); - Assert(!placementConnection->hadDML); - } - - /* - * There are now multiple connections that read from the placement - * and DDL commands are forbidden. - */ - placementEntry->hasSecondaryConnections = true; - - if (placementEntry->colocatedEntry != NULL) - { - /* we also remember this for co-located placements */ - placementEntry->colocatedEntry->hasSecondaryConnections = true; - } - } - - /* - * Remember that we used the current connection for writes. - */ - if (accessType == PLACEMENT_ACCESS_DDL) - { - placementConnection->hadDDL = true; - } - - if (accessType == PLACEMENT_ACCESS_DML) - { - placementConnection->hadDML = true; - } - } - - if (freeUserName) - { - pfree(freeUserName); + *placementEntryList = lappend(*placementEntryList, placementEntry); } return chosenConnection; @@ -625,11 +690,21 @@ FindOrCreatePlacementEntry(ShardPlacement *placement) void *conRef = MemoryContextAllocZero(TopTransactionContext, sizeof(ConnectionReference)); + ConnectionReference *connectionReference = (ConnectionReference *) conRef; + + /* + * Store the co-location group information such that we can later + * determine whether a connection accessed different placements + * of the same co-location group. + */ + connectionReference->colocationGroupId = placement->colocationGroupId; + connectionReference->representativeValue = placement->representativeValue; + /* * Create a connection reference that can be used for the entire * set of co-located placements. */ - colocatedEntry->primaryConnection = (ConnectionReference *) conRef; + colocatedEntry->primaryConnection = connectionReference; colocatedEntry->hasSecondaryConnections = false; } @@ -694,6 +769,35 @@ CanUseExistingConnection(uint32 flags, const char *userName, } +/* + * ConnectionAccessedDifferentPlacement returns true if the connection accessed another + * placement in the same colocation group with a different representative value, + * meaning it's not strictly colocated. + */ +static bool +ConnectionAccessedDifferentPlacement(MultiConnection *connection, + ShardPlacement *placement) +{ + dlist_iter placementIter; + + dlist_foreach(placementIter, &connection->referencedPlacements) + { + ConnectionReference *connectionReference = + dlist_container(ConnectionReference, connectionNode, placementIter.cur); + + if (placement->colocationGroupId != INVALID_COLOCATION_ID && + placement->colocationGroupId == connectionReference->colocationGroupId && + placement->representativeValue != connectionReference->representativeValue) + { + /* non-co-located placements from the same co-location group */ + return true; + } + } + + return false; +} + + /* * AssociatePlacementWithShard records shard->placement relation in * ConnectionShardHash. diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 765fd3084..423b5e551 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -482,6 +482,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, List *claimedConnectionList = NIL; ListCell *connectionCell = NULL; ListCell *shardPlacementCell = NULL; + int connectionFlags = FOR_DDL; + + if (useExclusiveConnection) + { + connectionFlags |= CONNECTION_PER_PLACEMENT; + } BeginOrContinueCoordinatedTransaction(); @@ -498,7 +504,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, shardIndex = ShardIndex(shardInterval); } - connection = GetPlacementConnection(FOR_DDL, shardPlacement, + connection = GetPlacementConnection(connectionFlags, shardPlacement, placementOwner); if (useExclusiveConnection) { diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index af330858d..90c582d27 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -46,6 +46,8 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags) shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext); + connectionFlags |= CONNECTION_PER_PLACEMENT; + /* open connections to shards which don't have connections yet */ foreach(taskCell, taskList) { diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 21d8a7bbe..fa736240e 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -37,7 +37,10 @@ enum MultiConnectionMode FOR_DDL = 1 << 2, - FOR_DML = 1 << 3 + FOR_DML = 1 << 3, + + /* open a connection per (co-located set of) placement(s) */ + CONNECTION_PER_PLACEMENT = 1 << 4 }; diff --git a/src/test/regress/expected/multi_alter_table_add_constraints.out b/src/test/regress/expected/multi_alter_table_add_constraints.out index 4f14d5575..7ae817e8a 100644 --- a/src/test/regress/expected/multi_alter_table_add_constraints.out +++ b/src/test/regress/expected/multi_alter_table_add_constraints.out @@ -441,9 +441,8 @@ SELECT create_distributed_table('products', 'product_no'); BEGIN; INSERT INTO products VALUES(1,'product_1', 5); --- DDL may error out after an INSERT because it might pick the wrong connection +-- DDL should pick the right connections after a single INSERT ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no); -ERROR: cannot establish a new connection for placement 1450407, since DML has been executed on a connection that is in use ROLLBACK; BEGIN; -- Add constraints diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 6681b2544..67a21468b 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1648,13 +1648,10 @@ INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ERROR: cannot establish a new connection for placement 13300024, since DDL has been executed on a connection that is in use ROLLBACK; --- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses --- to use a connection for one shard, while the connection already modified --- another shard. +-- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; INSERT INTO raw_events_first SELECT * FROM raw_events_second; -ERROR: cannot establish a new connection for placement 13300005, since DML has been executed on a connection that is in use ROLLBACK; -- Insert after copy is currently allowed for single-shard operation. -- Both insert and copy are rolled back successfully. diff --git a/src/test/regress/sql/multi_alter_table_add_constraints.sql b/src/test/regress/sql/multi_alter_table_add_constraints.sql index a30d56296..51ce02a1a 100644 --- a/src/test/regress/sql/multi_alter_table_add_constraints.sql +++ b/src/test/regress/sql/multi_alter_table_add_constraints.sql @@ -382,7 +382,7 @@ SELECT create_distributed_table('products', 'product_no'); BEGIN; INSERT INTO products VALUES(1,'product_1', 5); --- DDL may error out after an INSERT because it might pick the wrong connection +-- DDL should pick the right connections after a single INSERT ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no); ROLLBACK; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index c9e480a82..dfc70455a 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1343,9 +1343,7 @@ INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ROLLBACK; --- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses --- to use a connection for one shard, while the connection already modified --- another shard. +-- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; 100,100