diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 6767a41a9..7909e9d75 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -23,6 +23,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/hash_helpers.h"
+#include "distributed/placement_connection.h"
 #include "mb/pg_wchar.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
@@ -234,6 +235,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	connection = StartConnectionEstablishment(&key);
 
 	dlist_push_tail(entry->connections, &connection->connectionNode);
+	ResetShardPlacementAssociation(connection);
 
 	if (flags & SESSION_LIFESPAN)
 	{
@@ -333,6 +335,7 @@ CloseNodeConnections(char *nodeName, int nodePort)
 
 		/* same for transaction state */
 		CloseRemoteTransaction(connection);
+		CloseShardPlacementAssociation(connection);
 
 		/* we leave the per-host entry alive */
 		pfree(connection);
@@ -366,8 +369,9 @@ CloseConnection(MultiConnection *connection)
 	/* unlink from list of open connections */
 	dlist_delete(&connection->connectionNode);
 
-	/* same for transaction state */
+	/* same for transaction state and shard/placement machinery */
 	CloseRemoteTransaction(connection);
+	CloseShardPlacementAssociation(connection);
 
 	/* we leave the per-host entry alive */
 	pfree(connection);
@@ -692,6 +696,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
 	{
 		/* reset per-transaction state */
 		ResetRemoteTransaction(connection);
+		ResetShardPlacementAssociation(connection);
 
 		UnclaimConnection(connection);
 	}
diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c
new file mode 100644
index 000000000..0b3d7e78b
--- /dev/null
+++ b/src/backend/distributed/connection/placement_connection.c
@@ -0,0 +1,660 @@
+/*-------------------------------------------------------------------------
+ *
+ * placement_connection.c
+ *	  Per-placement connection handling.
+ *
+ * Copyright (c) 2016-2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres.h"
+
+#include "distributed/connection_management.h"
+#include "distributed/hash_helpers.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/placement_connection.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+
+
+/*
+ * A connection reference is used to register that a connection has been used
+ * to read or modify a shard placement as a particular user.
+ */
+typedef struct ConnectionReference
+{
+	/* identity of the referenced shard placement */
+	uint64 shardId;
+	uint64 placementId;
+
+	/*
+	 * The user used to read/modify the placement. We cannot reuse a
+	 * connection that was established as a different user, since it would
+	 * not have the right permissions.
+	 */
+	const char *userName;
+
+	/* the connection */
+	MultiConnection *connection;
+
+	/*
+	 * Information about what the connection is used for. There can only be
+	 * one connection executing DDL/DML for a placement to avoid deadlock
+	 * issues/read-your-own-writes violations. The difference between DDL/DML
+	 * currently is only used to emit more precise error messages.
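+	 *
+	 * As an illustration of the read-your-own-writes problem avoided here:
+	 * if one connection INSERTs into a placement, a second connection in the
+	 * same coordinated transaction would not see that row, because the
+	 * INSERT is not yet committed on the remote side.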
+	 */
+	bool hadDML;
+	bool hadDDL;
+
+	/* membership in ConnectionPlacementHashEntry->connectionReferences */
+	dlist_node placementNode;
+
+	/* membership in MultiConnection->referencedPlacements */
+	dlist_node connectionNode;
+} ConnectionReference;
+
+
+/*
+ * Hash table mapping placements to a list of connections.
+ *
+ * This stores a list of connections for each placement, because multiple
+ * connections to the same placement may exist at the same time. E.g. a
+ * real-time executor query may reference the same placement in several
+ * sub-tasks.
+ *
+ * We keep track of whether a connection has executed DML or DDL, since we
+ * can only ever allow a single connection to do either, to prevent deadlocks
+ * and consistency violations (e.g. read-your-own-writes).
+ */
+
+/* hash key */
+typedef struct ConnectionPlacementHashKey
+{
+	uint64 placementId;
+} ConnectionPlacementHashKey;
+
+/* hash entry */
+typedef struct ConnectionPlacementHashEntry
+{
+	ConnectionPlacementHashKey key;
+
+	/* did any remote transactions fail? */
+	bool failed;
+
+	/* list of connections to remote nodes */
+	dlist_head connectionReferences;
+
+	/* if non-NULL, the connection executing DML/DDL */
+	ConnectionReference *modifyingConnection;
+
+	/* membership in ConnectionShardHashEntry->placementConnections */
+	dlist_node shardNode;
+} ConnectionPlacementHashEntry;
+
+/* hash table */
+static HTAB *ConnectionPlacementHash;
+
+
+/*
+ * Hash table mapping shard ids to placements.
+ *
+ * This is used to decide, after a failure, whether placements of a shard
+ * have to be marked invalid, or whether the coordinated transaction has to
+ * be aborted instead, so that not all placements of a shard end up marked
+ * invalid.
+ */
+
+/* hash key */
+typedef struct ConnectionShardHashKey
+{
+	uint64 shardId;
+} ConnectionShardHashKey;
+
+/* hash entry */
+typedef struct ConnectionShardHashEntry
+{
+	ConnectionShardHashKey key;
+	dlist_head placementConnections;
+} ConnectionShardHashEntry;
+
+/* hash table */
+static HTAB *ConnectionShardHash;
+
+
+static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName,
+													  ConnectionPlacementHashEntry *
+													  placementEntry);
+static bool CanUseExistingConnection(uint32 flags, const char *userName,
+									 ConnectionReference *connectionReference);
+static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
+										ShardPlacement *placement);
+static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit,
+								 bool using2PC);
+
+
+/*
+ * GetPlacementConnection establishes a connection for a placement.
+ *
+ * See StartPlacementConnection for details.
+ */
+MultiConnection *
+GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
+{
+	MultiConnection *connection = StartPlacementConnection(flags, placement, userName);
+
+	FinishConnectionEstablishment(connection);
+	return connection;
+}
+
+
+/*
+ * StartPlacementConnection initiates a connection to a remote node,
+ * associated with the placement and transaction.
+ *
+ * The connection is established for the current database. If userName is
+ * NULL, the current user is used, otherwise the provided one.
+ *
+ * See StartNodeUserDatabaseConnection for details.
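+ *
+ * A typical usage pattern (for illustration; GetModifyConnections() in
+ * multi_router_executor.c is a real caller) is to call
+ * StartPlacementConnection() once per placement, collect the connections in
+ * a list, and then complete all handshakes in parallel with
+ * FinishConnectionListEstablishment(). Callers that only need one ready
+ * connection can use the blocking GetPlacementConnection() wrapper instead.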
+ *
+ * Flags have the same meaning as in StartNodeUserDatabaseConnection,
+ * except that two additional flags have an effect:
+ * - FOR_DML - signals that the connection is going to be used for DML (modifications)
+ * - FOR_DDL - signals that the connection is going to be used for DDL
+ *
+ * Only one connection associated with the placement may have FOR_DML or
+ * FOR_DDL set. This restriction prevents deadlocks and wrong results due to
+ * in-progress transactions.
+ */
+MultiConnection *
+StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
+{
+	ConnectionPlacementHashKey key;
+	ConnectionPlacementHashEntry *placementEntry = NULL;
+	ConnectionReference *returnConnectionReference = NULL;
+	char *freeUserName = NULL;
+	bool found = false;
+
+	if (userName == NULL)
+	{
+		userName = freeUserName = CurrentUserName();
+	}
+
+	key.placementId = placement->placementId;
+
+	/* lookup relevant hash entry */
+	placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
+	if (!found)
+	{
+		dlist_init(&placementEntry->connectionReferences);
+		placementEntry->failed = false;
+		placementEntry->modifyingConnection = NULL;
+	}
+
+	/*
+	 * Check whether any of the connections already associated with the
+	 * placement can be reused, or whether reuse would violate the
+	 * FOR_DML/FOR_DDL constraints.
+	 */
+	returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry);
+
+	/*
+	 * Either no caching desired, or no connection present. Start connection
+	 * establishment. Allocations are performed in transaction context, so we
+	 * don't have to care about freeing in case of an early disconnect.
+	 */
+	if (returnConnectionReference == NULL)
+	{
+		MultiConnection *connection = StartNodeConnection(flags, placement->nodeName,
+														  placement->nodePort);
+
+		returnConnectionReference = (ConnectionReference *)
+			MemoryContextAlloc(TopTransactionContext,
+							   sizeof(ConnectionReference));
+		returnConnectionReference->connection = connection;
+		returnConnectionReference->hadDDL = false;
+		returnConnectionReference->hadDML = false;
+		returnConnectionReference->shardId = placement->shardId;
+		returnConnectionReference->placementId = placement->placementId;
+		returnConnectionReference->userName =
+			MemoryContextStrdup(TopTransactionContext, userName);
+		dlist_push_tail(&placementEntry->connectionReferences,
+						&returnConnectionReference->placementNode);
+
+		/* record association with shard, for invalidation */
+		AssociatePlacementWithShard(placementEntry, placement);
+
+		/* record association with connection, to handle connection closure */
+		dlist_push_tail(&connection->referencedPlacements,
+						&returnConnectionReference->connectionNode);
+	}
+
+	if (flags & FOR_DDL)
+	{
+		placementEntry->modifyingConnection = returnConnectionReference;
+		returnConnectionReference->hadDDL = true;
+	}
+	if (flags & FOR_DML)
+	{
+		placementEntry->modifyingConnection = returnConnectionReference;
+		returnConnectionReference->hadDML = true;
+	}
+
+	if (freeUserName)
+	{
+		pfree(freeUserName);
+	}
+
+	return returnConnectionReference->connection;
+}
+
+
+/*
+ * CheckExistingConnections checks whether any of the existing connections is
+ * usable. If so, return it, otherwise return NULL.
+ *
+ * A connection is usable if it is not in use, the user matches, DDL/DML
+ * usage matches, and cached connections are allowed. If no existing
+ * connection matches, but a new connection would conflict (e.g. a
+ * connection already exists but isn't usable, and the desired connection
+ * needs to execute DML/DDL), an error is thrown.
+ */
+static ConnectionReference *
+CheckExistingConnections(uint32 flags, const char *userName,
+						 ConnectionPlacementHashEntry *placementEntry)
+{
+	dlist_iter it;
+
+	/*
+	 * If there's a connection that has executed DML/DDL, always return it if
+	 * possible. That's because we can only execute DML/DDL over that
+	 * connection. Checking this first also allows us to detect conflicts due
+	 * to opening a second connection that wants to execute DML/DDL.
+	 */
+	if (placementEntry->modifyingConnection)
+	{
+		ConnectionReference *connectionReference = placementEntry->modifyingConnection;
+
+		if (CanUseExistingConnection(flags, userName, connectionReference))
+		{
+			return connectionReference;
+		}
+
+		if (connectionReference->hadDDL)
+		{
+			/* would deadlock otherwise */
+			ereport(ERROR,
+					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+					 errmsg("cannot establish new placement connection when DDL has been "
+							"executed on existing placement connection")));
+		}
+		else if (connectionReference->hadDML)
+		{
+			/* we'd not see the other connection's contents */
+			ereport(ERROR,
+					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+					 errmsg("cannot establish new placement connection when DML has been "
+							"executed on existing placement connection")));
+		}
+		else
+		{
+			/* modifying xact should have executed DML/DDL */
+			Assert(false);
+		}
+	}
+
+	/*
+	 * Search existing connections for a reusable connection.
+	 */
+	dlist_foreach(it, &placementEntry->connectionReferences)
+	{
+		ConnectionReference *connectionReference =
+			dlist_container(ConnectionReference, placementNode, it.cur);
+
+		if (CanUseExistingConnection(flags, userName, connectionReference))
+		{
+			return connectionReference;
+		}
+
+		/*
+		 * If we cannot use this connection, verify that the FOR_DML/FOR_DDL
+		 * flags don't conflict with it.
+		 */
+		if (flags & FOR_DDL)
+		{
+			/* would deadlock otherwise */
+			ereport(ERROR,
+					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+					 errmsg("cannot establish new placement connection for DDL when "
+							"other connections exist")));
+		}
+		else if (flags & FOR_DML)
+		{
+			/*
+			 * We could allow this case (as presumably the SELECT is done),
+			 * but it'd be a bit prone to programming errors, so we disallow
+			 * it for now.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+					 errmsg("cannot establish new placement connection for DML when "
+							"other connections exist")));
+		}
+	}
+
+	/* establish a new connection */
+	return NULL;
+}
+
+
+/*
+ * CanUseExistingConnection is a helper function for CheckExistingConnections()
+ * that checks whether an existing connection can be reused.
+ */
+static bool
+CanUseExistingConnection(uint32 flags, const char *userName,
+						 ConnectionReference *connectionReference)
+{
+	MultiConnection *connection = connectionReference->connection;
+
+	if (!connection)
+	{
+		/* an already-closed connection is obviously not usable */
+		return false;
+	}
+	else if (connection->claimedExclusively)
+	{
+		/* already in use */
+		return false;
+	}
+	else if (flags & FORCE_NEW_CONNECTION)
+	{
+		/* no connection reuse desired */
+		return false;
+	}
+	else if (strcmp(connectionReference->userName, userName) != 0)
+	{
+		/* connection was established as a different user, not reusable */
+		return false;
+	}
+	else
+	{
+		return true;
+	}
+}
+
+
+/*
+ * AssociatePlacementWithShard records the shard->placement relation in
+ * ConnectionShardHash.
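+ *
+ * The resulting two-level structure is roughly:
+ *
+ *   ConnectionShardHash: shardId -> list of ConnectionPlacementHashEntry
+ *   ConnectionPlacementHash: placementId -> list of ConnectionReference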
+ *
+ * That association is later used, in CheckForFailedPlacements, to invalidate
+ * shard placements if necessary.
+ */
+static void
+AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
+							ShardPlacement *placement)
+{
+	ConnectionShardHashKey shardKey;
+	ConnectionShardHashEntry *shardEntry = NULL;
+	bool found = false;
+	dlist_iter placementIter;
+
+	shardKey.shardId = placement->shardId;
+	shardEntry = hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &found);
+	if (!found)
+	{
+		dlist_init(&shardEntry->placementConnections);
+	}
+
+	/*
+	 * Check if the placement is already associated with the shard (happens
+	 * if there are multiple connections for a placement). There will usually
+	 * only be a few placements per shard, so the price of iterating isn't
+	 * large.
+	 */
+	dlist_foreach(placementIter, &shardEntry->placementConnections)
+	{
+		ConnectionPlacementHashEntry *currentEntry =
+			dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
+
+		if (currentEntry->key.placementId == placement->placementId)
+		{
+			return;
+		}
+	}
+
+	/* otherwise add */
+	dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode);
+}
+
+
+/*
+ * CloseShardPlacementAssociation handles a connection being closed before
+ * transaction end.
+ *
+ * This should only be called by connection_management.c.
+ */
+void
+CloseShardPlacementAssociation(struct MultiConnection *connection)
+{
+	dlist_iter placementIter;
+
+	/* set connection to NULL for all references to the connection */
+	dlist_foreach(placementIter, &connection->referencedPlacements)
+	{
+		ConnectionReference *reference =
+			dlist_container(ConnectionReference, connectionNode, placementIter.cur);
+
+		reference->connection = NULL;
+
+		/*
+		 * Note that we don't reset ConnectionPlacementHashEntry's
+		 * modifyingConnection here; that'd be more complicated than it seems
+		 * worth. That means we'll error out spuriously if a DML/DDL
+		 * executing connection is closed earlier in a transaction.
+		 */
+	}
+}
+
+
+/*
+ * ResetShardPlacementAssociation resets the association of connections to
+ * shard placements at the end of a transaction.
+ *
+ * This should only be called by connection_management.c.
+ */
+void
+ResetShardPlacementAssociation(struct MultiConnection *connection)
+{
+	dlist_init(&connection->referencedPlacements);
+}
+
+
+/*
+ * ResetPlacementConnectionManagement() disassociates connections from
+ * placements and shards. This will be called at the end of XACT_EVENT_COMMIT
+ * and XACT_EVENT_ABORT.
+ */
+void
+ResetPlacementConnectionManagement(void)
+{
+	/* Simply delete all entries */
+	hash_delete_all(ConnectionPlacementHash);
+	hash_delete_all(ConnectionShardHash);
+
+	/*
+	 * NB: memory for ConnectionReference structs and subordinate data is
+	 * deleted by virtue of being allocated in TopTransactionContext.
+	 */
+}
+
+
+/*
+ * CheckForFailedPlacements checks which placements have to be marked as
+ * invalid, and/or whether sufficiently many placements have failed to abort
+ * the entire coordinated transaction.
+ *
+ * This will usually be called twice. Once before the remote commit is done,
+ * and once after. This is so we can abort before executing remote commits,
+ * and so we can handle remote transactions that failed during commit.
+ *
+ * When preCommit or using2PC is true, failures on transactions marked as
+ * critical will abort the entire coordinated transaction. If not, we can't
+ * roll back, because some remote transactions might have already committed.
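+ *
+ * As a concrete example (assuming a shard with two placements): if the
+ * remote COMMIT fails on one placement but succeeds on the other, the
+ * failed placement is merely marked invalid; if it fails on both, no
+ * placement of the shard could be modified and we have to error out.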
+ */ +void +CheckForFailedPlacements(bool preCommit, bool using2PC) +{ + HASH_SEQ_STATUS status; + ConnectionShardHashEntry *shardEntry = NULL; + int successes = 0; + int attempts = 0; + + hash_seq_init(&status, ConnectionShardHash); + while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) + { + attempts++; + if (CheckShardPlacements(shardEntry, preCommit, using2PC)) + { + successes++; + } + } + + /* + * If no shards could be modified at all, error out. Doesn't matter if + * we're post-commit - there's nothing to invalidate. + */ + if (attempts > 0 && successes == 0) + { + ereport(ERROR, (errmsg("could not commit transaction on any active node"))); + } +} + + +/* + * CheckShardPlacements is a helper function for CheckForFailedPlacements that + * performs the per-shard work. + */ +static bool +CheckShardPlacements(ConnectionShardHashEntry *shardEntry, + bool preCommit, bool using2PC) +{ + int failures = 0; + int successes = 0; + dlist_iter placementIter; + + dlist_foreach(placementIter, &shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); + dlist_iter referenceIter; + + dlist_foreach(referenceIter, &placementEntry->connectionReferences) + { + ConnectionReference *reference = + dlist_container(ConnectionReference, placementNode, referenceIter.cur); + MultiConnection *connection = reference->connection; + + /* + * If neither DDL nor DML were executed, there's no need for + * invalidation. + */ + if (!reference->hadDDL && !reference->hadDML) + { + continue; + } + + /* + * Failed if connection was closed, or remote transaction was + * unsuccessful. + */ + if (!connection || connection->remoteTransaction.transactionFailed) + { + placementEntry->failed = true; + } + } + + if (placementEntry->failed) + { + failures++; + } + else + { + successes++; + } + } + + if (failures > 0 && successes == 0) + { + int elevel = 0; + + /* + * Only error out if we're pre-commit or using 2PC. Can't error + * otherwise as we can end up with a state where some shard + * modifications have already committed successfully. If no + * modifications at all succeed, CheckForFailedPlacements() will error + * out. This sucks. + */ + if (preCommit || using2PC) + { + elevel = ERROR; + } + else + { + elevel = WARNING; + } + + ereport(elevel, + (errmsg("could not commit transaction for shard " INT64_FORMAT + " on any active node", + shardEntry->key.shardId))); + return false; + } + + /* mark all failed placements invalid */ + dlist_foreach(placementIter, &shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); + + if (placementEntry->failed) + { + UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE); + } + } + + return true; +} + + +/* + * InitPlacementConnectionManagement performs initialization of the + * infrastructure in this file at server start. 
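+ *
+ * Note that both hash tables are allocated in ConnectionContext and thus
+ * survive individual transactions; their entries are removed again by
+ * ResetPlacementConnectionManagement() at transaction end.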
+ */ +void +InitPlacementConnectionManagement(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + /* create (placementId) -> [ConnectionReference] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionPlacementHashKey); + info.entrysize = sizeof(ConnectionPlacementHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + ConnectionPlacementHash = hash_create("citus connection cache (placementid)", + 64, &info, hashFlags); + + /* create (shardId) -> [ConnectionShardHashEntry] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionShardHashKey); + info.entrysize = sizeof(ConnectionShardHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + ConnectionShardHash = hash_create("citus connection cache (shardid)", + 64, &info, hashFlags); +} diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2af22d251..bf04d1f81 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -41,6 +41,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/placement_connection.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -72,30 +73,14 @@ /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; - -/* - * The following static variables are necessary to track the progression of - * multi-statement transactions managed by the router executor. After the first - * modification within a transaction, the executor populates a hash with the - * transaction's initial participants (nodes hit by that initial modification). - * - * To keep track of the reverse mapping (from shards to nodes), we have a list - * of XactShardConnSets, which map a shard identifier to a set of connection - * hash entries. This list is walked by MarkRemainingInactivePlacements to - * ensure we mark placements as failed if they reject a COMMIT. 
- */
-static HTAB *xactParticipantHash = NULL;
-static List *xactShardConnSetList = NIL;
-
-/* functions needed during start phase */
-static void InitTransactionStateForTask(Task *task);
-static HTAB * CreateXactParticipantHash(void);
-
 /* functions needed during run phase */
 static void ReacquireMetadataLocks(List *taskList);
-static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
-							  bool isModificationQuery, bool expectResults);
-static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList);
+static void ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
+									bool expectResults);
+static void ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task);
+static List * GetModifyConnections(List *taskPlacementList,
+								   bool markCritical,
+								   bool noNewTransactions);
 static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
 								 bool isModificationQuery, bool expectResults);
 static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@@ -109,23 +94,15 @@ static bool RequiresConsistentSnapshot(Task *task);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
 									   DestReceiver *destination,
 									   Tuplestorestate *tupleStore);
-static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
-										  bool isModificationQuery);
-static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
-static void RemoveXactConnection(PGconn *connection);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
 											   Oid **parameterTypes,
 											   const char ***parameterValues);
-static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
+static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
 									 ParamListInfo paramListInfo);
-static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
+static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
 							 TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
-static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
-static void RecordShardIdParticipant(uint64 affectedShardId,
-									 NodeConnectionEntry *participantEntry);
-
-/* to verify the health of shards after a transactional modification command */
-static void MarkRemainingInactivePlacements(void);
+static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
+							   int64 *rows);
 
 
 /*
@@ -232,52 +209,6 @@ ReacquireMetadataLocks(List *taskList)
 }
 
 
-/*
- * InitTransactionStateForTask is called during executor start with the first
- * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
- * transaction participant hash, opens connections to this task's nodes, and
- * populates the hash with those connections after sending BEGIN commands to
- * each. If a node fails to respond, its connection is set to NULL to prevent
- * further interaction with it during the transaction.
- */ -static void -InitTransactionStateForTask(Task *task) -{ - ListCell *placementCell = NULL; - - BeginOrContinueCoordinatedTransaction(); - - xactParticipantHash = CreateXactParticipantHash(); - - foreach(placementCell, task->taskPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - NodeConnectionKey participantKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - int connectionFlags = SESSION_LIFESPAN; - MultiConnection *connection = - GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort); - - MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, - MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; - - participantEntry = hash_search(xactParticipantHash, &participantKey, - HASH_ENTER, &entryFound); - Assert(!entryFound); - - /* issue BEGIN if necessary */ - RemoteTransactionBeginIfNecessary(connection); - - participantEntry->connection = connection; - } - - XactModificationLevel = XACT_MODIFICATION_DATA; -} - - /* * AcquireExecutorShardLock acquires a lock on the shard for the given task and * command type if necessary to avoid divergence between multiple replicas of @@ -527,33 +458,6 @@ RequiresConsistentSnapshot(Task *task) } -/* - * CreateXactParticipantHash initializes the map used to store the connections - * needed to process distributed transactions. Unlike the connection cache, we - * permit NULL connections here to signify that a participant has seen an error - * and is no longer receiving commands during a transaction. This hash should - * be walked at transaction end to send final COMMIT or ABORT commands. - */ -static HTAB * -CreateXactParticipantHash(void) -{ - HTAB *xactParticipantHash = NULL; - HASHCTL info; - int hashFlags = 0; - - MemSet(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hcxt = TopTransactionContext; - hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); - - xactParticipantHash = hash_create("citus xact participant hash", 32, &info, - hashFlags); - - return xactParticipantHash; -} - - /* * RouterExecutorRun actually executes a single task on a worker. */ @@ -633,13 +537,14 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) if (list_length(taskList) == 1) { Task *task = (Task *) linitial(taskList); - bool resultsOK = false; - resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery, - sendTuples); - if (!resultsOK) + if (isModificationQuery) { - ereport(ERROR, (errmsg("could not receive query results"))); + ExecuteSingleModifyTask(queryDesc, task, sendTuples); + } + else + { + ExecuteSingleSelectTask(queryDesc, task); } } else @@ -692,19 +597,73 @@ out: /* - * ExecuteSingleTask executes the task on the remote node, retrieves the - * results and stores them, if SELECT or RETURNING is used, in a tuple - * store. + * ExecuteSingleSelectTask executes the task on the remote node, retrieves the + * results and stores them in a tuple store. * * If the task fails on one of the placements, the function retries it on - * other placements (SELECT), reraises the remote error (constraint violation - * in DML), marks the affected placement as invalid (DML on some placement - * failed), or errors out (DML failed on all placements). + * other placements or errors out if the query fails on all placements. 
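+ *
+ * Unlike for modifications, a placement that fails here is not marked
+ * invalid; for a read it suffices to simply fall through to the next
+ * placement.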
 */
-static bool
-ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
-				  bool isModificationQuery,
-				  bool expectResults)
+static void
+ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task)
+{
+	TupleDesc tupleDescriptor = queryDesc->tupDesc;
+	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
+	ParamListInfo paramListInfo = queryDesc->params;
+	List *taskPlacementList = task->taskPlacementList;
+	ListCell *taskPlacementCell = NULL;
+	char *queryString = task->queryString;
+
+	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
+	{
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("single-shard query may not appear in transaction blocks "
+							   "which contain multi-shard data modifications")));
+	}
+
+	/*
+	 * Try to run the query to completion on one placement. If the query
+	 * fails, attempt the query on the next placement.
+	 */
+	foreach(taskPlacementCell, taskPlacementList)
+	{
+		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+		bool queryOK = false;
+		bool dontFailOnError = false;
+		int64 currentAffectedTupleCount = 0;
+		int connectionFlags = SESSION_LIFESPAN;
+		MultiConnection *connection =
+			GetPlacementConnection(connectionFlags, taskPlacement, NULL);
+
+		queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
+		if (!queryOK)
+		{
+			continue;
+		}
+
+		queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
+								   dontFailOnError, &currentAffectedTupleCount);
+		if (queryOK)
+		{
+			return;
+		}
+	}
+
+	ereport(ERROR, (errmsg("could not receive query results")));
+}
+
+
+/*
+ * ExecuteSingleModifyTask executes the task on the remote node, retrieves the
+ * results and stores them, if RETURNING is used, in a tuple store.
+ *
+ * If the task fails on one of the placements, the function reraises the
+ * remote error (on a constraint violation), marks the affected placement as
+ * invalid (on other errors affecting only some placements, via the placement
+ * connection framework), or errors out (when the task failed on all
+ * placements).
+ */
+static void
+ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
+						bool expectResults)
 {
 	CmdType operation = queryDesc->operation;
 	TupleDesc tupleDescriptor = queryDesc->tupDesc;
@@ -713,12 +672,15 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 	ParamListInfo paramListInfo = queryDesc->params;
 	bool resultsOK = false;
 	List *taskPlacementList = task->taskPlacementList;
+	List *connectionList = NIL;
 	ListCell *taskPlacementCell = NULL;
-	List *failedPlacementList = NIL;
+	ListCell *connectionCell = NULL;
 	int64 affectedTupleCount = -1;
 	bool gotResults = false;
 	char *queryString = task->queryString;
 	bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
+	bool startedInTransaction =
+		InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
 
 	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
 	{
@@ -729,59 +691,52 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 	}
 
 	/*
-	 * Firstly ensure that distributed transaction is started. Then, force
-	 * the transaction manager to use 2PC while running the task on the placements.
+	 * Modifications for reference tables are always done using 2PC. First
+	 * ensure that a distributed transaction is started. Then force the
+	 * transaction manager to use 2PC while running the task on the
+	 * placements.
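+	 * Reference tables are replicated to many nodes, so a commit that
+	 * succeeded on only some replicas would leave them diverged; with 2PC
+	 * the transaction can instead be rolled back everywhere.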
 	 */
 	if (taskRequiresTwoPhaseCommit)
 	{
 		BeginOrContinueCoordinatedTransaction();
 		CoordinatedTransactionUse2PC();
-
-		/*
-		 * Mark connections for all placements as critical and establish connections
-		 * to all placements at once.
-		 */
-		GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
 	}
 
 	/*
-	 * We could naturally handle function-based transactions (i.e. those
-	 * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
-	 * but some customers already use functions that touch multiple shards
-	 * from within a function, so we'll ignore functions for now.
+	 * We could naturally handle function-based transactions (i.e. those using
+	 * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
+	 * customers already use functions that touch multiple shards from within
+	 * a function, so we'll ignore functions for now.
 	 */
-	if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
+	if (IsTransactionBlock())
 	{
-		InitTransactionStateForTask(task);
+		BeginOrContinueCoordinatedTransaction();
 	}
 
+	/*
+	 * Get the connections required to execute the task. This will, if
+	 * necessary, establish connections, mark them as critical (when
+	 * modifying a reference table), and start remote transactions (when
+	 * inside a coordinated transaction).
+	 */
+	connectionList = GetModifyConnections(taskPlacementList,
+										  taskRequiresTwoPhaseCommit,
+										  startedInTransaction);
+
 	/* prevent replicas of the same shard from diverging */
 	AcquireExecutorShardLock(task, operation);
 
-	/*
-	 * Try to run the query to completion on one placement. If the query fails
-	 * attempt the query on the next placement.
-	 */
-	foreach(taskPlacementCell, taskPlacementList)
+	/* try to execute modification on all placements */
+	forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList)
 	{
 		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
 		bool queryOK = false;
 		bool failOnError = false;
 		int64 currentAffectedTupleCount = 0;
-		PGconn *connection = GetConnectionForPlacement(taskPlacement,
-													   isModificationQuery);
-
-		if (connection == NULL)
-		{
-			failedPlacementList = lappend(failedPlacementList, taskPlacement);
-			continue;
-		}
 
 		queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
 		if (!queryOK)
 		{
-			PurgeConnectionForPlacement(connection, taskPlacement);
-			failedPlacementList = lappend(failedPlacementList, taskPlacement);
 			continue;
 		}
 
@@ -829,87 +784,88 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 
 			resultsOK = true;
 			gotResults = true;
-
-			/*
-			 * Modifications have to be executed on all placements, but for
-			 * read queries we can stop here.
-			 */
-			if (!isModificationQuery)
-			{
-				break;
-			}
-		}
-		else
-		{
-			PurgeConnectionForPlacement(connection, taskPlacement);
-
-			failedPlacementList = lappend(failedPlacementList, taskPlacement);
-
-			continue;
 		}
 	}
 
-	if (isModificationQuery)
+	/* if all placements failed, error out */
+	if (!resultsOK)
 	{
-		ListCell *failedPlacementCell = NULL;
-
-		/* if all placements failed, error out */
-		if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
-		{
-			ereport(ERROR, (errmsg("could not modify any active placements")));
-		}
-
-		/*
-		 * Otherwise, mark failed placements as inactive: they're stale. Note that
-		 * connections for tasks that require 2PC has already failed the whole transaction
-		 * and there is no way that they're marked stale here.
-		 */
-		foreach(failedPlacementCell, failedPlacementList)
-		{
-			ShardPlacement *failedPlacement =
-				(ShardPlacement *) lfirst(failedPlacementCell);
-
-			Assert(!taskRequiresTwoPhaseCommit);
-
-			UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
-		}
-
-		executorState->es_processed = affectedTupleCount;
+		ereport(ERROR, (errmsg("could not modify any active placements")));
 	}
 
-	return resultsOK;
+	executorState->es_processed = affectedTupleCount;
+
+	if (IsTransactionBlock())
+	{
+		XactModificationLevel = XACT_MODIFICATION_DATA;
+	}
 }
 
 
 /*
- * GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement list,
- * starts the connections to the nodes and marks them critical. In the second iteration,
- * the connection establishments are finished. Finally, BEGIN commands are sent,
- * if necessary.
+ * GetModifyConnections returns the list of connections required to execute
+ * modify commands on the placements in taskPlacementList. If necessary,
+ * remote transactions are started.
+ *
+ * If markCritical is true, remote transactions are marked as critical. If
+ * noNewTransactions is true, this function errors out for any placement
+ * whose connection does not already have a remote transaction in progress.
 */
-static void
-GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
+static List *
+GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions)
 {
 	ListCell *taskPlacementCell = NULL;
 	List *multiConnectionList = NIL;
 
-	/* in the first iteration start the connections */
+	/* first initiate connection establishment for all necessary connections */
 	foreach(taskPlacementCell, taskPlacementList)
 	{
 		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
-		int connectionFlags = SESSION_LIFESPAN;
-		MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
-															   taskPlacement->nodeName,
-															   taskPlacement->nodePort);
+		int connectionFlags = SESSION_LIFESPAN | FOR_DML;
+		MultiConnection *multiConnection = NULL;
 
-		MarkRemoteTransactionCritical(multiConnection);
+		/*
+		 * FIXME: It's not actually correct to use only one shard placement
+		 * here for router queries involving multiple relations. We should
+		 * check that this connection is the only modifying one associated
+		 * with all the involved shards.
+		 */
+		multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);
+
+		/*
+		 * If already in a transaction, disallow expanding the set of remote
+		 * transactions. That prevents some forms of distributed deadlocks.
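+		 *
+		 * One illustrative form: two concurrent multi-statement
+		 * transactions that each began on a different worker could
+		 * otherwise expand onto each other's nodes and block on each
+		 * other's row locks, with no single node seeing the full cycle.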
+ */ + if (noNewTransactions) + { + RemoteTransaction *transaction = &multiConnection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("no transaction participant matches %s:%d", + taskPlacement->nodeName, taskPlacement->nodePort), + errdetail("Transactions which modify distributed tables " + "may only target nodes affected by the " + "modification command which began the transaction."))); + } + } + + if (markCritical) + { + MarkRemoteTransactionCritical(multiConnection); + } multiConnectionList = lappend(multiConnectionList, multiConnection); } + /* then finish in parallel */ FinishConnectionListEstablishment(multiConnectionList); + /* and start transactions if applicable */ RemoteTransactionsBeginIfNecessary(multiConnectionList); + + return multiConnectionList; } @@ -1009,8 +965,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn bool shardConnectionsFound = false; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - MultiConnection *multiConnection = NULL; - PGconn *connection = NULL; + MultiConnection *connection = NULL; bool queryOK = false; shardConnections = GetShardConnections(shardId, &shardConnectionsFound); @@ -1022,14 +977,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - multiConnection = + connection = (MultiConnection *) list_nth(connectionList, placementIndex); - connection = multiConnection->pgConn; queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - ReraiseRemoteError(connection, NULL); + ReportConnectionError(connection, ERROR); } } @@ -1041,8 +995,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn bool shardConnectionsFound = false; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - MultiConnection *multiConnection = NULL; - PGconn *connection = NULL; + MultiConnection *connection = NULL; int64 currentAffectedTupleCount = 0; bool failOnError = true; bool queryOK PG_USED_FOR_ASSERTS_ONLY = false; @@ -1060,9 +1013,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - multiConnection = - (MultiConnection *) list_nth(connectionList, placementIndex); - connection = multiConnection->pgConn; + connection = (MultiConnection *) list_nth(connectionList, placementIndex); /* * If caller is interested, store query results the first time @@ -1101,16 +1052,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn if (currentAffectedTupleCount != previousAffectedTupleCount) { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - ereport(WARNING, (errmsg("modified "INT64_FORMAT " tuples of shard " UINT64_FORMAT ", but expected to modify "INT64_FORMAT, currentAffectedTupleCount, shardId, previousAffectedTupleCount), - errdetail("modified placement on %s:%s", nodeName, - nodePort))); + errdetail("modified placement on %s:%d", + connection->hostname, connection->port))); } } @@ -1199,143 +1147,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, } -/* - * GetConnectionForPlacement is the main entry point for acquiring a connection - * within the router executor. 
By using placements (rather than node names and - * ports) to identify connections, the router executor can keep track of shards - * used by multi-statement transactions and error out if a transaction tries - * to reach a new node altogether). In the single-statement commands context, - * GetConnectionForPlacement simply falls through to GetOrEstablishConnection. - */ -static PGconn * -GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) -{ - NodeConnectionKey participantKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - /* if not in a transaction, fall through to connection cache */ - if (xactParticipantHash == NULL) - { - PGconn *connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - - return connection; - } - - Assert(IsTransactionBlock()); - - MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; - - participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND, - &entryFound); - - if (entryFound) - { - if (isModificationQuery) - { - RecordShardIdParticipant(placement->shardId, participantEntry); - } - - return participantEntry->connection->pgConn; - } - else - { - ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), - errmsg("no transaction participant matches %s:%d", - placement->nodeName, placement->nodePort), - errdetail("Transactions which modify distributed tables may only " - "target nodes affected by the modification command " - "which began the transaction."))); - } -} - - -/* - * PurgeConnectionForPlacement provides a way to purge an invalid connection - * from all relevant connection hashes using the placement involved in the - * query at the time of the error. If a transaction is ongoing, this function - * ensures the right node's connection is set to NULL in the participant map - * for the transaction in addition to purging the connection cache's entry. - */ -static void -PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement) -{ - CloseConnectionByPGconn(connection); - - /* - * The following is logically identical to RemoveXactConnection, but since - * we have a ShardPlacement to help build a NodeConnectionKey, we avoid - * any penalty incurred by calling BuildKeyForConnection, which must ex- - * tract host, port, and user from the connection options list. - */ - if (xactParticipantHash != NULL) - { - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - Assert(IsTransactionBlock()); - - /* the participant hash doesn't use the user field */ - MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); - participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, - &entryFound); - - Assert(entryFound); - - participantEntry->connection = NULL; - } -} - - -/* - * Removes a given connection from the transaction participant hash, based on - * the host and port of the provided connection. If the hash is not NULL, it - * MUST contain the provided connection, or a FATAL error is raised. 
- */ -static void -RemoveXactConnection(PGconn *connection) -{ - NodeConnectionKey nodeKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - if (xactParticipantHash == NULL) - { - return; - } - - BuildKeyForConnection(connection, &nodeKey); - - /* the participant hash doesn't use the user field */ - MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); - participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, - &entryFound); - - if (!entryFound) - { - ereport(FATAL, (errmsg("could not find specified transaction connection"))); - } - - participantEntry->connection = NULL; -} - - /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the * connection so that we receive results a row at a time. */ static bool -SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo) +SendQueryInSingleRowMode(MultiConnection *connection, char *query, + ParamListInfo paramListInfo) { int querySent = 0; int singleRowMode = 0; @@ -1349,24 +1168,27 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, ¶meterValues); - querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes, - parameterValues, NULL, NULL, 0); + querySent = PQsendQueryParams(connection->pgConn, query, + parameterCount, parameterTypes, parameterValues, + NULL, NULL, 0); } else { - querySent = PQsendQuery(connection, query); + querySent = PQsendQuery(connection->pgConn, query); } if (querySent == 0) { - WarnRemoteError(connection, NULL); + MarkRemoteTransactionFailed(connection, false); + ReportConnectionError(connection, WARNING); return false; } - singleRowMode = PQsetSingleRowMode(connection); + singleRowMode = PQsetSingleRowMode(connection->pgConn); if (singleRowMode == 0) { - WarnRemoteError(connection, NULL); + MarkRemoteTransactionFailed(connection, false); + ReportConnectionError(connection, WARNING); return false; } @@ -1451,7 +1273,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT * the connection. 
*/ static bool -StoreQueryResult(MaterialState *routerState, PGconn *connection, +StoreQueryResult(MaterialState *routerState, MultiConnection *connection, TupleDesc tupleDescriptor, bool failOnError, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); @@ -1486,7 +1308,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, uint32 columnCount = 0; ExecStatusType resultStatus = 0; - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); if (result == NULL) { break; @@ -1499,6 +1321,8 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, int category = 0; bool isConstraintViolation = false; + MarkRemoteTransactionFailed(connection, false); + /* * If the error code is in constraint violation class, we want to * fail fast because we must get the same error from all shard @@ -1509,12 +1333,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, if (isConstraintViolation || failOnError) { - RemoveXactConnection(connection); - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -1579,7 +1402,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, * has been an error. */ static bool -ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) +ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) { bool commandFailed = false; bool gotResponse = false; @@ -1593,7 +1416,7 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) */ while (true) { - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); ExecStatusType status = PGRES_COMMAND_OK; if (result == NULL) @@ -1611,6 +1434,8 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) int category = 0; bool isConstraintViolation = false; + MarkRemoteTransactionFailed(connection, false); + /* * If the error code is in constraint violation class, we want to * fail fast because we must get the same error from all shard @@ -1621,13 +1446,13 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) if (isConstraintViolation || failOnError) { - RemoveXactConnection(connection); - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } + PQclear(result); commandFailed = true; @@ -1667,50 +1492,6 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) } -/* - * RecordShardIdParticipant registers a connection as being involved with a - * particular shard during a multi-statement transaction. 
- */ -static void -RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry) -{ - XactShardConnSet *shardConnSetMatch = NULL; - ListCell *listCell = NULL; - MemoryContext oldContext = NULL; - List *connectionEntryList = NIL; - - /* check whether an entry already exists for this shard */ - foreach(listCell, xactShardConnSetList) - { - XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell); - - if (shardConnSet->shardId == affectedShardId) - { - shardConnSetMatch = shardConnSet; - } - } - - /* entries must last through the whole top-level transaction */ - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - /* if no entry found, make one */ - if (shardConnSetMatch == NULL) - { - shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet)); - shardConnSetMatch->shardId = affectedShardId; - - xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch); - } - - /* add connection, avoiding duplicates */ - connectionEntryList = shardConnSetMatch->connectionEntryList; - shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList, - participantEntry); - - MemoryContextSwitchTo(oldContext); -} - - /* * RouterExecutorFinish cleans up after a distributed execution. */ @@ -1745,114 +1526,3 @@ RouterExecutorEnd(QueryDesc *queryDesc) queryDesc->estate = NULL; queryDesc->totaltime = NULL; } - - -/* - * RouterExecutorPreCommitCheck() gets called after remote transactions have - * committed, so it can invalidate failed shards and perform related checks. - */ -void -RouterExecutorPreCommitCheck(void) -{ - /* no transactional router modification were issued, nothing to do */ - if (xactParticipantHash == NULL) - { - return; - } - - MarkRemainingInactivePlacements(); -} - - -/* - * Cleanup callback called after a transaction commits or aborts. - */ -void -RouterExecutorPostCommit(void) -{ - /* reset transaction state */ - xactParticipantHash = NULL; - xactShardConnSetList = NIL; -} - - -/* - * MarkRemainingInactivePlacements takes care of marking placements of a shard - * inactive after some of the placements rejected the final COMMIT phase of a - * transaction. - * - * Failures are detected by checking the connection & transaction state for - * each of the entries in the connection set for each shard. - */ -static void -MarkRemainingInactivePlacements(void) -{ - ListCell *shardConnSetCell = NULL; - int totalSuccesses = 0; - - if (xactParticipantHash == NULL) - { - return; - } - - foreach(shardConnSetCell, xactShardConnSetList) - { - XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell); - List *participantList = shardConnSet->connectionEntryList; - ListCell *participantCell = NULL; - int successes = list_length(participantList); /* assume full success */ - - /* determine how many actual successes there were: subtract failures */ - foreach(participantCell, participantList) - { - NodeConnectionEntry *participant = - (NodeConnectionEntry *) lfirst(participantCell); - MultiConnection *connection = participant->connection; - - /* - * Fail if the connection has been set to NULL after an error, or - * if the transaction failed for other reasons (e.g. COMMIT - * failed). 
- */ - if (connection == NULL || connection->remoteTransaction.transactionFailed) - { - successes--; - } - } - - /* if no nodes succeeded for this shard, don't do anything */ - if (successes == 0) - { - continue; - } - - /* otherwise, ensure failed placements are marked inactive */ - foreach(participantCell, participantList) - { - NodeConnectionEntry *participant = NULL; - participant = (NodeConnectionEntry *) lfirst(participantCell); - - if (participant->connection == NULL || - participant->connection->remoteTransaction.transactionFailed) - { - uint64 shardId = shardConnSet->shardId; - NodeConnectionKey *nodeKey = &participant->cacheKey; - uint64 shardLength = 0; - uint64 placementId = INVALID_PLACEMENT_ID; - - placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName, - nodeKey->nodePort); - InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength, - nodeKey->nodeName, nodeKey->nodePort); - } - } - - totalSuccesses++; - } - - /* If no shards could be modified at all, error out. */ - if (totalSuccesses == 0) - { - ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); - } -} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index bc176a110..509ab0381 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -33,6 +33,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" @@ -160,6 +161,7 @@ _PG_init(void) /* initialize coordinated transaction management */ InitializeTransactionManagement(); InitializeConnectionManagement(); + InitPlacementConnectionManagement(); /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 7fe79ce75..40a0e4651 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,9 +21,9 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" -#include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" +#include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/guc.h" @@ -149,7 +149,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); - RouterExecutorPostCommit(); if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { @@ -160,6 +159,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { + ResetPlacementConnectionManagement(); AfterXactConnectionHandling(true); } @@ -185,7 +185,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. 
 */
 	ResetShardPlacementTransactionState();
-	RouterExecutorPostCommit();
 
 	/* handles both already prepared and open transactions */
 	if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
 	{
@@ -196,6 +195,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 	/* close connections etc. */
 	if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
 	{
+		ResetPlacementConnectionManagement();
 		AfterXactConnectionHandling(false);
 	}
 
@@ -233,6 +233,21 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 				break;
 			}
 
+			/*
+			 * TODO: It'd probably be a good idea to force constraints and
+			 * such to 'immediate' here. Deferred triggers might try to send
+			 * stuff to the remote side, which'd not be good. Doing so
+			 * remotely would also catch a class of errors where committing
+			 * fails, which can lead to divergence when not using 2PC.
+			 */
+
+			/*
+			 * Check whether the coordinated transaction is in a state we want
+			 * to persist, or whether we want to error out. This handles the
+			 * case that iteratively executed commands marked all placements
+			 * as invalid.
+			 */
+			CheckForFailedPlacements(true, CurrentTransactionUse2PC);
 
 			if (CurrentTransactionUse2PC)
 			{
@@ -251,12 +266,11 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			}
 
 			/*
-			 * Call other parts of citus that need to integrate into
-			 * transaction management. Call them *after* committing/preparing
-			 * the remote transactions, to allow marking shards as invalid
-			 * (e.g. if the remote commit failed).
+			 *
+			 * Check again whether shards/placements successfully
+			 * committed. This handles failure at COMMIT/PREPARE time.
 			 */
-			RouterExecutorPreCommitCheck();
+			CheckForFailedPlacements(false, CurrentTransactionUse2PC);
 		}
 
 		break;
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 981ca8cb9..c3967365d 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -33,7 +33,11 @@
 	FORCE_NEW_CONNECTION = 1 << 0,
 
 	/* mark returned connection as having session lifespan */
-	SESSION_LIFESPAN = 1 << 1
+	SESSION_LIFESPAN = 1 << 1,
+
+	FOR_DDL = 1 << 2,	/* connection is going to be used for DDL */
+
+	FOR_DML = 1 << 3	/* connection is going to be used for DML */
 };
 
 
@@ -68,6 +72,9 @@ typedef struct MultiConnection
 
 	/* membership in list of in-progress transactions */
 	dlist_node transactionNode;
+
+	/* list of all placements referenced by this connection */
+	dlist_head referencedPlacements;
 } MultiConnection;
 
 
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index f584abdc8..bb16f5e15 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -37,8 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList
 extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
 extern void RouterExecutorFinish(QueryDesc *queryDesc);
 extern void RouterExecutorEnd(QueryDesc *queryDesc);
-extern void RouterExecutorPreCommitCheck(void);
-extern void RouterExecutorPostCommit(void);
 
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h
new file mode 100644
index 000000000..c76d838aa
--- /dev/null
+++ b/src/include/distributed/placement_connection.h
@@ -0,0 +1,33 @@
+/*-------------------------------------------------------------------------
+ * placement_connection.h
+ *
+ * Copyright (c) 2016-2017, Citus Data, Inc.
diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h
new file mode 100644
index 000000000..c76d838aa
--- /dev/null
+++ b/src/include/distributed/placement_connection.h
@@ -0,0 +1,33 @@
+/*-------------------------------------------------------------------------
+ * placement_connection.h
+ *
+ * Copyright (c) 2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PLACEMENT_CONNECTION_H
+#define PLACEMENT_CONNECTION_H
+
+
+#include "distributed/connection_management.h"
+
+/* forward declare, to avoid dependency on ShardPlacement definition */
+struct ShardPlacement;
+
+extern MultiConnection * GetPlacementConnection(uint32 flags,
+												struct ShardPlacement *placement,
+												const char *userName);
+extern MultiConnection * StartPlacementConnection(uint32 flags,
+												  struct ShardPlacement *placement,
+												  const char *userName);
+
+extern void ResetPlacementConnectionManagement(void);
+extern void CheckForFailedPlacements(bool preCommit, bool using2PC);
+
+extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
+extern void ResetShardPlacementAssociation(struct MultiConnection *connection);
+
+extern void InitPlacementConnectionManagement(void);
+
+#endif /* PLACEMENT_CONNECTION_H */
diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out
index 6dffd0583..aecaf0191 100644
--- a/src/test/regress/expected/multi_modifying_xacts.out
+++ b/src/test/regress/expected/multi_modifying_xacts.out
@@ -140,8 +140,8 @@ INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
 ERROR:  no transaction participant matches localhost:57638
 DETAIL:  Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
 COMMIT;
--- this logic even applies to router SELECTs occurring after a modification:
--- selecting from the modified node is fine...
+-- SELECTs may occur after a modification: first check that selecting
+-- from the modified node works.
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 SELECT count(*) FROM researchers WHERE lab_id = 6;
@@ -151,7 +151,7 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
 (1 row)
 
 ABORT;
--- but if a SELECT needs to go to new node, that's a problem...
+-- then check that a SELECT going to a new node is still fine
 BEGIN;
 UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
 FROM pg_dist_shard AS s
@@ -161,8 +161,11 @@ AND sp.nodeport = :worker_1_port
 AND s.logicalrelid = 'researchers'::regclass;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 SELECT count(*) FROM researchers WHERE lab_id = 6;
-ERROR:  no transaction participant matches localhost:57638
-DETAIL:  Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
+ count
+-------
+     0
+(1 row)
+
 ABORT;
 -- applies to DDL, too
 BEGIN;
@@ -494,7 +497,9 @@ WARNING:  illegal value
 WARNING:  failed to commit transaction on localhost:57637
 WARNING:  illegal value
 WARNING:  failed to commit transaction on localhost:57638
-ERROR:  could not commit transaction on any active nodes
+WARNING:  could not commit transaction for shard 1200002 on any active node
+WARNING:  could not commit transaction for shard 1200003 on any active node
+ERROR:  could not commit transaction on any active node
 -- data should NOT be persisted
 SELECT * FROM objects WHERE id = 1;
  id | name
@@ -530,6 +535,7 @@ INSERT INTO labs VALUES (9, 'BAD');
 COMMIT;
 WARNING:  illegal value
 WARNING:  failed to commit transaction on localhost:57637
+WARNING:  could not commit transaction for shard 1200002 on any active node
 \set VERBOSITY default
 -- data to objects should be persisted, but labs should not...
 SELECT * FROM objects WHERE id = 1;
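The new per-shard warnings in the expected output above reflect how commit-time checking now works: failures are accounted per shard, and the transaction only errors out once no placement of some shard could be committed. A simplified sketch of the per-shard check (the signature of CheckShardPlacements() matches the static declaration in placement_connection.c; the body here is an illustration only and omits marking failed placements invalid in the catalog):

	static bool
	CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit,
						 bool using2PC)
	{
		int failures = 0;
		int successes = 0;
		dlist_iter iter;

		/* tally the fate of every placement touched for this shard */
		dlist_foreach(iter, &shardEntry->placementConnections)
		{
			ConnectionPlacementHashEntry *placementEntry =
				dlist_container(ConnectionPlacementHashEntry, shardNode, iter.cur);

			if (placementEntry->failed)
			{
				failures++;
			}
			else
			{
				successes++;
			}
		}

		/* if no placement of the shard survived, the commit cannot stand */
		if (failures > 0 && successes == 0)
		{
			ereport(WARNING, (errmsg("could not commit transaction for shard "
									 UINT64_FORMAT " on any active node",
									 shardEntry->key.shardId)));
			return false;
		}

		return true;
	}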
diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql
index cc10b385c..8a2497e57 100644
--- a/src/test/regress/sql/multi_modifying_xacts.sql
+++ b/src/test/regress/sql/multi_modifying_xacts.sql
@@ -111,15 +111,14 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
 INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
 COMMIT;
 
--- this logic even applies to router SELECTs occurring after a modification:
--- selecting from the modified node is fine...
+-- SELECTs may occur after a modification: first check that selecting
+-- from the modified node works.
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 SELECT count(*) FROM researchers WHERE lab_id = 6;
 ABORT;
 
--- but if a SELECT needs to go to new node, that's a problem...
-
+-- then check that a SELECT going to a new node is still fine
 BEGIN;
 UPDATE pg_dist_shard_placement AS sp SET shardstate = 3