From bfa742d794d0300f9c6898003f30096ceb964a9c Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 2 Jan 2017 03:58:22 -0800 Subject: [PATCH 1/2] Centralized shard/placement connection and state management. Currently there are several places in citus that map placements to connections and that manage placement health. Centralize this knowledge. Because of the centralized knowledge about which connection has previously been used for which shard/placement, this also provides the basis for relaxing restrictions around combining various forms of DDL/DML. Connections for a placement can now be acquired using GetPlacementConnection(). If the connection is used for DML or DDL the FOR_DDL/DML flags should be used respectively. If an individual remote transaction fails (but the transaction on the master succeeds) and FOR_DDL/DML have been specified, the placement is marked as invalid, unless that'd mark all placements for a shard as invalid. --- .../connection/connection_management.c | 7 +- .../connection/placement_connection.c | 660 ++++++++++++++++++ src/backend/distributed/shared_library_init.c | 2 + .../transaction/transaction_management.c | 25 + .../distributed/connection_management.h | 9 +- .../distributed/placement_connection.h | 33 + 6 files changed, 734 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/connection/placement_connection.c create mode 100644 src/include/distributed/placement_connection.h diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 6767a41a9..7909e9d75 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -23,6 +23,7 @@ #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/hash_helpers.h" +#include "distributed/placement_connection.h" #include "mb/pg_wchar.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -234,6 +235,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, connection = StartConnectionEstablishment(&key); dlist_push_tail(entry->connections, &connection->connectionNode); + ResetShardPlacementAssociation(connection); if (flags & SESSION_LIFESPAN) { @@ -333,6 +335,7 @@ CloseNodeConnections(char *nodeName, int nodePort) /* same for transaction state */ CloseRemoteTransaction(connection); + CloseShardPlacementAssociation(connection); /* we leave the per-host entry alive */ pfree(connection); @@ -366,8 +369,9 @@ CloseConnection(MultiConnection *connection) /* unlink from list of open connections */ dlist_delete(&connection->connectionNode); - /* same for transaction state */ + /* same for transaction state and shard/placement machinery */ CloseRemoteTransaction(connection); + CloseShardPlacementAssociation(connection); /* we leave the per-host entry alive */ pfree(connection); @@ -692,6 +696,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) { /* reset per-transaction state */ ResetRemoteTransaction(connection); + ResetShardPlacementAssociation(connection); UnclaimConnection(connection); } diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c new file mode 100644 index 000000000..0b3d7e78b --- /dev/null +++ b/src/backend/distributed/connection/placement_connection.c @@ -0,0 +1,660 @@ +/*------------------------------------------------------------------------- + * + * placement_connection.c + * Per placement connection handling. + * + * Copyright (c) 2016-2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" +#include "distributed/placement_connection.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +/* + * A connection reference is used to register that a connection has been used + * to read or modify a shard placement as a particular user. + */ +typedef struct ConnectionReference +{ + /* identity information about the connection */ + uint64 shardId; + uint64 placementId; + + /* + * The user used to read/modify the placement. We cannot reuse connections + * that were performed using a different role, since it would not have the + * right permissions. + */ + const char *userName; + + /* the connection */ + MultiConnection *connection; + + /* + * Information about what the connection is used for. There can only be + * one connection executing DDL/DML for a placement to avoid deadlock + * issues/read-your-own-writes violations. The difference between DDL/DML + * currently is only used to emit more precise error messages. + */ + bool hadDML; + bool hadDDL; + + /* membership in ConnectionPlacementHashKey->connectionReferences */ + dlist_node placementNode; + + /* membership in MultiConnection->referencedPlacements */ + dlist_node connectionNode; +} ConnectionReference; + + +/* + * Hash table mapping placements to a list of connections. + * + * This stores a list of connections for each placement, because multiple + * connections to the same placement may exist at the same time. E.g. a + * real-time executor query may reference the same placement in several + * sub-tasks. + * + * We keep track about a connection having executed DML or DDL, since we can + * only ever allow a single transaction to do either to prevent deadlocks and + * consistency violations (e.g. read-your-own-writes). + */ + +/* hash key */ +typedef struct ConnectionPlacementHashKey +{ + uint64 placementId; +} ConnectionPlacementHashKey; + +/* hash entry */ +typedef struct ConnectionPlacementHashEntry +{ + ConnectionPlacementHashKey key; + + /* did any remote transactions fail? */ + bool failed; + + /* list of connections to remote nodes */ + dlist_head connectionReferences; + + /* if non-NULL, connection executing DML/DDL*/ + ConnectionReference *modifyingConnection; + + /* membership in ConnectionShardHashEntry->placementConnections */ + dlist_node shardNode; +} ConnectionPlacementHashEntry; + +/* hash table */ +static HTAB *ConnectionPlacementHash; + + +/* + * Hash table mapping shard ids to placements. + * + * This is used to track whether placements of a shard have to be marked + * invalid after a failure, or whether a coordinated transaction has to be + * aborted, to avoid all placements of a shard to be marked invalid. + */ + +/* hash key */ +typedef struct ConnectionShardHashKey +{ + uint64 shardId; +} ConnectionShardHashKey; + +/* hash entry */ +typedef struct ConnectionShardHashEntry +{ + ConnectionShardHashKey key; + dlist_head placementConnections; +} ConnectionShardHashEntry; + +/* hash table */ +static HTAB *ConnectionShardHash; + + +static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName, + ConnectionPlacementHashEntry * + placementEntry); +static bool CanUseExistingConnection(uint32 flags, const char *userName, + ConnectionReference *connectionReference); +static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, + ShardPlacement *placement); +static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit, + bool using2PC); + + +/* + * GetPlacementConnection establishes a connection for a placement. + * + * See StartPlacementConnection for details. + */ +MultiConnection * +GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName) +{ + MultiConnection *connection = StartPlacementConnection(flags, placement, userName); + + FinishConnectionEstablishment(connection); + return connection; +} + + +/* + * StartPlacementConnection initiates a connection to a remote node, + * associated with the placement and transaction. + * + * The connection is established for the current database. If userName is NULL + * the current user is used, otherwise the provided one. + * + * See StartNodeUserDatabaseConnection for details. + * + * Flags have the corresponding meaning from StartNodeUserDatabaseConnection, + * except that two additional flags have an effect: + * - FOR_DML - signal that connection is going to be used for DML (modifications) + * - FOR_DDL - signal that connection is going to be used for DDL + * + * Only one connection associated with the placement may have FOR_DML or + * FOR_DDL set. This restriction prevents deadlocks and wrong results due to + * in-progress transactions. + */ +MultiConnection * +StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName) +{ + ConnectionPlacementHashKey key; + ConnectionPlacementHashEntry *placementEntry = NULL; + ConnectionReference *returnConnectionReference = NULL; + char *freeUserName = NULL; + bool found = false; + + if (userName == NULL) + { + userName = freeUserName = CurrentUserName(); + } + + key.placementId = placement->placementId; + + /* lookup relevant hash entry */ + placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found); + if (!found) + { + dlist_init(&placementEntry->connectionReferences); + placementEntry->failed = false; + placementEntry->modifyingConnection = NULL; + } + + /* + * Check whether any of the connections already associated with the + * placement can be reused, or violates FOR_DML/FOR_DDL constraints. + */ + returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry); + + /* + * Either no caching desired, or no connection present. Start connection + * establishment. Allocations are performed in transaction context, so we + * don't have to care about freeing in case of an early disconnect. + */ + if (returnConnectionReference == NULL) + { + MultiConnection *connection = StartNodeConnection(flags, placement->nodeName, + placement->nodePort); + + returnConnectionReference = (ConnectionReference *) + MemoryContextAlloc(TopTransactionContext, + sizeof(ConnectionReference)); + returnConnectionReference->connection = connection; + returnConnectionReference->hadDDL = false; + returnConnectionReference->hadDML = false; + returnConnectionReference->shardId = placement->shardId; + returnConnectionReference->placementId = placement->placementId; + returnConnectionReference->userName = + MemoryContextStrdup(TopTransactionContext, userName); + dlist_push_tail(&placementEntry->connectionReferences, + &returnConnectionReference->placementNode); + + /* record association with shard, for invalidation */ + AssociatePlacementWithShard(placementEntry, placement); + + /* record association with connection, to handle connection closure */ + dlist_push_tail(&connection->referencedPlacements, + &returnConnectionReference->connectionNode); + } + + if (flags & FOR_DDL) + { + placementEntry->modifyingConnection = returnConnectionReference; + returnConnectionReference->hadDDL = true; + } + if (flags & FOR_DML) + { + placementEntry->modifyingConnection = returnConnectionReference; + returnConnectionReference->hadDML = true; + } + + if (freeUserName) + { + pfree(freeUserName); + } + + return returnConnectionReference->connection; +} + + +/* + * CheckExistingConnections check whether any of the existing connections is + * usable. If so, return it, otherwise return NULL. + * + * A connection is usable if it is not in use, the user matches, DDL/DML usage + * match and cached connection are allowed. If no existing connection + * matches, but a new connection would conflict (e.g. a connection already + * exists but isn't usable, and the desired connection needs to execute + * DML/DML) an error is thrown. + */ +static ConnectionReference * +CheckExistingConnections(uint32 flags, const char *userName, + ConnectionPlacementHashEntry *placementEntry) +{ + dlist_iter it; + + /* + * If there's a connection that has executed DML/DDL, always return it if + * possible. That's because we only can execute DML/DDL over that + * connection. Checking this first also allows us to detect conflicts due + * to opening a second connection that wants to execute DML/DDL. + */ + if (placementEntry->modifyingConnection) + { + ConnectionReference *connectionReference = placementEntry->modifyingConnection; + + if (CanUseExistingConnection(flags, userName, connectionReference)) + { + return connectionReference; + } + + if (connectionReference->hadDDL) + { + /* would deadlock otherwise */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DDL has been " + "executed on existing placement connection"))); + } + else if (connectionReference->hadDML) + { + /* we'd not see the other connection's contents */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when DML has been " + "executed on existing placement connection"))); + } + else + { + /* modifying xact should have executed DML/DDL */ + Assert(false); + } + } + + /* + * Search existing connections for a reusable connection. + */ + dlist_foreach(it, &placementEntry->connectionReferences) + { + ConnectionReference *connectionReference = + dlist_container(ConnectionReference, placementNode, it.cur); + + if (CanUseExistingConnection(flags, userName, connectionReference)) + { + return connectionReference; + } + + /* + * If not using the connection, verify that FOR_DML/DDL flags are + * compatible. + */ + if (flags & FOR_DDL) + { + /* would deadlock otherwise */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection for DDL when " + "other connections exist"))); + } + else if (flags & FOR_DML) + { + /* + * We could allow this case (as presumably the SELECT is done, but + * it'd be a bit prone to programming errors, so we disallow for + * now. + */ + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection for DML when " + "other connections exist"))); + } + } + + /* establish a new connection */ + return NULL; +} + + +/* + * CanUseExistingConnection is a helper function for CheckExistingConnections() + * that checks whether an existing connection can be reused. + */ +static bool +CanUseExistingConnection(uint32 flags, const char *userName, + ConnectionReference *connectionReference) +{ + MultiConnection *connection = connectionReference->connection; + + if (!connection) + { + /* if already closed connection obviously not usable */ + return false; + } + else if (connection->claimedExclusively) + { + /* already used */ + return false; + } + else if (flags & FORCE_NEW_CONNECTION) + { + /* no connection reuse desired */ + return false; + } + else if (strcmp(connectionReference->userName, userName) != 0) + { + /* connection for different user, check for conflict */ + return false; + } + else + { + return true; + } +} + + +/* + * AssociatePlacementWithShard records shard->placement relation in + * ConnectionShardHash. + * + * That association is later used, in CheckForFailedPlacements, to invalidate + * shard placements if necessary. + */ +static void +AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, + ShardPlacement *placement) +{ + ConnectionShardHashKey shardKey; + ConnectionShardHashEntry *shardEntry = NULL; + bool found = false; + dlist_iter placementIter; + + shardKey.shardId = placement->shardId; + shardEntry = hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &found); + if (!found) + { + dlist_init(&shardEntry->placementConnections); + } + + /* + * Check if placement is already associated with shard (happens if there's + * multiple connections for a placement). There'll usually only be few + * placement per shard, so the price of iterating isn't large. + */ + dlist_foreach(placementIter, &shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); + + if (placementEntry->key.placementId == placement->placementId) + { + return; + } + } + + /* otherwise add */ + dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode); +} + + +/* + * CloseShardPlacementAssociation handles a connection being closed before + * transaction end. + * + * This should only be called by connection_management.c. + */ +void +CloseShardPlacementAssociation(struct MultiConnection *connection) +{ + dlist_iter placementIter; + + /* set connection to NULL for all references to the connection */ + dlist_foreach(placementIter, &connection->referencedPlacements) + { + ConnectionReference *reference = + dlist_container(ConnectionReference, connectionNode, placementIter.cur); + + reference->connection = NULL; + + /* + * Note that we don't reset ConnectionPlacementHashEntry's + * modifyingConnection here, that'd more complicated than it seems + * worth. That means we'll error out spuriously if a DML/DDL + * executing connection is closed earlier in a transaction. + */ + } +} + + +/* + * ResetShardPlacementAssociation resets the association of connections to + * shard placements at the end of a transaction. + * + * This should only be called by connection_management.c. + */ +void +ResetShardPlacementAssociation(struct MultiConnection *connection) +{ + dlist_init(&connection->referencedPlacements); +} + + +/* + * ResetPlacementConnectionManagement() disassociates connections from + * placements and shards. This will be called at the end of XACT_EVENT_COMMIT + * and XACT_EVENT_ABORT. + */ +void +ResetPlacementConnectionManagement(void) +{ + /* Simply delete all entries */ + hash_delete_all(ConnectionPlacementHash); + hash_delete_all(ConnectionShardHash); + + /* + * NB: memory for ConnectionReference structs and subordinate data is + * deleted by virtue of being allocated in TopTransactionContext. + */ +} + + +/* + * CheckForFailedPlacements checks which placements have to be marked as + * invalid, and/or whether sufficiently many placements have failed to abort + * the entire coordinated transaction. + * + * This will usually be called twice. Once before the remote commit is done, + * and once after. This is so we can abort before executing remote commits, + * and so we can handle remote transactions that failed during commit. + * + * When preCommit or using2PC is true, failures on transactions marked as + * critical will abort the entire coordinated transaction. If not we can't + * roll back, because some remote transactions might have already committed. + */ +void +CheckForFailedPlacements(bool preCommit, bool using2PC) +{ + HASH_SEQ_STATUS status; + ConnectionShardHashEntry *shardEntry = NULL; + int successes = 0; + int attempts = 0; + + hash_seq_init(&status, ConnectionShardHash); + while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) + { + attempts++; + if (CheckShardPlacements(shardEntry, preCommit, using2PC)) + { + successes++; + } + } + + /* + * If no shards could be modified at all, error out. Doesn't matter if + * we're post-commit - there's nothing to invalidate. + */ + if (attempts > 0 && successes == 0) + { + ereport(ERROR, (errmsg("could not commit transaction on any active node"))); + } +} + + +/* + * CheckShardPlacements is a helper function for CheckForFailedPlacements that + * performs the per-shard work. + */ +static bool +CheckShardPlacements(ConnectionShardHashEntry *shardEntry, + bool preCommit, bool using2PC) +{ + int failures = 0; + int successes = 0; + dlist_iter placementIter; + + dlist_foreach(placementIter, &shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); + dlist_iter referenceIter; + + dlist_foreach(referenceIter, &placementEntry->connectionReferences) + { + ConnectionReference *reference = + dlist_container(ConnectionReference, placementNode, referenceIter.cur); + MultiConnection *connection = reference->connection; + + /* + * If neither DDL nor DML were executed, there's no need for + * invalidation. + */ + if (!reference->hadDDL && !reference->hadDML) + { + continue; + } + + /* + * Failed if connection was closed, or remote transaction was + * unsuccessful. + */ + if (!connection || connection->remoteTransaction.transactionFailed) + { + placementEntry->failed = true; + } + } + + if (placementEntry->failed) + { + failures++; + } + else + { + successes++; + } + } + + if (failures > 0 && successes == 0) + { + int elevel = 0; + + /* + * Only error out if we're pre-commit or using 2PC. Can't error + * otherwise as we can end up with a state where some shard + * modifications have already committed successfully. If no + * modifications at all succeed, CheckForFailedPlacements() will error + * out. This sucks. + */ + if (preCommit || using2PC) + { + elevel = ERROR; + } + else + { + elevel = WARNING; + } + + ereport(elevel, + (errmsg("could not commit transaction for shard " INT64_FORMAT + " on any active node", + shardEntry->key.shardId))); + return false; + } + + /* mark all failed placements invalid */ + dlist_foreach(placementIter, &shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); + + if (placementEntry->failed) + { + UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE); + } + } + + return true; +} + + +/* + * InitPlacementConnectionManagement performs initialization of the + * infrastructure in this file at server start. + */ +void +InitPlacementConnectionManagement(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + /* create (placementId) -> [ConnectionReference] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionPlacementHashKey); + info.entrysize = sizeof(ConnectionPlacementHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + ConnectionPlacementHash = hash_create("citus connection cache (placementid)", + 64, &info, hashFlags); + + /* create (shardId) -> [ConnectionShardHashEntry] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionShardHashKey); + info.entrysize = sizeof(ConnectionShardHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + ConnectionShardHash = hash_create("citus connection cache (shardid)", + 64, &info, hashFlags); +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index bc176a110..509ab0381 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -33,6 +33,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" @@ -160,6 +161,7 @@ _PG_init(void) /* initialize coordinated transaction management */ InitializeTransactionManagement(); InitializeConnectionManagement(); + InitPlacementConnectionManagement(); /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 7fe79ce75..92ba56631 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -24,6 +24,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" +#include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/guc.h" @@ -160,6 +161,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { + ResetPlacementConnectionManagement(); AfterXactConnectionHandling(true); } @@ -196,6 +198,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { + ResetPlacementConnectionManagement(); AfterXactConnectionHandling(false); } @@ -233,6 +236,21 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) break; } + /* + * TODO: It'd probably be a good idea to force constraints and + * such to 'immediate' here. Deferred triggers might try to send + * stuff to the remote side, which'd not be good. Doing so + * remotely would also catch a class of errors where committing + * fails, which can lead to divergence when not using 2PC. + */ + + /* + * Check whether the coordinated transaction is in a state we want + * to persist, or whether we want to error out. This handles the + * case that iteratively executed commands marked all placements + * as invalid. + */ + CheckForFailedPlacements(true, CurrentTransactionUse2PC); if (CurrentTransactionUse2PC) { @@ -257,6 +275,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * (e.g. if the remote commit failed). */ RouterExecutorPreCommitCheck(); + + /* + * + * Check again whether shards/placement successfully + * committed. This handles failure at COMMIT/PREPARE time. + */ + CheckForFailedPlacements(false, CurrentTransactionUse2PC); } break; diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 981ca8cb9..c3967365d 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -33,7 +33,11 @@ enum MultiConnectionMode FORCE_NEW_CONNECTION = 1 << 0, /* mark returned connection as having session lifespan */ - SESSION_LIFESPAN = 1 << 1 + SESSION_LIFESPAN = 1 << 1, + + FOR_DDL = 1 << 2, + + FOR_DML = 1 << 3 }; @@ -68,6 +72,9 @@ typedef struct MultiConnection /* membership in list of in-progress transactions */ dlist_node transactionNode; + + /* list of all placements referenced by this connection */ + dlist_head referencedPlacements; } MultiConnection; diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h new file mode 100644 index 000000000..c76d838aa --- /dev/null +++ b/src/include/distributed/placement_connection.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * placement_connection.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PLACEMENT_CONNECTION_H +#define PLACEMENT_CONNECTION_H + + +#include "distributed/connection_management.h" + +/* forward declare, to avoid dependency on ShardPlacement definition */ +struct ShardPlacement; + +extern MultiConnection * GetPlacementConnection(uint32 flags, + struct ShardPlacement *placement, + const char *userName); +extern MultiConnection * StartPlacementConnection(uint32 flags, + struct ShardPlacement *placement, + const char *userName); + +extern void ResetPlacementConnectionManagement(void); +extern void CheckForFailedPlacements(bool preCommit, bool using2PC); + +extern void CloseShardPlacementAssociation(struct MultiConnection *connection); +extern void ResetShardPlacementAssociation(struct MultiConnection *connection); + +extern void InitPlacementConnectionManagement(void); + +#endif /* PLACEMENT_CONNECTION_H */ From 7320c17f007b763d0ef9266bd9e5a59db41fea7a Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 5 Jan 2017 22:14:54 -0800 Subject: [PATCH 2/2] Convert router executor to placement connection management infrastructure. Remove the router specific transaction and shard management, and replace it with the new placement connection API. This mostly leaves behaviour alone, except that it is now, inside a transaction, legal to select from a shard to which no pre-existing connection exists. To simplify code the code handling task executions for select and modify has been split into two - the previous coding was starting to get confusing due to the amount of only conditionally applicable code. Modification connections & transactions are now always established in parallel, not just for reference tables. --- .../executor/multi_router_executor.c | 718 +++++------------- .../transaction/transaction_management.c | 11 - .../distributed/multi_router_executor.h | 2 - .../expected/multi_modifying_xacts.out | 18 +- .../regress/sql/multi_modifying_xacts.sql | 7 +- 5 files changed, 209 insertions(+), 547 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2af22d251..bf04d1f81 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -41,6 +41,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/placement_connection.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -72,30 +73,14 @@ /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; - -/* - * The following static variables are necessary to track the progression of - * multi-statement transactions managed by the router executor. After the first - * modification within a transaction, the executor populates a hash with the - * transaction's initial participants (nodes hit by that initial modification). - * - * To keep track of the reverse mapping (from shards to nodes), we have a list - * of XactShardConnSets, which map a shard identifier to a set of connection - * hash entries. This list is walked by MarkRemainingInactivePlacements to - * ensure we mark placements as failed if they reject a COMMIT. - */ -static HTAB *xactParticipantHash = NULL; -static List *xactShardConnSetList = NIL; - -/* functions needed during start phase */ -static void InitTransactionStateForTask(Task *task); -static HTAB * CreateXactParticipantHash(void); - /* functions needed during run phase */ static void ReacquireMetadataLocks(List *taskList); -static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task, - bool isModificationQuery, bool expectResults); -static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList); +static void ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task, + bool expectResults); +static void ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task); +static List * GetModifyConnections(List *taskPlacementList, + bool markCritical, + bool startedInTransaction); static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, @@ -109,23 +94,15 @@ static bool RequiresConsistentSnapshot(Task *task); static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); -static PGconn * GetConnectionForPlacement(ShardPlacement *placement, - bool isModificationQuery); -static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement); -static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); -static bool SendQueryInSingleRowMode(PGconn *connection, char *query, +static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, ParamListInfo paramListInfo); -static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, +static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection, TupleDesc tupleDescriptor, bool failOnError, int64 *rows); -static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows); -static void RecordShardIdParticipant(uint64 affectedShardId, - NodeConnectionEntry *participantEntry); - -/* to verify the health of shards after a transactional modification command */ -static void MarkRemainingInactivePlacements(void); +static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, + int64 *rows); /* @@ -232,52 +209,6 @@ ReacquireMetadataLocks(List *taskList) } -/* - * InitTransactionStateForTask is called during executor start with the first - * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the - * transaction participant hash, opens connections to this task's nodes, and - * populates the hash with those connections after sending BEGIN commands to - * each. If a node fails to respond, its connection is set to NULL to prevent - * further interaction with it during the transaction. - */ -static void -InitTransactionStateForTask(Task *task) -{ - ListCell *placementCell = NULL; - - BeginOrContinueCoordinatedTransaction(); - - xactParticipantHash = CreateXactParticipantHash(); - - foreach(placementCell, task->taskPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - NodeConnectionKey participantKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - int connectionFlags = SESSION_LIFESPAN; - MultiConnection *connection = - GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort); - - MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, - MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; - - participantEntry = hash_search(xactParticipantHash, &participantKey, - HASH_ENTER, &entryFound); - Assert(!entryFound); - - /* issue BEGIN if necessary */ - RemoteTransactionBeginIfNecessary(connection); - - participantEntry->connection = connection; - } - - XactModificationLevel = XACT_MODIFICATION_DATA; -} - - /* * AcquireExecutorShardLock acquires a lock on the shard for the given task and * command type if necessary to avoid divergence between multiple replicas of @@ -527,33 +458,6 @@ RequiresConsistentSnapshot(Task *task) } -/* - * CreateXactParticipantHash initializes the map used to store the connections - * needed to process distributed transactions. Unlike the connection cache, we - * permit NULL connections here to signify that a participant has seen an error - * and is no longer receiving commands during a transaction. This hash should - * be walked at transaction end to send final COMMIT or ABORT commands. - */ -static HTAB * -CreateXactParticipantHash(void) -{ - HTAB *xactParticipantHash = NULL; - HASHCTL info; - int hashFlags = 0; - - MemSet(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hcxt = TopTransactionContext; - hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); - - xactParticipantHash = hash_create("citus xact participant hash", 32, &info, - hashFlags); - - return xactParticipantHash; -} - - /* * RouterExecutorRun actually executes a single task on a worker. */ @@ -633,13 +537,14 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) if (list_length(taskList) == 1) { Task *task = (Task *) linitial(taskList); - bool resultsOK = false; - resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery, - sendTuples); - if (!resultsOK) + if (isModificationQuery) { - ereport(ERROR, (errmsg("could not receive query results"))); + ExecuteSingleModifyTask(queryDesc, task, sendTuples); + } + else + { + ExecuteSingleSelectTask(queryDesc, task); } } else @@ -692,19 +597,73 @@ out: /* - * ExecuteSingleTask executes the task on the remote node, retrieves the - * results and stores them, if SELECT or RETURNING is used, in a tuple - * store. + * ExecuteSingleSelectTask executes the task on the remote node, retrieves the + * results and stores them in a tuple store. * * If the task fails on one of the placements, the function retries it on - * other placements (SELECT), reraises the remote error (constraint violation - * in DML), marks the affected placement as invalid (DML on some placement - * failed), or errors out (DML failed on all placements). + * other placements or errors out if the query fails on all placements. */ -static bool -ExecuteSingleTask(QueryDesc *queryDesc, Task *task, - bool isModificationQuery, - bool expectResults) +static void +ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task) +{ + TupleDesc tupleDescriptor = queryDesc->tupDesc; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + ParamListInfo paramListInfo = queryDesc->params; + List *taskPlacementList = task->taskPlacementList; + ListCell *taskPlacementCell = NULL; + char *queryString = task->queryString; + + if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("single-shard query may not appear in transaction blocks " + "which contain multi-shard data modifications"))); + } + + /* + * Try to run the query to completion on one placement. If the query fails + * attempt the query on the next placement. + */ + foreach(taskPlacementCell, taskPlacementList) + { + ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); + bool queryOK = false; + bool dontFailOnError = false; + int64 currentAffectedTupleCount = 0; + int connectionFlags = SESSION_LIFESPAN; + MultiConnection *connection = + GetPlacementConnection(connectionFlags, taskPlacement, NULL); + + queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); + if (!queryOK) + { + continue; + } + + queryOK = StoreQueryResult(routerState, connection, tupleDescriptor, + dontFailOnError, ¤tAffectedTupleCount); + if (queryOK) + { + return; + } + } + + ereport(ERROR, (errmsg("could not receive query results"))); +} + + +/* + * ExecuteSingleModifyTask executes the task on the remote node, retrieves the + * results and stores them, if RETURNING is used, in a tuple store. + * + * If the task fails on one of the placements, the function reraises the + * remote error (constraint violation in DML), marks the affected placement as + * invalid (other error on some placements, via the placement connection + * framework), or errors out (failed on all placements). + */ +static void +ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task, + bool expectResults) { CmdType operation = queryDesc->operation; TupleDesc tupleDescriptor = queryDesc->tupDesc; @@ -713,12 +672,15 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, ParamListInfo paramListInfo = queryDesc->params; bool resultsOK = false; List *taskPlacementList = task->taskPlacementList; + List *connectionList = NIL; ListCell *taskPlacementCell = NULL; - List *failedPlacementList = NIL; + ListCell *connectionCell = NULL; int64 affectedTupleCount = -1; bool gotResults = false; char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); + bool startedInTransaction = + InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA; if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { @@ -729,59 +691,52 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, } /* - * Firstly ensure that distributed transaction is started. Then, force - * the transaction manager to use 2PC while running the task on the placements. + * Modifications for reference tables are always done using 2PC. First + * ensure that distributed transaction is started. Then force the + * transaction manager to use 2PC while running the task on the + * placements. */ if (taskRequiresTwoPhaseCommit) { BeginOrContinueCoordinatedTransaction(); CoordinatedTransactionUse2PC(); - - /* - * Mark connections for all placements as critical and establish connections - * to all placements at once. - */ - GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList); } /* - * We could naturally handle function-based transactions (i.e. those - * using PL/pgSQL or similar) by checking the type of queryDesc->dest, - * but some customers already use functions that touch multiple shards - * from within a function, so we'll ignore functions for now. + * We could naturally handle function-based transactions (i.e. those using + * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some + * customers already use functions that touch multiple shards from within + * a function, so we'll ignore functions for now. */ - if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock()) + if (IsTransactionBlock()) { - InitTransactionStateForTask(task); + BeginOrContinueCoordinatedTransaction(); } + /* + * Get connections required to execute task. This will, if necessary, + * establish the connection, mark as critical (when modifying reference + * table) and start a transaction (when in a transaction). + */ + connectionList = GetModifyConnections(taskPlacementList, + taskRequiresTwoPhaseCommit, + startedInTransaction); + /* prevent replicas of the same shard from diverging */ AcquireExecutorShardLock(task, operation); - /* - * Try to run the query to completion on one placement. If the query fails - * attempt the query on the next placement. - */ - foreach(taskPlacementCell, taskPlacementList) + /* try to execute modification on all placements */ + forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); bool queryOK = false; bool failOnError = false; int64 currentAffectedTupleCount = 0; - PGconn *connection = GetConnectionForPlacement(taskPlacement, - isModificationQuery); - - if (connection == NULL) - { - failedPlacementList = lappend(failedPlacementList, taskPlacement); - continue; - } queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnectionForPlacement(connection, taskPlacement); - failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -829,87 +784,88 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, resultsOK = true; gotResults = true; - - /* - * Modifications have to be executed on all placements, but for - * read queries we can stop here. - */ - if (!isModificationQuery) - { - break; - } - } - else - { - PurgeConnectionForPlacement(connection, taskPlacement); - - failedPlacementList = lappend(failedPlacementList, taskPlacement); - - continue; } } - if (isModificationQuery) + /* if all placements failed, error out */ + if (!resultsOK) { - ListCell *failedPlacementCell = NULL; - - /* if all placements failed, error out */ - if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) - { - ereport(ERROR, (errmsg("could not modify any active placements"))); - } - - /* - * Otherwise, mark failed placements as inactive: they're stale. Note that - * connections for tasks that require 2PC has already failed the whole transaction - * and there is no way that they're marked stale here. - */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *failedPlacement = - (ShardPlacement *) lfirst(failedPlacementCell); - - Assert(!taskRequiresTwoPhaseCommit); - - UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE); - } - - executorState->es_processed = affectedTupleCount; + ereport(ERROR, (errmsg("could not modify any active placements"))); } - return resultsOK; + executorState->es_processed = affectedTupleCount; + + if (IsTransactionBlock()) + { + XactModificationLevel = XACT_MODIFICATION_DATA; + } } /* - * GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement list, - * starts the connections to the nodes and marks them critical. In the second iteration, - * the connection establishments are finished. Finally, BEGIN commands are sent, - * if necessary. + * GetModifyConnections returns the list of connections required to execute + * modify commands on the placements in tasPlacementList. If necessary remote + * transactions are started. + * + * If markCritical is true remote transactions are marked as critical. If + * noNewTransactions is true, this function errors out if there's no + * transaction in progress. */ -static void -GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList) +static List * +GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions) { ListCell *taskPlacementCell = NULL; List *multiConnectionList = NIL; - /* in the first iteration start the connections */ + /* first initiate connection establishment for all necessary connections */ foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); - int connectionFlags = SESSION_LIFESPAN; - MultiConnection *multiConnection = StartNodeConnection(connectionFlags, - taskPlacement->nodeName, - taskPlacement->nodePort); + int connectionFlags = SESSION_LIFESPAN | FOR_DML; + MultiConnection *multiConnection = NULL; - MarkRemoteTransactionCritical(multiConnection); + /* + * FIXME: It's not actually correct to use only one shard placement + * here for router queries involving multiple relations. We should + * check that this connection is the only modifying one associated + * with all the involved shards. + */ + multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL); + + /* + * If already in a transaction, disallow expanding set of remote + * transactions. That prevents some forms of distributed deadlocks. + */ + if (noNewTransactions) + { + RemoteTransaction *transaction = &multiConnection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("no transaction participant matches %s:%d", + taskPlacement->nodeName, taskPlacement->nodePort), + errdetail("Transactions which modify distributed tables " + "may only target nodes affected by the " + "modification command which began the transaction."))); + } + } + + if (markCritical) + { + MarkRemoteTransactionCritical(multiConnection); + } multiConnectionList = lappend(multiConnectionList, multiConnection); } + /* then finish in parallel */ FinishConnectionListEstablishment(multiConnectionList); + /* and start transactions if applicable */ RemoteTransactionsBeginIfNecessary(multiConnectionList); + + return multiConnectionList; } @@ -1009,8 +965,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn bool shardConnectionsFound = false; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - MultiConnection *multiConnection = NULL; - PGconn *connection = NULL; + MultiConnection *connection = NULL; bool queryOK = false; shardConnections = GetShardConnections(shardId, &shardConnectionsFound); @@ -1022,14 +977,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - multiConnection = + connection = (MultiConnection *) list_nth(connectionList, placementIndex); - connection = multiConnection->pgConn; queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - ReraiseRemoteError(connection, NULL); + ReportConnectionError(connection, ERROR); } } @@ -1041,8 +995,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn bool shardConnectionsFound = false; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - MultiConnection *multiConnection = NULL; - PGconn *connection = NULL; + MultiConnection *connection = NULL; int64 currentAffectedTupleCount = 0; bool failOnError = true; bool queryOK PG_USED_FOR_ASSERTS_ONLY = false; @@ -1060,9 +1013,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - multiConnection = - (MultiConnection *) list_nth(connectionList, placementIndex); - connection = multiConnection->pgConn; + connection = (MultiConnection *) list_nth(connectionList, placementIndex); /* * If caller is interested, store query results the first time @@ -1101,16 +1052,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn if (currentAffectedTupleCount != previousAffectedTupleCount) { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - ereport(WARNING, (errmsg("modified "INT64_FORMAT " tuples of shard " UINT64_FORMAT ", but expected to modify "INT64_FORMAT, currentAffectedTupleCount, shardId, previousAffectedTupleCount), - errdetail("modified placement on %s:%s", nodeName, - nodePort))); + errdetail("modified placement on %s:%d", + connection->hostname, connection->port))); } } @@ -1199,143 +1147,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, } -/* - * GetConnectionForPlacement is the main entry point for acquiring a connection - * within the router executor. By using placements (rather than node names and - * ports) to identify connections, the router executor can keep track of shards - * used by multi-statement transactions and error out if a transaction tries - * to reach a new node altogether). In the single-statement commands context, - * GetConnectionForPlacement simply falls through to GetOrEstablishConnection. - */ -static PGconn * -GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) -{ - NodeConnectionKey participantKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - /* if not in a transaction, fall through to connection cache */ - if (xactParticipantHash == NULL) - { - PGconn *connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - - return connection; - } - - Assert(IsTransactionBlock()); - - MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; - - participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND, - &entryFound); - - if (entryFound) - { - if (isModificationQuery) - { - RecordShardIdParticipant(placement->shardId, participantEntry); - } - - return participantEntry->connection->pgConn; - } - else - { - ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), - errmsg("no transaction participant matches %s:%d", - placement->nodeName, placement->nodePort), - errdetail("Transactions which modify distributed tables may only " - "target nodes affected by the modification command " - "which began the transaction."))); - } -} - - -/* - * PurgeConnectionForPlacement provides a way to purge an invalid connection - * from all relevant connection hashes using the placement involved in the - * query at the time of the error. If a transaction is ongoing, this function - * ensures the right node's connection is set to NULL in the participant map - * for the transaction in addition to purging the connection cache's entry. - */ -static void -PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement) -{ - CloseConnectionByPGconn(connection); - - /* - * The following is logically identical to RemoveXactConnection, but since - * we have a ShardPlacement to help build a NodeConnectionKey, we avoid - * any penalty incurred by calling BuildKeyForConnection, which must ex- - * tract host, port, and user from the connection options list. - */ - if (xactParticipantHash != NULL) - { - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - Assert(IsTransactionBlock()); - - /* the participant hash doesn't use the user field */ - MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); - participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, - &entryFound); - - Assert(entryFound); - - participantEntry->connection = NULL; - } -} - - -/* - * Removes a given connection from the transaction participant hash, based on - * the host and port of the provided connection. If the hash is not NULL, it - * MUST contain the provided connection, or a FATAL error is raised. - */ -static void -RemoveXactConnection(PGconn *connection) -{ - NodeConnectionKey nodeKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - if (xactParticipantHash == NULL) - { - return; - } - - BuildKeyForConnection(connection, &nodeKey); - - /* the participant hash doesn't use the user field */ - MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); - participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, - &entryFound); - - if (!entryFound) - { - ereport(FATAL, (errmsg("could not find specified transaction connection"))); - } - - participantEntry->connection = NULL; -} - - /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the * connection so that we receive results a row at a time. */ static bool -SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo) +SendQueryInSingleRowMode(MultiConnection *connection, char *query, + ParamListInfo paramListInfo) { int querySent = 0; int singleRowMode = 0; @@ -1349,24 +1168,27 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, ¶meterValues); - querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes, - parameterValues, NULL, NULL, 0); + querySent = PQsendQueryParams(connection->pgConn, query, + parameterCount, parameterTypes, parameterValues, + NULL, NULL, 0); } else { - querySent = PQsendQuery(connection, query); + querySent = PQsendQuery(connection->pgConn, query); } if (querySent == 0) { - WarnRemoteError(connection, NULL); + MarkRemoteTransactionFailed(connection, false); + ReportConnectionError(connection, WARNING); return false; } - singleRowMode = PQsetSingleRowMode(connection); + singleRowMode = PQsetSingleRowMode(connection->pgConn); if (singleRowMode == 0) { - WarnRemoteError(connection, NULL); + MarkRemoteTransactionFailed(connection, false); + ReportConnectionError(connection, WARNING); return false; } @@ -1451,7 +1273,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT * the connection. */ static bool -StoreQueryResult(MaterialState *routerState, PGconn *connection, +StoreQueryResult(MaterialState *routerState, MultiConnection *connection, TupleDesc tupleDescriptor, bool failOnError, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); @@ -1486,7 +1308,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, uint32 columnCount = 0; ExecStatusType resultStatus = 0; - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); if (result == NULL) { break; @@ -1499,6 +1321,8 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, int category = 0; bool isConstraintViolation = false; + MarkRemoteTransactionFailed(connection, false); + /* * If the error code is in constraint violation class, we want to * fail fast because we must get the same error from all shard @@ -1509,12 +1333,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, if (isConstraintViolation || failOnError) { - RemoveXactConnection(connection); - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -1579,7 +1402,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, * has been an error. */ static bool -ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) +ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) { bool commandFailed = false; bool gotResponse = false; @@ -1593,7 +1416,7 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) */ while (true) { - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); ExecStatusType status = PGRES_COMMAND_OK; if (result == NULL) @@ -1611,6 +1434,8 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) int category = 0; bool isConstraintViolation = false; + MarkRemoteTransactionFailed(connection, false); + /* * If the error code is in constraint violation class, we want to * fail fast because we must get the same error from all shard @@ -1621,13 +1446,13 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) if (isConstraintViolation || failOnError) { - RemoveXactConnection(connection); - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } + PQclear(result); commandFailed = true; @@ -1667,50 +1492,6 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) } -/* - * RecordShardIdParticipant registers a connection as being involved with a - * particular shard during a multi-statement transaction. - */ -static void -RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry) -{ - XactShardConnSet *shardConnSetMatch = NULL; - ListCell *listCell = NULL; - MemoryContext oldContext = NULL; - List *connectionEntryList = NIL; - - /* check whether an entry already exists for this shard */ - foreach(listCell, xactShardConnSetList) - { - XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell); - - if (shardConnSet->shardId == affectedShardId) - { - shardConnSetMatch = shardConnSet; - } - } - - /* entries must last through the whole top-level transaction */ - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - /* if no entry found, make one */ - if (shardConnSetMatch == NULL) - { - shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet)); - shardConnSetMatch->shardId = affectedShardId; - - xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch); - } - - /* add connection, avoiding duplicates */ - connectionEntryList = shardConnSetMatch->connectionEntryList; - shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList, - participantEntry); - - MemoryContextSwitchTo(oldContext); -} - - /* * RouterExecutorFinish cleans up after a distributed execution. */ @@ -1745,114 +1526,3 @@ RouterExecutorEnd(QueryDesc *queryDesc) queryDesc->estate = NULL; queryDesc->totaltime = NULL; } - - -/* - * RouterExecutorPreCommitCheck() gets called after remote transactions have - * committed, so it can invalidate failed shards and perform related checks. - */ -void -RouterExecutorPreCommitCheck(void) -{ - /* no transactional router modification were issued, nothing to do */ - if (xactParticipantHash == NULL) - { - return; - } - - MarkRemainingInactivePlacements(); -} - - -/* - * Cleanup callback called after a transaction commits or aborts. - */ -void -RouterExecutorPostCommit(void) -{ - /* reset transaction state */ - xactParticipantHash = NULL; - xactShardConnSetList = NIL; -} - - -/* - * MarkRemainingInactivePlacements takes care of marking placements of a shard - * inactive after some of the placements rejected the final COMMIT phase of a - * transaction. - * - * Failures are detected by checking the connection & transaction state for - * each of the entries in the connection set for each shard. - */ -static void -MarkRemainingInactivePlacements(void) -{ - ListCell *shardConnSetCell = NULL; - int totalSuccesses = 0; - - if (xactParticipantHash == NULL) - { - return; - } - - foreach(shardConnSetCell, xactShardConnSetList) - { - XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell); - List *participantList = shardConnSet->connectionEntryList; - ListCell *participantCell = NULL; - int successes = list_length(participantList); /* assume full success */ - - /* determine how many actual successes there were: subtract failures */ - foreach(participantCell, participantList) - { - NodeConnectionEntry *participant = - (NodeConnectionEntry *) lfirst(participantCell); - MultiConnection *connection = participant->connection; - - /* - * Fail if the connection has been set to NULL after an error, or - * if the transaction failed for other reasons (e.g. COMMIT - * failed). - */ - if (connection == NULL || connection->remoteTransaction.transactionFailed) - { - successes--; - } - } - - /* if no nodes succeeded for this shard, don't do anything */ - if (successes == 0) - { - continue; - } - - /* otherwise, ensure failed placements are marked inactive */ - foreach(participantCell, participantList) - { - NodeConnectionEntry *participant = NULL; - participant = (NodeConnectionEntry *) lfirst(participantCell); - - if (participant->connection == NULL || - participant->connection->remoteTransaction.transactionFailed) - { - uint64 shardId = shardConnSet->shardId; - NodeConnectionKey *nodeKey = &participant->cacheKey; - uint64 shardLength = 0; - uint64 placementId = INVALID_PLACEMENT_ID; - - placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName, - nodeKey->nodePort); - InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength, - nodeKey->nodeName, nodeKey->nodePort); - } - } - - totalSuccesses++; - } - - /* If no shards could be modified at all, error out. */ - if (totalSuccesses == 0) - { - ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); - } -} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 92ba56631..40a0e4651 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,7 +21,6 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" -#include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" @@ -150,7 +149,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); - RouterExecutorPostCommit(); if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { @@ -187,7 +185,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); - RouterExecutorPostCommit(); /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) @@ -268,14 +265,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } - /* - * Call other parts of citus that need to integrate into - * transaction management. Call them *after* committing/preparing - * the remote transactions, to allow marking shards as invalid - * (e.g. if the remote commit failed). - */ - RouterExecutorPreCommitCheck(); - /* * * Check again whether shards/placement successfully diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index f584abdc8..bb16f5e15 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -37,8 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); -extern void RouterExecutorPreCommitCheck(void); -extern void RouterExecutorPostCommit(void); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 6dffd0583..aecaf0191 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -140,8 +140,8 @@ INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); ERROR: no transaction participant matches localhost:57638 DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. COMMIT; --- this logic even applies to router SELECTs occurring after a modification: --- selecting from the modified node is fine... +-- SELECTs may occur after a modification: First check that selecting +-- from the modified node works. BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; @@ -151,7 +151,7 @@ SELECT count(*) FROM researchers WHERE lab_id = 6; (1 row) ABORT; --- but if a SELECT needs to go to new node, that's a problem... +-- then check that SELECT going to new node still is fine BEGIN; UPDATE pg_dist_shard_placement AS sp SET shardstate = 3 FROM pg_dist_shard AS s @@ -161,8 +161,11 @@ AND sp.nodeport = :worker_1_port AND s.logicalrelid = 'researchers'::regclass; INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. + count +------- + 0 +(1 row) + ABORT; -- applies to DDL, too BEGIN; @@ -494,7 +497,9 @@ WARNING: illegal value WARNING: failed to commit transaction on localhost:57637 WARNING: illegal value WARNING: failed to commit transaction on localhost:57638 -ERROR: could not commit transaction on any active nodes +WARNING: could not commit transaction for shard 1200002 on any active node +WARNING: could not commit transaction for shard 1200003 on any active node +ERROR: could not commit transaction on any active node -- data should NOT be persisted SELECT * FROM objects WHERE id = 1; id | name @@ -530,6 +535,7 @@ INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:57637 +WARNING: could not commit transaction for shard 1200002 on any active node \set VERBOSITY default -- data to objects should be persisted, but labs should not... SELECT * FROM objects WHERE id = 1; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index cc10b385c..8a2497e57 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -111,15 +111,14 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); COMMIT; --- this logic even applies to router SELECTs occurring after a modification: --- selecting from the modified node is fine... +-- SELECTs may occur after a modification: First check that selecting +-- from the modified node works. BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; ABORT; --- but if a SELECT needs to go to new node, that's a problem... - +-- then check that SELECT going to new node still is fine BEGIN; UPDATE pg_dist_shard_placement AS sp SET shardstate = 3