From 63676f5d65439670cc28d4141b089eecacbb1f46 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 12:38:19 +0200 Subject: [PATCH 1/6] Allow choosing a connection for multiple placements with GetPlacementListConnection --- .../connection/placement_connection.c | 739 ++++++++++-------- .../distributed/placement_connection.h | 31 + 2 files changed, 438 insertions(+), 332 deletions(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 2cf2d0f83..047fe041b 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -14,7 +14,9 @@ #include "access/hash.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_planner.h" #include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -47,17 +49,14 @@ typedef struct ConnectionReference bool hadDML; bool hadDDL; - /* - * Membership in ConnectionPlacementHashEntry->connectionReferences or - * ColocatedPlacementsHashEntry->connectionReferences. - */ - dlist_node placementNode; - /* membership in MultiConnection->referencedPlacements */ dlist_node connectionNode; } ConnectionReference; +struct ColocatedPlacementsHashEntry; + + /* * Hash table mapping placements to a list of connections. * @@ -85,11 +84,14 @@ typedef struct ConnectionPlacementHashEntry /* did any remote transactions fail? */ bool failed; - /* list of connections to remote nodes */ - dlist_head connectionReferences; + /* primary connection used to access the placement */ + ConnectionReference *primaryConnection; - /* if non-NULL, connection executing DML/DDL*/ - ConnectionReference *modifyingConnection; + /* are any other connections reading from the placements? */ + bool hasSecondaryConnections; + + /* entry for the set of co-located placements */ + struct ColocatedPlacementsHashEntry *colocatedEntry; /* membership in ConnectionShardHashEntry->placementConnections */ dlist_node shardNode; @@ -137,11 +139,11 @@ typedef struct ColocatedPlacementsHashEntry { ColocatedPlacementsHashKey key; - /* list of connections to remote nodes */ - dlist_head connectionReferences; + /* primary connection used to access the co-located placements */ + ConnectionReference *primaryConnection; - /* if non-NULL, connection executing DML/DDL*/ - ConnectionReference *modifyingConnection; + /* are any other connections reading from the placements? 
*/ + bool hasSecondaryConnections; } ColocatedPlacementsHashEntry; static HTAB *ColocatedPlacementsHash; @@ -172,19 +174,10 @@ typedef struct ConnectionShardHashEntry static HTAB *ConnectionShardHash; -static MultiConnection * StartColocatedPlacementConnection(uint32 flags, - ShardPlacement *placement, - const char *userName); -static ConnectionReference * CheckExistingPlacementConnections(uint32 flags, - ConnectionPlacementHashEntry - *placementEntry, - const char *userName); -static ConnectionReference * CheckExistingColocatedConnections(uint32 flags, - ColocatedPlacementsHashEntry - *connectionEntry, - const char *userName); +static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry( + ShardPlacement *placement); static bool CanUseExistingConnection(uint32 flags, const char *userName, - ConnectionReference *connectionReference); + ConnectionReference *placementConnection); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry); @@ -229,76 +222,350 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user MultiConnection * StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName) { - ConnectionPlacementHashKey key; - ConnectionPlacementHashEntry *placementEntry = NULL; - ConnectionReference *returnConnectionReference = NULL; + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess)); + + placementAccess->placement = placement; + + if (flags & FOR_DDL) + { + placementAccess->accessType = PLACEMENT_ACCESS_DDL; + } + else if (flags & FOR_DML) + { + placementAccess->accessType = PLACEMENT_ACCESS_DML; + } + else + { + placementAccess->accessType = PLACEMENT_ACCESS_SELECT; + } + + return StartPlacementListConnection(flags, list_make1(placementAccess), userName); +} + + +/* + * GetPlacementListConnection establishes a connection for a set of placement + * accesses. + * + * See StartPlacementListConnection for details. + */ +MultiConnection * +GetPlacementListConnection(uint32 flags, List *placementAccessList, const char *userName) +{ + MultiConnection *connection = StartPlacementListConnection(flags, placementAccessList, + userName); + + FinishConnectionEstablishment(connection); + return connection; +} + + +/* + * StartPlacementListConnection returns a connection to a remote node suitable for + * a placement accesses (SELECT, DML, DDL) or throws an error if no suitable + * connection can be established if would cause a self-deadlock or consistency + * violation. + */ +MultiConnection * +StartPlacementListConnection(uint32 flags, List *placementAccessList, + const char *userName) +{ char *freeUserName = NULL; - bool found = false; + bool foundModifyingConnection = false; + ListCell *placementAccessCell = NULL; + List *placementEntryList = NIL; + ListCell *placementEntryCell = NULL; + MultiConnection *chosenConnection = NULL; if (userName == NULL) { userName = freeUserName = CurrentUserName(); } - key.placementId = placement->placementId; - - /* lookup relevant hash entry */ - placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found); - if (!found) - { - dlist_init(&placementEntry->connectionReferences); - placementEntry->failed = false; - placementEntry->modifyingConnection = NULL; - } - /* - * Check whether any of the connections already associated with the - * placement can be reused, or violates FOR_DML/FOR_DDL constraints. 
- */ - returnConnectionReference = - CheckExistingPlacementConnections(flags, placementEntry, userName); - - /* - * Either no caching desired, or no connection already associated with the - * placement present. Check whether there's a usable connection associated - * for the set of colocated placements, or establish a new one. + * Go through all placement accesses to find a suitable connection. * - * Allocations are performed in transaction context, so we don't have to - * care about freeing in case of an early disconnect. + * If none of the placements have been accessed in this transaction, connection + * remains NULL. + * + * If one or more of the placements have been modified in this transaction, then + * use the connection that performed the write. If placements have been written + * over multiple connections or the connection is not available, error out. + * + * If placements have only been read in this transaction, then use the last + * suitable connection found for a placement in the placementAccessList. */ - if (returnConnectionReference == NULL) + foreach(placementAccessCell, placementAccessList) { - MultiConnection *connection = - StartColocatedPlacementConnection(flags, placement, userName); + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) lfirst(placementAccessCell); + ShardPlacement *placement = placementAccess->placement; + ShardPlacementAccessType accessType = placementAccess->accessType; - returnConnectionReference = (ConnectionReference *) - MemoryContextAlloc(TopTransactionContext, - sizeof(ConnectionReference)); - returnConnectionReference->connection = connection; - returnConnectionReference->hadDDL = false; - returnConnectionReference->hadDML = false; - returnConnectionReference->userName = - MemoryContextStrdup(TopTransactionContext, userName); - dlist_push_tail(&placementEntry->connectionReferences, - &returnConnectionReference->placementNode); + ConnectionPlacementHashEntry *placementEntry = NULL; + ColocatedPlacementsHashEntry *colocatedEntry = NULL; + ConnectionReference *placementConnection = NULL; - /* record association with shard, for invalidation */ - AssociatePlacementWithShard(placementEntry, placement); + if (placement->shardId == INVALID_SHARD_ID) + { + /* + * When a SELECT prunes down to 0 shard, we use a dummy placement. + * In that case, we can fall back to the default connection. + * + * FIXME: this can be removed if we evaluate empty SELECTs locally. + */ + continue; + } - /* record association with connection, to handle connection closure */ - dlist_push_tail(&connection->referencedPlacements, - &returnConnectionReference->connectionNode); + placementEntry = FindOrCreatePlacementEntry(placement); + colocatedEntry = placementEntry->colocatedEntry; + placementConnection = placementEntry->primaryConnection; + + /* note: the Asserts below are primarily for clarifying the conditions */ + + if (placementConnection->connection == NULL) + { + /* no connection has been chosen for the placement */ + } + else if (accessType == PLACEMENT_ACCESS_DDL && + placementEntry->hasSecondaryConnections) + { + /* + * If a placement has been read over multiple connections (typically as + * a result of a reference table join) then a DDL command on the placement + * would create a self-deadlock. 
+ */ + + Assert(placementConnection != NULL); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform DDL on a placement which has been read over " + "multiple connections"))); + } + else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL && + colocatedEntry->hasSecondaryConnections) + { + /* + * If a placement has been read over multiple (uncommitted) connections + * then a DDL command on a co-located placement may create a self-deadlock + * if there exist some relationship between the co-located placements + * (e.g. foreign key, partitioning). + */ + + Assert(placementConnection != NULL); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform DDL on a placement if a co-located placement " + "has been read over multiple connections"))); + } + else if (foundModifyingConnection) + { + /* + * We already found a connection that performed writes on of the placements + * and must use it. + */ + + if ((placementConnection->hadDDL || placementConnection->hadDML) && + placementConnection->connection != chosenConnection) + { + /* + * The current placement may have been modified over a different + * connection. Neither connection is guaranteed to see all uncomitted + * writes and therefore we cannot proceed. + */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform query with placements that were " + "modified over multiple connections"))); + } + } + else if (CanUseExistingConnection(flags, userName, placementConnection)) + { + /* + * There is an existing connection for the placement and we can use it. + */ + + Assert(placementConnection != NULL); + + chosenConnection = placementConnection->connection; + + if (placementConnection->hadDDL || placementConnection->hadDML) + { + /* this connection performed writes, we must use it */ + foundModifyingConnection = true; + } + } + else if (placementConnection->hadDDL) + { + /* + * There is an existing connection, but we cannot use it and it executed + * DDL. Any subsequent operation needs to be able to see the results of + * the DDL command and thus cannot proceed if it cannot use the connection. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DDL has " + "been executed on existing placement connection"))); + } + else if (placementConnection->hadDML) + { + /* + * There is an existing connection, but we cannot use it and it executed + * DML. Any subsequent operation needs to be able to see the results of + * the DML command and thus cannot proceed if it cannot use the connection. + * + * Note that this is not meaningfully different from the previous case. We + * just produce a different error message based on whether DDL was or only + * DML was executed. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + Assert(!placementConnection->hadDDL); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DML has " + "been executed on existing placement connection"))); + } + else if (accessType == PLACEMENT_ACCESS_DDL) + { + /* + * There is an existing connection, but we cannot use it and we want to + * execute DDL. The operation on the existing connection might conflict + * with the DDL statement. 
+ */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot perform parallel DDL command because multiple " + "placements have been accessed over the same connection"))); + } + else + { + /* + * The placement has a connection assigned to it, but it cannot be used, + * most likely because it has been claimed exclusively. Fortunately, it + * has only been used for reads and we're not performing a DDL command. + * We can therefore use a different connection for this placement. + */ + + Assert(placementConnection != NULL); + Assert(!CanUseExistingConnection(flags, userName, placementConnection)); + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + Assert(accessType != PLACEMENT_ACCESS_DDL); + } + + placementEntryList = lappend(placementEntryList, placementEntry); } - if (flags & FOR_DDL) + if (chosenConnection == NULL) { - placementEntry->modifyingConnection = returnConnectionReference; - returnConnectionReference->hadDDL = true; + /* use the first placement from the list to extract nodename and nodeport */ + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) linitial(placementAccessList); + ShardPlacement *placement = placementAccess->placement; + + /* + * No suitable connection in the placement->connection mapping, get one from + * the node->connection pool. + */ + chosenConnection = StartNodeConnection(flags, placement->nodeName, + placement->nodePort); } - if (flags & FOR_DML) + + /* + * Now that a connection has been chosen, initialise or update the connection + * references for all placements. + */ + forboth(placementAccessCell, placementAccessList, + placementEntryCell, placementEntryList) { - placementEntry->modifyingConnection = returnConnectionReference; - returnConnectionReference->hadDML = true; + ShardPlacementAccess *placementAccess = + (ShardPlacementAccess *) lfirst(placementAccessCell); + ShardPlacementAccessType accessType = placementAccess->accessType; + ConnectionPlacementHashEntry *placementEntry = + (ConnectionPlacementHashEntry *) lfirst(placementEntryCell); + ConnectionReference *placementConnection = placementEntry->primaryConnection; + + if (placementConnection->connection == chosenConnection) + { + /* using the connection that was already assigned to the placement */ + } + else if (placementConnection->connection == NULL) + { + /* placement does not have a connection assigned yet */ + placementConnection->connection = chosenConnection; + placementConnection->hadDDL = false; + placementConnection->hadDML = false; + placementConnection->userName = MemoryContextStrdup(TopTransactionContext, + userName); + + /* record association with connection, to handle connection closure */ + dlist_push_tail(&chosenConnection->referencedPlacements, + &placementConnection->connectionNode); + } + else + { + /* using a different connection than the one assigned to the placement */ + + if (accessType != PLACEMENT_ACCESS_SELECT) + { + /* + * We previously read from the placement, but now we're writing to + * it (if we had written to the placement, we would have either chosen + * the same connection, or errored out). Update the connection reference + * to point to the connection used for writing. We don't need to remember + * the existing connection since we won't be able to reuse it for + * accessing the placement. 
However, we do register that it exists in + * hasSecondaryConnections. + */ + placementConnection->connection = chosenConnection; + placementConnection->userName = MemoryContextStrdup(TopTransactionContext, + userName); + + Assert(!placementConnection->hadDDL); + Assert(!placementConnection->hadDML); + } + + /* + * There are now multiple connections that read from the placement + * and DDL commands are forbidden. + */ + placementEntry->hasSecondaryConnections = true; + + if (placementEntry->colocatedEntry != NULL) + { + /* we also remember this for co-located placements */ + placementEntry->colocatedEntry->hasSecondaryConnections = true; + } + } + + /* + * Remember that we used the current connection for writes. + */ + if (accessType == PLACEMENT_ACCESS_DDL) + { + placementConnection->hadDDL = true; + } + + if (accessType == PLACEMENT_ACCESS_DML) + { + placementConnection->hadDML = true; + } } if (freeUserName) @@ -306,195 +573,82 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us pfree(freeUserName); } - return returnConnectionReference->connection; + return chosenConnection; } /* - * CheckExistingPlacementConnections check whether any of the existing - * connections is usable. If so, return it, otherwise return NULL. - * - * A connection is usable if it is not in use, the user matches, DDL/DML usage - * match and cached connection are allowed. If no existing connection - * matches, but a new connection would conflict (e.g. a connection already - * exists but isn't usable, and the desired connection needs to execute - * DML/DML) an error is thrown. + * FindOrCreatePlacementEntry finds a placement entry in either the + * placement->connection hash or the co-located placements->connection hash, + * or adds a new entry if the placement has not yet been accessed in the + * current transaction. */ -static ConnectionReference * -CheckExistingPlacementConnections(uint32 flags, - ConnectionPlacementHashEntry *placementEntry, - const char *userName) +static ConnectionPlacementHashEntry * +FindOrCreatePlacementEntry(ShardPlacement *placement) { - dlist_iter it; + ConnectionPlacementHashKey key; + ConnectionPlacementHashEntry *placementEntry = NULL; + bool found = false; - /* - * If there's a connection that has executed DML/DDL, always return it if - * possible. That's because we only can execute DML/DDL over that - * connection. Checking this first also allows us to detect conflicts due - * to opening a second connection that wants to execute DML/DDL. 
- */ - if (placementEntry->modifyingConnection) + key.placementId = placement->placementId; + + placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found); + if (!found) { - ConnectionReference *connectionReference = placementEntry->modifyingConnection; + /* no connection has been chosen for this placement */ + placementEntry->failed = false; + placementEntry->primaryConnection = NULL; + placementEntry->hasSecondaryConnections = false; + placementEntry->colocatedEntry = NULL; - if (CanUseExistingConnection(flags, userName, connectionReference)) + if (placement->partitionMethod == DISTRIBUTE_BY_HASH || + placement->partitionMethod == DISTRIBUTE_BY_NONE) { - return connectionReference; - } + ColocatedPlacementsHashKey key; + ColocatedPlacementsHashEntry *colocatedEntry = NULL; - if (connectionReference->hadDDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DDL has been " - "executed on existing placement connection"))); - } - else if (connectionReference->hadDML) - { - /* we'd not see the other connection's contents */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DML has been " - "executed on existing placement connection"))); + strcpy(key.nodeName, placement->nodeName); + key.nodePort = placement->nodePort; + key.colocationGroupId = placement->colocationGroupId; + key.representativeValue = placement->representativeValue; + + /* look for a connection assigned to co-located placements */ + colocatedEntry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, + &found); + if (!found) + { + void *conRef = MemoryContextAllocZero(TopTransactionContext, + sizeof(ConnectionReference)); + + /* + * Create a connection reference that can be used for the entire + * set of co-located placements. + */ + colocatedEntry->primaryConnection = (ConnectionReference *) conRef; + + colocatedEntry->hasSecondaryConnections = false; + } + + /* + * Assign the connection reference for the set of co-located placements + * to the current placement. + */ + placementEntry->primaryConnection = colocatedEntry->primaryConnection; + placementEntry->colocatedEntry = colocatedEntry; } else { - /* modifying xact should have executed DML/DDL */ - Assert(false); + void *conRef = MemoryContextAllocZero(TopTransactionContext, + sizeof(ConnectionReference)); + + placementEntry->primaryConnection = (ConnectionReference *) conRef; } } - /* - * Search existing connections for a reusable connection. - */ - dlist_foreach(it, &placementEntry->connectionReferences) - { - ConnectionReference *connectionReference = - dlist_container(ConnectionReference, placementNode, it.cur); + /* record association with shard, for invalidation */ + AssociatePlacementWithShard(placementEntry, placement); - if (CanUseExistingConnection(flags, userName, connectionReference)) - { - return connectionReference; - } - - /* - * If not using the connection, verify that FOR_DML/DDL flags are - * compatible. - */ - if (flags & FOR_DDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DDL when " - "other connections exist"))); - } - else if (flags & FOR_DML) - { - /* - * We could allow this case (as presumably the SELECT is done, but - * it'd be a bit prone to programming errors, so we disallow for - * now. 
- */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DML when " - "other connections exist"))); - } - } - - /* establish a new connection */ - return NULL; -} - - -static ConnectionReference * -CheckExistingColocatedConnections(uint32 flags, - ColocatedPlacementsHashEntry *connectionsEntry, - const char *userName) -{ - dlist_iter it; - - /* - * If there's a connection that has executed DML/DDL, always return it if - * possible. That's because we only can execute DML/DDL over that - * connection. Checking this first also allows us to detect conflicts due - * to opening a second connection that wants to execute DML/DDL. - */ - if (connectionsEntry->modifyingConnection) - { - ConnectionReference *connectionReference = connectionsEntry->modifyingConnection; - - if (CanUseExistingConnection(flags, userName, connectionReference)) - { - return connectionReference; - } - - if (connectionReference->hadDDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DDL has been " - "executed on existing placement connection for a colocated placement"))); - } - else if (connectionReference->hadDML) - { - /* we'd not see the other connection's contents */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DML has been " - "executed on existing placement connection for a colocated placement"))); - } - else - { - /* modifying xact should have executed DML/DDL */ - Assert(false); - } - } - - /* - * Search existing connections for a reusable connection. - */ - dlist_foreach(it, &connectionsEntry->connectionReferences) - { - ConnectionReference *connectionReference = - dlist_container(ConnectionReference, placementNode, it.cur); - - if (CanUseExistingConnection(flags, userName, connectionReference)) - { - return connectionReference; - } - - /* - * If not using the connection, verify that FOR_DML/DDL flags are - * compatible. - */ - if (flags & FOR_DDL) - { - /* would deadlock otherwise */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DDL when " - "other connections exist for colocated placements"))); - } - else if (flags & FOR_DML) - { - /* - * We could allow this case (as presumably the SELECT is done, but - * it'd be a bit prone to programming errors, so we disallow for - * now. - */ - ereport(ERROR, - (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection for DML when " - "other connections exist for colocated placements"))); - } - } - - /* establish a new connection */ - return NULL; + return placementEntry; } @@ -535,86 +689,6 @@ CanUseExistingConnection(uint32 flags, const char *userName, } -/* - * StartColocatedPlacementConnection returns a connection that's usable for - * the passed in flags/placement. - * - * It does so by first checking whether any connection is already associated - * with the relevant entry in the colocated placement hash, or by asking for a - * formerly unassociated (possibly new) connection. Errors out if there's - * conflicting connections. 
- */ -static MultiConnection * -StartColocatedPlacementConnection(uint32 flags, ShardPlacement *placement, - const char *userName) -{ - ConnectionReference *connectionReference = NULL; - ColocatedPlacementsHashKey key; - ColocatedPlacementsHashEntry *entry; - bool found; - - /* - * Currently only hash partitioned tables can be colocated. For everything - * else we just return a new connection. - */ - if (placement->partitionMethod != DISTRIBUTE_BY_HASH) - { - return StartNodeConnection(flags, placement->nodeName, placement->nodePort); - } - - strcpy(key.nodeName, placement->nodeName); - key.nodePort = placement->nodePort; - key.colocationGroupId = placement->colocationGroupId; - key.representativeValue = placement->representativeValue; - - entry = hash_search(ColocatedPlacementsHash, &key, HASH_ENTER, &found); - if (!found) - { - memset(((char *) entry) + sizeof(ColocatedPlacementsHashKey), - 0, - sizeof(ColocatedPlacementsHashEntry) - sizeof(ColocatedPlacementsHashKey)); - dlist_init(&entry->connectionReferences); - } - - connectionReference = - CheckExistingColocatedConnections(flags, entry, userName); - - if (!connectionReference) - { - MultiConnection *connection = NULL; - - connection = StartNodeConnection(flags, placement->nodeName, placement->nodePort); - connectionReference = (ConnectionReference *) - MemoryContextAlloc(TopTransactionContext, - sizeof(ConnectionReference)); - connectionReference->connection = connection; - connectionReference->hadDDL = false; - connectionReference->hadDML = false; - connectionReference->userName = - MemoryContextStrdup(TopTransactionContext, userName); - dlist_push_tail(&entry->connectionReferences, - &connectionReference->placementNode); - - /* record association with connection, to handle connection closure */ - dlist_push_tail(&connection->referencedPlacements, - &connectionReference->connectionNode); - } - - if (flags & FOR_DDL) - { - entry->modifyingConnection = connectionReference; - connectionReference->hadDDL = true; - } - if (flags & FOR_DML) - { - entry->modifyingConnection = connectionReference; - connectionReference->hadDML = true; - } - - return connectionReference->connection; -} - - /* * AssociatePlacementWithShard records shard->placement relation in * ConnectionShardHash. @@ -680,7 +754,7 @@ CloseShardPlacementAssociation(struct MultiConnection *connection) /* * Note that we don't reset ConnectionPlacementHashEntry's - * modifyingConnection here, that'd more complicated than it seems + * primaryConnection here, that'd more complicated than it seems * worth. That means we'll error out spuriously if a DML/DDL * executing connection is closed earlier in a transaction. 
*/ @@ -823,16 +897,17 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry) { ConnectionPlacementHashEntry *placementEntry = dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); - ConnectionReference *modifyingConnection = placementEntry->modifyingConnection; + ConnectionReference *primaryConnection = placementEntry->primaryConnection; MultiConnection *connection = NULL; /* we only consider shards that are modified */ - if (modifyingConnection == NULL) + if (primaryConnection == NULL || + !(primaryConnection->hadDDL || primaryConnection->hadDML)) { continue; } - connection = modifyingConnection->connection; + connection = primaryConnection->connection; if (!connection || connection->remoteTransaction.transactionFailed) { diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index 493e74c52..71e7a1358 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -15,6 +15,30 @@ /* forward declare, to avoid dependency on ShardPlacement definition */ struct ShardPlacement; +/* represents the way in which a placement is accessed */ +typedef enum ShardPlacementAccessType +{ + /* read from placement */ + PLACEMENT_ACCESS_SELECT, + + /* modify rows in placement */ + PLACEMENT_ACCESS_DML, + + /* modify placement schema */ + PLACEMENT_ACCESS_DDL +} ShardPlacementAccessType; + +/* represents access to a placement */ +typedef struct ShardPlacementAccess +{ + /* placement that is accessed */ + struct ShardPlacement *placement; + + /* the way in which the placement is accessed */ + ShardPlacementAccessType accessType; +} ShardPlacementAccess; + + extern MultiConnection * GetPlacementConnection(uint32 flags, struct ShardPlacement *placement, const char *userName); @@ -22,6 +46,13 @@ extern MultiConnection * StartPlacementConnection(uint32 flags, struct ShardPlacement *placement, const char *userName); +extern MultiConnection * GetPlacementListConnection(uint32 flags, + List *placementAccessList, + const char *userName); +extern MultiConnection * StartPlacementListConnection(uint32 flags, + List *placementAccessList, + const char *userName); + extern void ResetPlacementConnectionManagement(void); extern void MarkFailedShardPlacements(void); extern void PostCommitMarkFailedShardPlacements(bool using2PC); From 01c9b1f921b4907f977e35b6db042c8c6341c8df Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 13:25:49 +0200 Subject: [PATCH 2/6] Use GetPlacementListConnection for router SELECTs --- .../executor/multi_router_executor.c | 81 ++++++++++++++++++- .../distributed/utils/metadata_cache.c | 37 +++++++++ src/include/distributed/metadata_cache.h | 2 + .../distributed/multi_physical_planner.h | 2 +- .../distributed/multi_router_executor.h | 2 + 5 files changed, 121 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ed7e1a2f5..6a4beb1c8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -75,6 +75,8 @@ bool EnableDeadlockPrevention = true; /* functions needed during run phase */ static void ReacquireMetadataLocks(List *taskList); static void AssignInsertTaskShardId(Query *jobQuery, List *taskList); +static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, + ShardPlacementAccessType accessType); static void 
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); @@ -601,6 +603,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; char *queryString = task->queryString; + List *relationShardList = task->relationShardList; if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { @@ -620,8 +623,32 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) bool dontFailOnError = false; int64 currentAffectedTupleCount = 0; int connectionFlags = SESSION_LIFESPAN; - MultiConnection *connection = - GetPlacementConnection(connectionFlags, taskPlacement, NULL); + List *placementAccessList = NIL; + MultiConnection *connection = NULL; + + if (list_length(relationShardList) > 0) + { + placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName, + taskPlacement->nodePort, + relationShardList); + } + else + { + /* + * When the SELECT prunes down to 0 shards, just use the dummy placement. + * + * FIXME: it would be preferable to evaluate the SELECT locally since no + * data from the workers is required. + */ + + ShardPlacementAccess *placementAccess = + CreatePlacementAccess(taskPlacement, PLACEMENT_ACCESS_SELECT); + + placementAccessList = list_make1(placementAccess); + } + + connection = GetPlacementListConnection(connectionFlags, placementAccessList, + NULL); queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) @@ -641,6 +668,56 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) } +/* + * BuildPlacementSelectList builds a list of SELECT placement accesses + * which can be used to call StartPlacementListConnection or + * GetPlacementListConnection. + */ +List * +BuildPlacementSelectList(char *nodeName, int nodePort, List *relationShardList) +{ + ListCell *relationShardCell = NULL; + List *placementAccessList = NIL; + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + ShardPlacement *placement = NULL; + ShardPlacementAccess *placementAccess = NULL; + + placement = FindShardPlacementOnNode(nodeName, nodePort, relationShard->shardId); + if (placement == NULL) + { + ereport(ERROR, (errmsg("no active placement of shard %ld found on node " + "%s:%d", + relationShard->shardId, nodeName, nodePort))); + } + + placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT); + placementAccessList = lappend(placementAccessList, placementAccess); + } + + return placementAccessList; +} + + +/* + * CreatePlacementAccess returns a new ShardPlacementAccess for the given placement + * and access type. + */ +static ShardPlacementAccess * +CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType) +{ + ShardPlacementAccess *placementAccess = NULL; + + placementAccess = (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess)); + placementAccess->placement = placement; + placementAccess->accessType = accessType; + + return placementAccess; +} + + /* * ExecuteSingleModifyTask executes the task on the remote node, retrieves the * results and stores them, if RETURNING is used, in a tuple store. 
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 9dd62087e..a9428568c 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -356,6 +356,43 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) } +/* + * FindShardPlacementOnNode returns the shard placement for the given shard + * on the given node, or returns NULL of no placement for the shard exists + * on the node. + */ +ShardPlacement * +FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + ShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + ShardPlacement *placementOnNode = NULL; + int placementIndex = 0; + + shardEntry = LookupShardCacheEntry(shardId); + tableEntry = shardEntry->tableEntry; + placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + + for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) + { + ShardPlacement *placement = &placementArray[placementIndex]; + + if (strncmp(nodeName, placement->nodeName, WORKER_LENGTH) == 0 && + nodePort == placement->nodePort) + { + placementOnNode = CitusMakeNode(ShardPlacement); + CopyShardPlacement(placement, placementOnNode); + break; + } + } + + return placementOnNode; +} + + /* * ShardPlacementList returns the list of placements for the given shard from * the cache. diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 48b03672e..ce3ac71f7 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -70,6 +70,8 @@ extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); +extern ShardPlacement * FindShardPlacementOnNode(char *nodeName, int nodePort, + uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); extern List * DistTableOidList(void); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 4593da4da..1f43851f3 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -186,7 +186,7 @@ typedef struct Task char replicationModel; /* only applies to modify tasks */ bool insertSelectQuery; - List *relationShardList; /* only applies INSERT/SELECT tasks */ + List *relationShardList; } Task; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b5db9ddf9..5aae0190d 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -43,5 +43,7 @@ extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern void ExecuteTasksSequentiallyWithoutResults(List *taskList); +extern List * BuildPlacementSelectList(char *nodeName, int nodePort, + List *relationShardList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ From 29f21fea596bce87f2c96607876e646423b9e3c3 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 13:29:08 +0200 Subject: [PATCH 3/6] Use 
GetPlacementListConnection for multi-shard commands --- .../executor/multi_router_executor.c | 30 +--------- .../master/master_modify_multiple_shards.c | 2 +- .../transaction/multi_shard_transaction.c | 57 +++++++++++++++---- .../distributed/multi_shard_transaction.h | 3 +- 4 files changed, 48 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 6a4beb1c8..d8cfd88db 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -86,7 +86,6 @@ static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, CitusScanState *scanState); -static List * TaskShardIntervalList(List *taskList); static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorMultiShardLocks(List *taskList); static bool RequiresConsistentSnapshot(Task *task); @@ -1020,7 +1019,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn ListCell *taskCell = NULL; Task *firstTask = NULL; int connectionFlags = 0; - List *shardIntervalList = NIL; List *affectedTupleCountList = NIL; HTAB *shardConnectionHash = NULL; bool tasksPending = true; @@ -1039,8 +1037,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn "commands"))); } - shardIntervalList = TaskShardIntervalList(taskList); - /* ensure that there are no concurrent modifications on the same shards */ AcquireExecutorMultiShardLocks(taskList); @@ -1064,8 +1060,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn } /* open connection to all relevant placements, if not already open */ - shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, - connectionFlags); + shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags); XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; @@ -1203,29 +1198,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn } -/* - * TaskShardIntervalList returns a list of shard intervals for a given list of - * tasks. - */ -static List * -TaskShardIntervalList(List *taskList) -{ - ListCell *taskCell = NULL; - List *shardIntervalList = NIL; - - foreach(taskCell, taskList) - { - Task *task = (Task *) lfirst(taskCell); - int64 shardId = task->anchorShardId; - ShardInterval *shardInterval = LoadShardInterval(shardId); - - shardIntervalList = lappend(shardIntervalList, shardInterval); - } - - return shardIntervalList; -} - - /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. 
The function also sets the single-row mode on the
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 08d9bc291..0224759d3 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -210,7 +210,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation
 	task = CitusMakeNode(Task);
 	task->jobId = jobId;
 	task->taskId = taskId++;
-	task->taskType = SQL_TASK;
+	task->taskType = MODIFY_TASK;
 	task->queryString = shardQueryString->data;
 	task->dependedTaskList = NULL;
 	task->replicationModel = REPLICATION_MODEL_INVALID;
diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c
index d5962f57b..c6e841dd3 100644
--- a/src/backend/distributed/transaction/multi_shard_transaction.c
+++ b/src/backend/distributed/transaction/multi_shard_transaction.c
@@ -17,6 +17,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_router_executor.h"
 #include "distributed/multi_shard_transaction.h"
 #include "distributed/placement_connection.h"
 #include "distributed/shardinterval_utils.h"
@@ -30,25 +31,27 @@
 
 
 /*
- * OpenTransactionsToAllShardPlacements opens connections to all placements
- * using the provided shard identifier list and returns it as a shard ID ->
- * ShardConnections hash. connectionFlags can be used to specify whether
- * the command is FOR_DML or FOR_DDL.
+ * OpenTransactionsForAllTasks opens a connection for each task,
+ * taking into account which shards are read and modified by the task
+ * to select the appropriate connection, or errors out if no appropriate
+ * connection can be found. The set of connections is returned as an
+ * anchor shard ID -> ShardConnections hash.
*/ HTAB * -OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlags) +OpenTransactionsForAllTasks(List *taskList, int connectionFlags) { HTAB *shardConnectionHash = NULL; - ListCell *shardIntervalCell = NULL; + ListCell *taskCell = NULL; List *newConnectionList = NIL; shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext); /* open connections to shards which don't have connections yet */ - foreach(shardIntervalCell, shardIntervalList) + foreach(taskCell, taskList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; + Task *task = (Task *) lfirst(taskCell); + ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT; + uint64 shardId = task->anchorShardId; ShardConnections *shardConnections = NULL; bool shardConnectionsFound = false; List *shardPlacementList = NIL; @@ -69,9 +72,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag UINT64_FORMAT, shardId))); } + if (task->taskType == MODIFY_TASK) + { + accessType = PLACEMENT_ACCESS_DML; + } + else + { + /* can only open connections for DDL and DML commands */ + Assert(task->taskType == DDL_TASK); + + accessType = PLACEMENT_ACCESS_DDL; + } + foreach(placementCell, shardPlacementList) { ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); + ShardPlacementAccess placementModification; + List *placementAccessList = NIL; + List *placementSelectList = NIL; MultiConnection *connection = NULL; WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName, @@ -83,9 +101,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag shardPlacement->nodePort))); } - connection = StartPlacementConnection(connectionFlags, - shardPlacement, - NULL); + /* add placement access for modification */ + placementModification.placement = shardPlacement; + placementModification.accessType = accessType; + + placementAccessList = lappend(placementAccessList, &placementModification); + + /* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */ + placementSelectList = BuildPlacementSelectList(shardPlacement->nodeName, + shardPlacement->nodePort, + task->relationShardList); + placementAccessList = list_concat(placementAccessList, placementSelectList); + + /* + * Find a connection that sees preceding writes and cannot self-deadlock, + * or error out if no such connection exists. 
+ */ + connection = StartPlacementListConnection(connectionFlags, + placementAccessList, NULL); ClaimConnectionExclusively(connection); diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h index 9f5baf0c5..1f73c5831 100644 --- a/src/include/distributed/multi_shard_transaction.h +++ b/src/include/distributed/multi_shard_transaction.h @@ -27,8 +27,7 @@ typedef struct ShardConnections } ShardConnections; -extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList, - int connectionFlags); +extern HTAB * OpenTransactionsForAllTasks(List *taskList, int connectionFlags); extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext); extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound); From 710fe8666bf5009e6c4d87e0ec51cd46b01abedb Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 13:35:37 +0200 Subject: [PATCH 4/6] Use GetPlacementListConnection for router DML --- .../executor/multi_router_executor.c | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d8cfd88db..ccc19e616 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -80,7 +80,7 @@ static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); -static List * GetModifyConnections(List *taskPlacementList, bool markCritical, +static List * GetModifyConnections(Task *task, bool markCritical, bool startedInTransaction); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); @@ -781,7 +781,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult * establish the connection, mark as critical (when modifying reference * table) and start a transaction (when in a transaction). */ - connectionList = GetModifyConnections(taskPlacementList, + connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit, startedInTransaction); @@ -884,10 +884,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult * transaction in progress. */ static List * -GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions) +GetModifyConnections(Task *task, bool markCritical, bool noNewTransactions) { + List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; List *multiConnectionList = NIL; + List *relationShardList = task->relationShardList; /* first initiate connection establishment for all necessary connections */ foreach(taskPlacementCell, taskPlacementList) @@ -895,14 +897,22 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); int connectionFlags = SESSION_LIFESPAN | FOR_DML; MultiConnection *multiConnection = NULL; + List *placementAccessList = NIL; + ShardPlacementAccess *placementModification = NULL; - /* - * FIXME: It's not actually correct to use only one shard placement - * here for router queries involving multiple relations. 
We should - * check that this connection is the only modifying one associated - * with all the involved shards. - */ - multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL); + /* create placement accesses for placements that appear in a subselect */ + placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName, + taskPlacement->nodePort, + relationShardList); + + /* create placement access for the placement that we're modifying */ + placementModification = CreatePlacementAccess(taskPlacement, + PLACEMENT_ACCESS_DML); + placementAccessList = lappend(placementAccessList, placementModification); + + /* get an appropriate connection for the DML statement */ + multiConnection = GetPlacementListConnection(connectionFlags, placementAccessList, + NULL); /* * If already in a transaction, disallow expanding set of remote From d3785b97c0bd1b571df51ffff01775ba3b42c5b9 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 13:37:40 +0200 Subject: [PATCH 5/6] Remove XactModificationLevel distinction between DML and multi-shard --- .../executor/multi_router_executor.c | 25 +- .../distributed/executor/multi_utility.c | 8 - .../master/master_metadata_utility.c | 2 +- .../multi_alter_table_add_constraints.out | 6 +- .../regress/expected/multi_insert_select.out | 20 +- .../expected/multi_modifying_xacts.out | 236 +++++++++++++++++- .../expected/multi_mx_modifying_xacts.out | 6 - .../expected/multi_reference_table.out | 11 +- .../sql/multi_alter_table_add_constraints.sql | 3 +- src/test/regress/sql/multi_insert_select.sql | 17 +- .../regress/sql/multi_modifying_xacts.sql | 160 +++++++++++- .../regress/sql/multi_mx_modifying_xacts.sql | 6 - .../regress/sql/multi_reference_table.sql | 4 +- 13 files changed, 424 insertions(+), 80 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ccc19e616..4aac78970 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -604,13 +604,6 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) char *queryString = task->queryString; List *relationShardList = task->relationShardList; - if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("single-shard query may not appear in transaction blocks " - "which contain multi-shard data modifications"))); - } - /* * Try to run the query to completion on one placement. If the query fails * attempt the query on the next placement. @@ -745,14 +738,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult bool startedInTransaction = InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA; - if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("single-shard DML commands must not appear in " - "transaction blocks which contain multi-shard data " - "modifications"))); - } - /* * Modifications for reference tables are always done using 2PC. First * ensure that distributed transaction is started. 
Then force the @@ -1039,14 +1024,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn return 0; } - if (XactModificationLevel == XACT_MODIFICATION_DATA) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("multi-shard data modifications must not appear in " - "transaction blocks which contain single-shard DML " - "commands"))); - } - /* ensure that there are no concurrent modifications on the same shards */ AcquireExecutorMultiShardLocks(taskList); @@ -1072,7 +1049,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn /* open connection to all relevant placements, if not already open */ shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags); - XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + XactModificationLevel = XACT_MODIFICATION_DATA; /* iterate over placements in rounds, to ensure in-order execution */ while (tasksPending) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 7e69ea697..30390dc06 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -2494,14 +2494,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); - if (XactModificationLevel == XACT_MODIFICATION_DATA) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed DDL commands must not appear within " - "transaction blocks containing single-shard data " - "modifications"))); - } - EnsureCoordinator(); if (!ddlJob->concurrentIndexCmd) diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index cc1453435..e025268ed 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -146,7 +146,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery) ListCell *workerNodeCell = NULL; uint64 totalRelationSize = 0; - if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) + if (XactModificationLevel == XACT_MODIFICATION_DATA) { ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("citus size functions cannot be called in transaction" diff --git a/src/test/regress/expected/multi_alter_table_add_constraints.out b/src/test/regress/expected/multi_alter_table_add_constraints.out index c9688c4d4..83618f48f 100644 --- a/src/test/regress/expected/multi_alter_table_add_constraints.out +++ b/src/test/regress/expected/multi_alter_table_add_constraints.out @@ -441,9 +441,9 @@ SELECT create_distributed_table('products', 'product_no'); BEGIN; INSERT INTO products VALUES(1,'product_1', 5); --- Should error out since conflicts with the above single-shard data modification command. 
+-- DDL may error out after an INSERT because it might pick the wrong connection ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no); -ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications +ERROR: cannot establish new placement connection when DML has been executed on existing placement connection ROLLBACK; BEGIN; -- Add constraints @@ -451,9 +451,7 @@ ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no); ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price); ALTER TABLE products ALTER COLUMN product_no SET NOT NULL; ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no); --- Single shard DML command can't be located in the same transaction with above commands. INSERT INTO products VALUES(1,'product_1', 10, 8); -ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications ROLLBACK; -- There should be no constraint on master and worker(s) SELECT "Constraint", "Definition" FROM table_checks WHERE relid='products'::regclass; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 33296cbe9..3f9b216a6 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1636,15 +1636,24 @@ ROLLBACK; BEGIN; ALTER TABLE raw_events_second DROP COLUMN value_4; INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100; -ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications ROLLBACK; --- Insert after copy is currently disallowed because of the way the --- transaction modification state is currently handled. Copy is also --- rolled back. +-- Altering a reference table and then performing an INSERT ... SELECT which +-- joins with the reference table is not allowed, since the INSERT ... SELECT +-- would read from the reference table over others connections than the ones +-- that performed the DDL. +BEGIN; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ERROR: cannot establish new placement connection when DDL has been executed on existing placement connection +ROLLBACK; +-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses +-- to use a connection for one shard, while the connection already modified +-- another shard. BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; INSERT INTO raw_events_first SELECT * FROM raw_events_second; -ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands +ERROR: cannot establish new placement connection when DML has been executed on existing placement connection ROLLBACK; -- Insert after copy is currently allowed for single-shard operation. -- Both insert and copy are rolled back successfully. @@ -1658,7 +1667,6 @@ SELECT user_id FROM raw_events_first WHERE user_id = 101; (1 row) ROLLBACK; --- Copy after insert is currently disallowed. 
BEGIN; INSERT INTO raw_events_first SELECT * FROM raw_events_second; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index a03010787..8eebdc5bf 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -172,18 +172,16 @@ SELECT count(*) FROM researchers WHERE lab_id = 6; (1 row) ABORT; --- applies to DDL, too +-- we can mix DDL and INSERT BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications -COMMIT; +ABORT; -- whether it occurs first or second BEGIN; ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); -ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications -COMMIT; +ABORT; -- but the DDL should correctly roll back SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass; Column | Type | Modifiers @@ -383,16 +381,29 @@ ORDER BY nodeport; localhost | 57638 | t | DROP FUNCTION (2 rows) --- ALTER TABLE and COPY are compatible if ALTER TABLE precedes COPY +-- ALTER and copy are compatible BEGIN; ALTER TABLE labs ADD COLUMN motto text; \copy labs from stdin delimiter ',' ROLLBACK; --- but not if COPY precedes ALTER TABLE BEGIN; \copy labs from stdin delimiter ',' ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications +ABORT; +-- cannot perform DDL once a connection is used for multiple shards +BEGIN; +SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; + lab_id +-------- +(0 rows) + +SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; + lab_id +-------- +(0 rows) + +ALTER TABLE researchers ADD COLUMN motto text; +ERROR: cannot perform parallel DDL command because multiple placements have been accessed over the same connection ROLLBACK; -- multi-shard operations can co-exist with DDL in a transactional way BEGIN; @@ -1283,3 +1294,212 @@ SELECT * FROM run_command_on_workers('DROP USER test_user'); (2 rows) DROP USER test_user; +-- set up foreign keys to test transactions with co-located and reference tables +BEGIN; +SET LOCAL citus.shard_replication_factor TO 1; +SET LOCAL citus.shard_count TO 4; +CREATE TABLE usergroups ( + gid int PRIMARY KEY, + name text +); +SELECT create_reference_table('usergroups'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE itemgroups ( + gid int PRIMARY KEY, + name text +); +SELECT create_reference_table('itemgroups'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE users ( + id int PRIMARY KEY, + name text, + user_group int +); +SELECT create_distributed_table('users', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE items ( + user_id int REFERENCES users (id) ON DELETE CASCADE, + item_name text, + item_group int +); +SELECT create_distributed_table('items', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- Table to find values that live in different shards on the same node +SELECT id, shard_name('users', shardid), nodename, nodeport +FROM + pg_dist_shard_placement +JOIN + ( SELECT id, get_shard_id_for_distribution_column('users', id) shardid 
FROM generate_series(1,10) id ) ids +USING (shardid) +ORDER BY + id; + id | shard_name | nodename | nodeport +----+---------------+-----------+---------- + 1 | users_1200022 | localhost | 57637 + 2 | users_1200025 | localhost | 57638 + 3 | users_1200023 | localhost | 57638 + 4 | users_1200023 | localhost | 57638 + 5 | users_1200022 | localhost | 57637 + 6 | users_1200024 | localhost | 57637 + 7 | users_1200023 | localhost | 57638 + 8 | users_1200022 | localhost | 57637 + 9 | users_1200025 | localhost | 57638 + 10 | users_1200022 | localhost | 57637 +(10 rows) + +END; +-- the INSERTs into items should see the users +BEGIN; +\COPY users FROM STDIN WITH CSV +INSERT INTO items VALUES (1, 'item-1'); +INSERT INTO items VALUES (6, 'item-6'); +END; +SELECT user_id FROM items ORDER BY user_id; + user_id +--------- + 1 + 6 +(2 rows) + +-- should not be able to open multiple connections per node after INSERTing over one connection +BEGIN; +INSERT INTO users VALUES (2, 'burak'); +INSERT INTO users VALUES (3, 'burak'); +\COPY items FROM STDIN WITH CSV +ERROR: cannot establish new placement connection when DML has been executed on existing placement connection +END; +-- cannot perform DDL after a co-located table has been read over 1 connection +BEGIN; +SELECT id FROM users WHERE id = 1; + id +---- + 1 +(1 row) + +SELECT id FROM users WHERE id = 6; + id +---- + 6 +(1 row) + +ALTER TABLE items ADD COLUMN last_update timestamptz; +NOTICE: using one-phase commit for distributed DDL commands +ERROR: cannot perform parallel DDL command because multiple placements have been accessed over the same connection +END; +-- but the other way around is fine +BEGIN; +ALTER TABLE items ADD COLUMN last_update timestamptz; +SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1; + id +---- + 1 +(1 row) + +SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6; + id +---- + 6 +(1 row) + +END; +BEGIN; +-- establish multiple connections to a node +\COPY users FROM STDIN WITH CSV +-- now read from the reference table over each connection +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2; + user_id +--------- +(0 rows) + +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3; + user_id +--------- +(0 rows) + +-- perform a DDL command on the reference table +ALTER TABLE itemgroups ADD COLUMN last_update timestamptz; +ERROR: cannot perform DDL on a placement which has been read over multiple connections +END; +BEGIN; +-- establish multiple connections to a node +\COPY users FROM STDIN WITH CSV +-- read from the reference table over each connection +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2; + user_id +--------- +(0 rows) + +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3; + user_id +--------- +(0 rows) + +-- perform a DDL command on a co-located reference table +ALTER TABLE usergroups ADD COLUMN last_update timestamptz; +ERROR: cannot perform DDL on a placement if a co-located placement has been read over multiple connections +END; +BEGIN; +-- make a modification over connection 1 +INSERT INTO usergroups VALUES (0,'istanbul'); +-- copy over connections 1 and 2 +\COPY users FROM STDIN WITH CSV +-- cannot read modifications made over different connections +SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3; +ERROR: cannot perform query with placements that were modified over multiple connections +END; +-- make sure we can see cascading deletes +BEGIN; +SELECT 
master_modify_multiple_shards('DELETE FROM users'); + master_modify_multiple_shards +------------------------------- + 2 +(1 row) + +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1; + user_id +--------- +(0 rows) + +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6; + user_id +--------- +(0 rows) + +END; +-- test visibility after COPY +INSERT INTO usergroups VALUES (2,'group'); +BEGIN; +-- opens two separate connections to node +\COPY users FROM STDIN WITH CSV +-- Uses first connection, which wrote the row with id = 2 +SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2; + id | name | user_group | gid | name +----+-------+------------+-----+------- + 2 | onder | 2 | 2 | group +(1 row) + +-- Should use second connection, which wrote the row with id = 4 +SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; + id | name | user_group | gid | name +----+-------+------------+-----+------- + 4 | murat | 2 | 2 | group +(1 row) + +END; +DROP TABLE items, users, itemgroups, usergroups; diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index 92d29bf82..433fddc0d 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -179,12 +179,6 @@ SELECT count(*) FROM researchers_mx WHERE lab_id = 6; (1 row) ABORT; --- applies to DDL -BEGIN; -INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); -ALTER TABLE labs_mx ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications -COMMIT; -- doesn't apply to COPY after modifications BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index e54e4428e..c83912132 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1609,19 +1609,22 @@ SELECT * FROM reference_table_test; 10 | 2 | 2 | Fri Dec 02 00:00:00 2016 (1 row) --- do not allow mixing transactions +-- DML+master_modify_multiple_shards is allowed BEGIN; INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test'); -ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands + master_modify_multiple_shards +------------------------------- + 10 +(1 row) + ROLLBACK; --- Do not allow DDL and modification in the same transaction +-- DDL+DML is allowed BEGIN; ALTER TABLE reference_table_test ADD COLUMN value_dummy INT; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); -ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications ROLLBACK; -- clean up tables DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, diff --git a/src/test/regress/sql/multi_alter_table_add_constraints.sql b/src/test/regress/sql/multi_alter_table_add_constraints.sql index 58c420b49..4e41472f7 100644 --- a/src/test/regress/sql/multi_alter_table_add_constraints.sql +++ b/src/test/regress/sql/multi_alter_table_add_constraints.sql 
@@ -382,7 +382,7 @@ SELECT create_distributed_table('products', 'product_no');
 
 BEGIN;
 INSERT INTO products VALUES(1,'product_1', 5);
--- Should error out since conflicts with the above single-shard data modification command.
+-- DDL may error out after an INSERT because it might pick the wrong connection
 ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
 ROLLBACK;
 
@@ -393,7 +393,6 @@ ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
 ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
 ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
 
--- Single shard DML command can't be located in the same transaction with above commands.
 INSERT INTO products VALUES(1,'product_1', 10, 8);
 ROLLBACK;
 
diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql
index 7d49b6e2b..3cf7c2cd1 100644
--- a/src/test/regress/sql/multi_insert_select.sql
+++ b/src/test/regress/sql/multi_insert_select.sql
@@ -1332,9 +1332,19 @@ ALTER TABLE raw_events_second DROP COLUMN value_4;
 INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
 ROLLBACK;
 
--- Insert after copy is currently disallowed because of the way the
--- transaction modification state is currently handled. Copy is also
--- rolled back.
+-- Altering a reference table and then performing an INSERT ... SELECT which
+-- joins with the reference table is not allowed, since the INSERT ... SELECT
+-- would read from the reference table over other connections than the ones
+-- that performed the DDL.
+BEGIN;
+ALTER TABLE reference_table ADD COLUMN z int;
+INSERT INTO raw_events_first (user_id)
+SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
+ROLLBACK;
+
+-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
+-- to use a connection for one shard, while the connection already modified
+-- another shard.
 BEGIN;
 COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
 100,100
@@ -1352,7 +1362,6 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101
 SELECT user_id FROM raw_events_first WHERE user_id = 101;
 ROLLBACK;
 
--- Copy after insert is currently disallowed.
BEGIN; INSERT INTO raw_events_first SELECT * FROM raw_events_second; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 233a64be7..17fb859ea 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -138,17 +138,17 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; ABORT; --- applies to DDL, too +-- we can mix DDL and INSERT BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); ALTER TABLE labs ADD COLUMN motto text; -COMMIT; +ABORT; -- whether it occurs first or second BEGIN; ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); -COMMIT; +ABORT; -- but the DDL should correctly roll back SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass; @@ -289,7 +289,7 @@ ORDER BY nodeport, shardid; SELECT * FROM run_command_on_workers('drop function reject_large_id()') ORDER BY nodeport; --- ALTER TABLE and COPY are compatible if ALTER TABLE precedes COPY +-- ALTER and copy are compatible BEGIN; ALTER TABLE labs ADD COLUMN motto text; \copy labs from stdin delimiter ',' @@ -297,12 +297,18 @@ ALTER TABLE labs ADD COLUMN motto text; \. ROLLBACK; --- but not if COPY precedes ALTER TABLE BEGIN; \copy labs from stdin delimiter ',' 12,fsociety \. ALTER TABLE labs ADD COLUMN motto text; +ABORT; + +-- cannot perform DDL once a connection is used for multiple shards +BEGIN; +SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; +SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; +ALTER TABLE researchers ADD COLUMN motto text; ROLLBACK; -- multi-shard operations can co-exist with DDL in a transactional way @@ -944,3 +950,147 @@ DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts SELECT * FROM run_command_on_workers('DROP USER test_user'); DROP USER test_user; + +-- set up foreign keys to test transactions with co-located and reference tables +BEGIN; +SET LOCAL citus.shard_replication_factor TO 1; +SET LOCAL citus.shard_count TO 4; + +CREATE TABLE usergroups ( + gid int PRIMARY KEY, + name text +); +SELECT create_reference_table('usergroups'); + +CREATE TABLE itemgroups ( + gid int PRIMARY KEY, + name text +); +SELECT create_reference_table('itemgroups'); + +CREATE TABLE users ( + id int PRIMARY KEY, + name text, + user_group int +); +SELECT create_distributed_table('users', 'id'); + +CREATE TABLE items ( + user_id int REFERENCES users (id) ON DELETE CASCADE, + item_name text, + item_group int +); +SELECT create_distributed_table('items', 'user_id'); + +-- Table to find values that live in different shards on the same node +SELECT id, shard_name('users', shardid), nodename, nodeport +FROM + pg_dist_shard_placement +JOIN + ( SELECT id, get_shard_id_for_distribution_column('users', id) shardid FROM generate_series(1,10) id ) ids +USING (shardid) +ORDER BY + id; + +END; + +-- the INSERTs into items should see the users +BEGIN; +\COPY users FROM STDIN WITH CSV +1,brian,0 +6,metin,0 +\. +INSERT INTO items VALUES (1, 'item-1'); +INSERT INTO items VALUES (6, 'item-6'); +END; + +SELECT user_id FROM items ORDER BY user_id; + +-- should not be able to open multiple connections per node after INSERTing over one connection +BEGIN; +INSERT INTO users VALUES (2, 'burak'); +INSERT INTO users VALUES (3, 'burak'); +\COPY items FROM STDIN WITH CSV +2,item-2,0 +3,item-3,0 +\. 
+END; + +-- cannot perform DDL after a co-located table has been read over 1 connection +BEGIN; +SELECT id FROM users WHERE id = 1; +SELECT id FROM users WHERE id = 6; +ALTER TABLE items ADD COLUMN last_update timestamptz; +END; + +-- but the other way around is fine +BEGIN; +ALTER TABLE items ADD COLUMN last_update timestamptz; +SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1; +SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6; +END; + +BEGIN; +-- establish multiple connections to a node +\COPY users FROM STDIN WITH CSV +2,burak,0 +3,burak,0 +\. +-- now read from the reference table over each connection +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2; +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3; +-- perform a DDL command on the reference table +ALTER TABLE itemgroups ADD COLUMN last_update timestamptz; +END; + +BEGIN; +-- establish multiple connections to a node +\COPY users FROM STDIN WITH CSV +2,burak,0 +3,burak,0 +\. +-- read from the reference table over each connection +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2; +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3; +-- perform a DDL command on a co-located reference table +ALTER TABLE usergroups ADD COLUMN last_update timestamptz; +END; + +BEGIN; +-- make a modification over connection 1 +INSERT INTO usergroups VALUES (0,'istanbul'); +-- copy over connections 1 and 2 +\COPY users FROM STDIN WITH CSV +2,burak,0 +3,burak,0 +\. +-- cannot read modifications made over different connections +SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3; +END; + +-- make sure we can see cascading deletes +BEGIN; +SELECT master_modify_multiple_shards('DELETE FROM users'); +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1; +SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6; +END; + +-- test visibility after COPY + +INSERT INTO usergroups VALUES (2,'group'); + +BEGIN; +-- opens two separate connections to node +\COPY users FROM STDIN WITH CSV +2,onder,2 +4,murat,2 +\. 
+ +-- Uses first connection, which wrote the row with id = 2 +SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2; + +-- Should use second connection, which wrote the row with id = 4 +SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; +END; + +DROP TABLE items, users, itemgroups, usergroups; diff --git a/src/test/regress/sql/multi_mx_modifying_xacts.sql b/src/test/regress/sql/multi_mx_modifying_xacts.sql index 24f25876e..508fe7e6d 100644 --- a/src/test/regress/sql/multi_mx_modifying_xacts.sql +++ b/src/test/regress/sql/multi_mx_modifying_xacts.sql @@ -150,12 +150,6 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); SELECT count(*) FROM researchers_mx WHERE lab_id = 6; ABORT; --- applies to DDL -BEGIN; -INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); -ALTER TABLE labs_mx ADD COLUMN motto text; -COMMIT; - -- doesn't apply to COPY after modifications BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 9c22ce6c1..c2ff55a82 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -994,13 +994,13 @@ UPDATE reference_table_test SET value_1 = 10 WHERE value_1 = 2; COMMIT; SELECT * FROM reference_table_test; --- do not allow mixing transactions +-- DML+master_modify_multiple_shards is allowed BEGIN; INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test'); ROLLBACK; --- Do not allow DDL and modification in the same transaction +-- DDL+DML is allowed BEGIN; ALTER TABLE reference_table_test ADD COLUMN value_dummy INT; INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); From 9f7e4769e2cd728270e456048f0b25f5977aa4b9 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 7 Jul 2017 17:08:57 +0200 Subject: [PATCH 6/6] Clarify placement connection error messages --- .../connection/placement_connection.c | 23 +++++++----- .../multi_alter_table_add_constraints.out | 2 +- .../regress/expected/multi_insert_select.out | 4 +-- .../expected/multi_modifying_xacts.out | 36 +++++++++---------- 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 047fe041b..26d8287d9 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -341,8 +341,10 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot perform DDL on a placement which has been read over " - "multiple connections"))); + errmsg( + "cannot perform DDL on placement %ld, which has been read over " + "multiple connections", + placement->placementId))); } else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL && colocatedEntry->hasSecondaryConnections) @@ -358,8 +360,9 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot perform DDL on a placement if a co-located placement " - "has been read over multiple connections"))); + errmsg("cannot perform DDL on placement %ld since a co-located " + "placement has been read over multiple connections", + placement->placementId))); } else if (foundModifyingConnection) { @@ -411,8 +414,9 @@ 
StartPlacementListConnection(uint32 flags, List *placementAccessList, ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DDL has " - "been executed on existing placement connection"))); + errmsg("cannot establish a new connection for placement %ld, since " + "DDL has been executed on a connection that is in use", + placement->placementId))); } else if (placementConnection->hadDML) { @@ -432,8 +436,9 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot establish new placement connection when DML has " - "been executed on existing placement connection"))); + errmsg("cannot establish a new connection for placement %ld, since " + "DML has been executed on a connection that is in use", + placement->placementId))); } else if (accessType == PLACEMENT_ACCESS_DDL) { @@ -450,7 +455,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot perform parallel DDL command because multiple " + errmsg("cannot perform a parallel DDL command because multiple " "placements have been accessed over the same connection"))); } else diff --git a/src/test/regress/expected/multi_alter_table_add_constraints.out b/src/test/regress/expected/multi_alter_table_add_constraints.out index 83618f48f..3192f1ad9 100644 --- a/src/test/regress/expected/multi_alter_table_add_constraints.out +++ b/src/test/regress/expected/multi_alter_table_add_constraints.out @@ -443,7 +443,7 @@ BEGIN; INSERT INTO products VALUES(1,'product_1', 5); -- DDL may error out after an INSERT because it might pick the wrong connection ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no); -ERROR: cannot establish new placement connection when DML has been executed on existing placement connection +ERROR: cannot establish a new connection for placement 2327, since DML has been executed on a connection that is in use ROLLBACK; BEGIN; -- Add constraints diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 3f9b216a6..30184b38b 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1645,7 +1645,7 @@ BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); -ERROR: cannot establish new placement connection when DDL has been executed on existing placement connection +ERROR: cannot establish a new connection for placement 655, since DDL has been executed on a connection that is in use ROLLBACK; -- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses -- to use a connection for one shard, while the connection already modified @@ -1653,7 +1653,7 @@ ROLLBACK; BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; INSERT INTO raw_events_first SELECT * FROM raw_events_second; -ERROR: cannot establish new placement connection when DML has been executed on existing placement connection +ERROR: cannot establish a new connection for placement 636, since DML has been executed on a connection that is in use ROLLBACK; -- Insert after copy is currently allowed for single-shard operation. -- Both insert and copy are rolled back successfully. 
diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 8eebdc5bf..f41fc45bb 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -205,7 +205,7 @@ BEGIN; INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'); INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); \copy researchers from stdin delimiter ',' -ERROR: cannot establish new placement connection when DML has been executed on existing placement connection +ERROR: cannot establish a new connection for placement 2704, since DML has been executed on a connection that is in use CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie" ROLLBACK; -- after a COPY you can modify multiple shards, since they'll use different connections @@ -403,7 +403,7 @@ SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; (0 rows) ALTER TABLE researchers ADD COLUMN motto text; -ERROR: cannot perform parallel DDL command because multiple placements have been accessed over the same connection +ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection ROLLBACK; -- multi-shard operations can co-exist with DDL in a transactional way BEGIN; @@ -1349,18 +1349,18 @@ JOIN USING (shardid) ORDER BY id; - id | shard_name | nodename | nodeport -----+---------------+-----------+---------- - 1 | users_1200022 | localhost | 57637 - 2 | users_1200025 | localhost | 57638 - 3 | users_1200023 | localhost | 57638 - 4 | users_1200023 | localhost | 57638 - 5 | users_1200022 | localhost | 57637 - 6 | users_1200024 | localhost | 57637 - 7 | users_1200023 | localhost | 57638 - 8 | users_1200022 | localhost | 57637 - 9 | users_1200025 | localhost | 57638 - 10 | users_1200022 | localhost | 57637 + id | shard_name | nodename | nodeport +----+----------------------+-----------+---------- + 1 | public.users_1200022 | localhost | 57637 + 2 | public.users_1200025 | localhost | 57638 + 3 | public.users_1200023 | localhost | 57638 + 4 | public.users_1200023 | localhost | 57638 + 5 | public.users_1200022 | localhost | 57637 + 6 | public.users_1200024 | localhost | 57637 + 7 | public.users_1200023 | localhost | 57638 + 8 | public.users_1200022 | localhost | 57637 + 9 | public.users_1200025 | localhost | 57638 + 10 | public.users_1200022 | localhost | 57637 (10 rows) END; @@ -1382,7 +1382,7 @@ BEGIN; INSERT INTO users VALUES (2, 'burak'); INSERT INTO users VALUES (3, 'burak'); \COPY items FROM STDIN WITH CSV -ERROR: cannot establish new placement connection when DML has been executed on existing placement connection +ERROR: cannot establish a new connection for placement 2743, since DML has been executed on a connection that is in use END; -- cannot perform DDL after a co-located table has been read over 1 connection BEGIN; @@ -1400,7 +1400,7 @@ SELECT id FROM users WHERE id = 6; ALTER TABLE items ADD COLUMN last_update timestamptz; NOTICE: using one-phase commit for distributed DDL commands -ERROR: cannot perform parallel DDL command because multiple placements have been accessed over the same connection +ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection END; -- but the other way around is fine BEGIN; @@ -1434,7 +1434,7 @@ SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = -- perform a DDL command on the reference table ALTER TABLE itemgroups ADD COLUMN last_update timestamptz; -ERROR: 
cannot perform DDL on a placement which has been read over multiple connections +ERROR: cannot perform DDL on placement 2737, which has been read over multiple connections END; BEGIN; -- establish multiple connections to a node @@ -1452,7 +1452,7 @@ SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = -- perform a DDL command on a co-located reference table ALTER TABLE usergroups ADD COLUMN last_update timestamptz; -ERROR: cannot perform DDL on a placement if a co-located placement has been read over multiple connections +ERROR: cannot perform DDL on placement 2735 since a co-located placement has been read over multiple connections END; BEGIN; -- make a modification over connection 1