diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 2af22d251..bf04d1f81 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -41,6 +41,7 @@
 #include "distributed/multi_router_executor.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_shard_transaction.h"
+#include "distributed/placement_connection.h"
 #include "distributed/relay_utility.h"
 #include "distributed/remote_commands.h"
 #include "distributed/remote_transaction.h"
@@ -72,30 +73,14 @@
 /* controls use of locks to enforce safe commutativity */
 bool AllModificationsCommutative = false;
 
-
-/*
- * The following static variables are necessary to track the progression of
- * multi-statement transactions managed by the router executor. After the first
- * modification within a transaction, the executor populates a hash with the
- * transaction's initial participants (nodes hit by that initial modification).
- *
- * To keep track of the reverse mapping (from shards to nodes), we have a list
- * of XactShardConnSets, which map a shard identifier to a set of connection
- * hash entries. This list is walked by MarkRemainingInactivePlacements to
- * ensure we mark placements as failed if they reject a COMMIT.
- */
-static HTAB *xactParticipantHash = NULL;
-static List *xactShardConnSetList = NIL;
-
-/* functions needed during start phase */
-static void InitTransactionStateForTask(Task *task);
-static HTAB * CreateXactParticipantHash(void);
-
 /* functions needed during run phase */
 static void ReacquireMetadataLocks(List *taskList);
-static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
-                              bool isModificationQuery, bool expectResults);
-static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList);
+static void ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
+                                    bool expectResults);
+static void ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task);
+static List * GetModifyConnections(List *taskPlacementList,
+                                   bool markCritical,
+                                   bool noNewTransactions);
 static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
                                  bool isModificationQuery, bool expectResults);
 static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@@ -109,23 +94,15 @@ static bool RequiresConsistentSnapshot(Task *task);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
                                        DestReceiver *destination,
                                        Tuplestorestate *tupleStore);
-static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
-                                          bool isModificationQuery);
-static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
-static void RemoveXactConnection(PGconn *connection);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
                                                Oid **parameterTypes,
                                                const char ***parameterValues);
-static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
+static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
                                      ParamListInfo paramListInfo);
-static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
+static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
                              TupleDesc tupleDescriptor, bool failOnError,
                              int64 *rows);
-static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
-static void RecordShardIdParticipant(uint64 affectedShardId,
-                                     NodeConnectionEntry *participantEntry);
-
-/* to verify the health of shards after a transactional modification command */
-static void MarkRemainingInactivePlacements(void);
+static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
+                               int64 *rows);
 
 
 /*
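[Editor's note — not part of the patch] The declaration changes above swap the router executor's private PGconn bookkeeping for the shared framework from placement_connection.h. A minimal sketch of how a caller now obtains a connection for a placement; the flag combination and calls mirror ones visible later in this patch, but the helper itself is hypothetical:

    /* hypothetical helper, assembled from calls that appear in this patch */
    static MultiConnection *
    AcquireConnectionForModify(ShardPlacement *placement)
    {
        /* SESSION_LIFESPAN and FOR_DML are the flags this patch uses for DML */
        int connectionFlags = SESSION_LIFESPAN | FOR_DML;

        /* the framework records the placement<->connection association itself */
        MultiConnection *connection =
            GetPlacementConnection(connectionFlags, placement, NULL);

        /* sends BEGIN on the connection if a coordinated transaction is open */
        RemoteTransactionBeginIfNecessary(connection);

        return connection;
    }

Raw libpq calls still go through connection->pgConn, as the later hunks show.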
@@ -232,52 +209,6 @@ ReacquireMetadataLocks(List *taskList)
 }
 
 
-/*
- * InitTransactionStateForTask is called during executor start with the first
- * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
- * transaction participant hash, opens connections to this task's nodes, and
- * populates the hash with those connections after sending BEGIN commands to
- * each. If a node fails to respond, its connection is set to NULL to prevent
- * further interaction with it during the transaction.
- */
-static void
-InitTransactionStateForTask(Task *task)
-{
-    ListCell *placementCell = NULL;
-
-    BeginOrContinueCoordinatedTransaction();
-
-    xactParticipantHash = CreateXactParticipantHash();
-
-    foreach(placementCell, task->taskPlacementList)
-    {
-        ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
-        NodeConnectionKey participantKey;
-        NodeConnectionEntry *participantEntry = NULL;
-        bool entryFound = false;
-        int connectionFlags = SESSION_LIFESPAN;
-        MultiConnection *connection =
-            GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort);
-
-        MemSet(&participantKey, 0, sizeof(participantKey));
-        strlcpy(participantKey.nodeName, placement->nodeName,
-                MAX_NODE_LENGTH + 1);
-        participantKey.nodePort = placement->nodePort;
-
-        participantEntry = hash_search(xactParticipantHash, &participantKey,
-                                       HASH_ENTER, &entryFound);
-        Assert(!entryFound);
-
-        /* issue BEGIN if necessary */
-        RemoteTransactionBeginIfNecessary(connection);
-
-        participantEntry->connection = connection;
-    }
-
-    XactModificationLevel = XACT_MODIFICATION_DATA;
-}
-
-
 /*
  * AcquireExecutorShardLock acquires a lock on the shard for the given task and
  * command type if necessary to avoid divergence between multiple replicas of
@@ -527,33 +458,6 @@ RequiresConsistentSnapshot(Task *task)
 }
 
 
-/*
- * CreateXactParticipantHash initializes the map used to store the connections
- * needed to process distributed transactions. Unlike the connection cache, we
- * permit NULL connections here to signify that a participant has seen an error
- * and is no longer receiving commands during a transaction. This hash should
- * be walked at transaction end to send final COMMIT or ABORT commands.
- */
-static HTAB *
-CreateXactParticipantHash(void)
-{
-    HTAB *xactParticipantHash = NULL;
-    HASHCTL info;
-    int hashFlags = 0;
-
-    MemSet(&info, 0, sizeof(info));
-    info.keysize = sizeof(NodeConnectionKey);
-    info.entrysize = sizeof(NodeConnectionEntry);
-    info.hcxt = TopTransactionContext;
-    hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
-
-    xactParticipantHash = hash_create("citus xact participant hash", 32, &info,
-                                      hashFlags);
-
-    return xactParticipantHash;
-}
-
-
 /*
  * RouterExecutorRun actually executes a single task on a worker.
  */
@@ -633,13 +537,14 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
     if (list_length(taskList) == 1)
     {
         Task *task = (Task *) linitial(taskList);
-        bool resultsOK = false;
 
-        resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery,
-                                      sendTuples);
-        if (!resultsOK)
+        if (isModificationQuery)
         {
-            ereport(ERROR, (errmsg("could not receive query results")));
+            ExecuteSingleModifyTask(queryDesc, task, sendTuples);
+        }
+        else
+        {
+            ExecuteSingleSelectTask(queryDesc, task);
         }
     }
     else
@@ -692,19 +597,73 @@ out:
 
 
 /*
- * ExecuteSingleTask executes the task on the remote node, retrieves the
- * results and stores them, if SELECT or RETURNING is used, in a tuple
- * store.
+ * ExecuteSingleSelectTask executes the task on the remote node, retrieves the
+ * results and stores them in a tuple store.
  *
  * If the task fails on one of the placements, the function retries it on
- * other placements (SELECT), reraises the remote error (constraint violation
- * in DML), marks the affected placement as invalid (DML on some placement
- * failed), or errors out (DML failed on all placements).
+ * other placements or errors out if the query fails on all placements.
  */
-static bool
-ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
-                  bool isModificationQuery,
-                  bool expectResults)
+static void
+ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task)
+{
+    TupleDesc tupleDescriptor = queryDesc->tupDesc;
+    MaterialState *routerState = (MaterialState *) queryDesc->planstate;
+    ParamListInfo paramListInfo = queryDesc->params;
+    List *taskPlacementList = task->taskPlacementList;
+    ListCell *taskPlacementCell = NULL;
+    char *queryString = task->queryString;
+
+    if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
+    {
+        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+                        errmsg("single-shard query may not appear in transaction blocks "
+                               "which contain multi-shard data modifications")));
+    }
+
+    /*
+     * Try to run the query to completion on one placement. If the query fails
+     * attempt the query on the next placement.
+     */
+    foreach(taskPlacementCell, taskPlacementList)
+    {
+        ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+        bool queryOK = false;
+        bool dontFailOnError = false;
+        int64 currentAffectedTupleCount = 0;
+        int connectionFlags = SESSION_LIFESPAN;
+        MultiConnection *connection =
+            GetPlacementConnection(connectionFlags, taskPlacement, NULL);
+
+        queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
+        if (!queryOK)
+        {
+            continue;
+        }
+
+        queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
+                                   dontFailOnError, &currentAffectedTupleCount);
+        if (queryOK)
+        {
+            return;
+        }
+    }
+
+    ereport(ERROR, (errmsg("could not receive query results")));
+}
+
+
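[Editor's note — not part of the patch] The failover contract used by ExecuteSingleSelectTask above, reduced to a hypothetical skeleton: try placements in order, treat per-placement failures as soft (hence failOnError = false on the result path), and only error out once every placement has been tried.

    /* hypothetical sketch of the read-failover pattern */
    static bool
    RunOnAnyPlacement(List *placementList, bool (*tryOnePlacement)(ShardPlacement *))
    {
        ListCell *placementCell = NULL;

        foreach(placementCell, placementList)
        {
            ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

            if (tryOnePlacement(placement))
            {
                return true;    /* first success wins; stop here */
            }

            /* soft failure: fall through to the next placement */
        }

        return false;           /* the caller turns this into an ERROR */
    }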
+/*
+ * ExecuteSingleModifyTask executes the task on the remote node, retrieves the
+ * results and stores them, if RETURNING is used, in a tuple store.
+ *
+ * If the task fails on one of the placements, the function reraises the
+ * remote error (constraint violation in DML), marks the affected placement as
+ * invalid (other error on some placements, via the placement connection
+ * framework), or errors out (failed on all placements).
+ */
+static void
+ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
+                        bool expectResults)
 {
     CmdType operation = queryDesc->operation;
     TupleDesc tupleDescriptor = queryDesc->tupDesc;
@@ -713,12 +672,15 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
     ParamListInfo paramListInfo = queryDesc->params;
     bool resultsOK = false;
     List *taskPlacementList = task->taskPlacementList;
+    List *connectionList = NIL;
     ListCell *taskPlacementCell = NULL;
-    List *failedPlacementList = NIL;
+    ListCell *connectionCell = NULL;
     int64 affectedTupleCount = -1;
     bool gotResults = false;
     char *queryString = task->queryString;
     bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
+    bool startedInTransaction =
+        InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
 
     if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
     {
@@ -729,59 +691,52 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
     }
 
     /*
-     * Firstly ensure that distributed transaction is started. Then, force
-     * the transaction manager to use 2PC while running the task on the placements.
+     * Modifications for reference tables are always done using 2PC. First
+     * ensure that a distributed transaction is started. Then force the
+     * transaction manager to use 2PC while running the task on the
+     * placements.
      */
     if (taskRequiresTwoPhaseCommit)
     {
         BeginOrContinueCoordinatedTransaction();
         CoordinatedTransactionUse2PC();
-
-        /*
-         * Mark connections for all placements as critical and establish connections
-         * to all placements at once.
-         */
-        GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
     }
 
     /*
-     * We could naturally handle function-based transactions (i.e. those
-     * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
-     * but some customers already use functions that touch multiple shards
-     * from within a function, so we'll ignore functions for now.
+     * We could naturally handle function-based transactions (i.e. those using
+     * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
+     * customers already use functions that touch multiple shards from within
+     * a function, so we'll ignore functions for now.
      */
-    if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
+    if (IsTransactionBlock())
     {
-        InitTransactionStateForTask(task);
+        BeginOrContinueCoordinatedTransaction();
     }
 
+    /*
+     * Get the connections required to execute the task. If necessary, this
+     * establishes the connections, marks them critical (when modifying a
+     * reference table), and starts remote transactions (when inside a
+     * coordinated transaction).
+     */
+    connectionList = GetModifyConnections(taskPlacementList,
+                                          taskRequiresTwoPhaseCommit,
+                                          startedInTransaction);
+
     /* prevent replicas of the same shard from diverging */
     AcquireExecutorShardLock(task, operation);
 
-    /*
-     * Try to run the query to completion on one placement. If the query fails
-     * attempt the query on the next placement.
-     */
-    foreach(taskPlacementCell, taskPlacementList)
+    /* try to execute modification on all placements */
+    forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList)
     {
         ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+        MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
         bool queryOK = false;
         bool failOnError = false;
         int64 currentAffectedTupleCount = 0;
-        PGconn *connection = GetConnectionForPlacement(taskPlacement,
-                                                       isModificationQuery);
-
-        if (connection == NULL)
-        {
-            failedPlacementList = lappend(failedPlacementList, taskPlacement);
-            continue;
-        }
 
         queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
         if (!queryOK)
         {
-            PurgeConnectionForPlacement(connection, taskPlacement);
-            failedPlacementList = lappend(failedPlacementList, taskPlacement);
             continue;
         }
 
@@ -829,87 +784,88 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
             resultsOK = true;
             gotResults = true;
-
-            /*
-             * Modifications have to be executed on all placements, but for
-             * read queries we can stop here.
-             */
-            if (!isModificationQuery)
-            {
-                break;
-            }
-        }
-        else
-        {
-            PurgeConnectionForPlacement(connection, taskPlacement);
-
-            failedPlacementList = lappend(failedPlacementList, taskPlacement);
-
-            continue;
         }
     }
 
-    if (isModificationQuery)
+    /* if all placements failed, error out */
+    if (!resultsOK)
     {
-        ListCell *failedPlacementCell = NULL;
-
-        /* if all placements failed, error out */
-        if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
-        {
-            ereport(ERROR, (errmsg("could not modify any active placements")));
-        }
-
-        /*
-         * Otherwise, mark failed placements as inactive: they're stale. Note that
-         * connections for tasks that require 2PC has already failed the whole transaction
-         * and there is no way that they're marked stale here.
-         */
-        foreach(failedPlacementCell, failedPlacementList)
-        {
-            ShardPlacement *failedPlacement =
-                (ShardPlacement *) lfirst(failedPlacementCell);
-
-            Assert(!taskRequiresTwoPhaseCommit);
-
-            UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
-        }
-
-        executorState->es_processed = affectedTupleCount;
+        ereport(ERROR, (errmsg("could not modify any active placements")));
     }
 
-    return resultsOK;
+    executorState->es_processed = affectedTupleCount;
+
+    if (IsTransactionBlock())
+    {
+        XactModificationLevel = XACT_MODIFICATION_DATA;
+    }
 }
 
 
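[Editor's note — not part of the patch] "Critical" above means the whole coordinated transaction must fail if that connection fails, rather than merely invalidating the placement. A hypothetical illustration of that contract; remoteTransaction.transactionFailed appears elsewhere in this patch, while transactionCritical is an assumed field name:

    /* hypothetical sketch; transactionCritical is an assumption */
    static void
    SketchHandleFailure(MultiConnection *connection)
    {
        RemoteTransaction *transaction = &connection->remoteTransaction;

        if (!transaction->transactionFailed)
        {
            return;
        }

        if (transaction->transactionCritical)
        {
            /* critical participant (e.g. reference-table 2PC): abort everything */
            ereport(ERROR, (errmsg("failure on critical connection %s:%d",
                                   connection->hostname, connection->port)));
        }

        /* otherwise the placement is merely invalidated at commit time */
    }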
 /*
- * GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement list,
- * starts the connections to the nodes and marks them critical. In the second iteration,
- * the connection establishments are finished. Finally, BEGIN commands are sent,
- * if necessary.
+ * GetModifyConnections returns the list of connections required to execute
+ * modify commands on the placements in taskPlacementList. If necessary,
+ * remote transactions are started.
+ *
+ * If markCritical is true, the remote transactions are marked as critical.
+ * If noNewTransactions is true, this function errors out for placements
+ * whose connection has no remote transaction in progress yet, instead of
+ * expanding the set of transaction participants.
  */
-static void
-GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
+static List *
+GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions)
 {
     ListCell *taskPlacementCell = NULL;
     List *multiConnectionList = NIL;
 
-    /* in the first iteration start the connections */
+    /* first initiate connection establishment for all necessary connections */
     foreach(taskPlacementCell, taskPlacementList)
     {
         ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
-        int connectionFlags = SESSION_LIFESPAN;
-        MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
-                                                               taskPlacement->nodeName,
-                                                               taskPlacement->nodePort);
+        int connectionFlags = SESSION_LIFESPAN | FOR_DML;
+        MultiConnection *multiConnection = NULL;
 
-        MarkRemoteTransactionCritical(multiConnection);
+        /*
+         * FIXME: It's not actually correct to use only one shard placement
+         * here for router queries involving multiple relations. We should
+         * check that this connection is the only modifying one associated
+         * with all the involved shards.
+         */
+        multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);
+
+        /*
+         * If already in a transaction, disallow expanding the set of remote
+         * transactions. That prevents some forms of distributed deadlocks.
+         */
+        if (noNewTransactions)
+        {
+            RemoteTransaction *transaction = &multiConnection->remoteTransaction;
+
+            if (transaction->transactionState == REMOTE_TRANS_INVALID)
+            {
+                ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                                errmsg("no transaction participant matches %s:%d",
+                                       taskPlacement->nodeName, taskPlacement->nodePort),
+                                errdetail("Transactions which modify distributed tables "
                                           "may only target nodes affected by the "
+                                          "modification command which began the transaction.")));
+            }
+        }
+
+        if (markCritical)
+        {
+            MarkRemoteTransactionCritical(multiConnection);
+        }
 
         multiConnectionList = lappend(multiConnectionList, multiConnection);
     }
 
+    /* then finish in parallel */
     FinishConnectionListEstablishment(multiConnectionList);
 
+    /* and start transactions if applicable */
     RemoteTransactionsBeginIfNecessary(multiConnectionList);
+
+    return multiConnectionList;
 }
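[Editor's note — not part of the patch] The start/finish/begin sequence above is worth calling out: StartPlacementConnection() only initiates a non-blocking connect, and FinishConnectionListEstablishment() then waits for all attempts at once, keeping connection setup proportional to the slowest node rather than the sum of all latencies. The same pattern, generalized as a hypothetical helper built from calls that appear in this patch:

    /* hypothetical sketch of the parallel-establishment pattern */
    static List *
    EstablishConnectionsInParallel(List *placementList, int connectionFlags)
    {
        List *connectionList = NIL;
        ListCell *placementCell = NULL;

        foreach(placementCell, placementList)
        {
            ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

            /* initiates a non-blocking connection attempt */
            MultiConnection *connection =
                StartPlacementConnection(connectionFlags, placement, NULL);

            connectionList = lappend(connectionList, connection);
        }

        /* wait for all of the connection attempts to complete at once */
        FinishConnectionListEstablishment(connectionList);

        return connectionList;
    }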
@@ -1009,8 +965,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
         bool shardConnectionsFound = false;
         ShardConnections *shardConnections = NULL;
         List *connectionList = NIL;
-        MultiConnection *multiConnection = NULL;
-        PGconn *connection = NULL;
+        MultiConnection *connection = NULL;
         bool queryOK = false;
 
         shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
@@ -1022,14 +977,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
             continue;
         }
 
-        multiConnection =
+        connection =
             (MultiConnection *) list_nth(connectionList, placementIndex);
-        connection = multiConnection->pgConn;
 
         queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
         if (!queryOK)
        {
-            ReraiseRemoteError(connection, NULL);
+            ReportConnectionError(connection, ERROR);
         }
     }
 
@@ -1041,8 +995,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
         bool shardConnectionsFound = false;
         ShardConnections *shardConnections = NULL;
         List *connectionList = NIL;
-        MultiConnection *multiConnection = NULL;
-        PGconn *connection = NULL;
+        MultiConnection *connection = NULL;
         int64 currentAffectedTupleCount = 0;
         bool failOnError = true;
         bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
@@ -1060,9 +1013,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
             continue;
         }
 
-        multiConnection =
-            (MultiConnection *) list_nth(connectionList, placementIndex);
-        connection = multiConnection->pgConn;
+        connection = (MultiConnection *) list_nth(connectionList, placementIndex);
 
         /*
          * If caller is interested, store query results the first time
@@ -1101,16 +1052,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 
             if (currentAffectedTupleCount != previousAffectedTupleCount)
             {
-                char *nodeName = ConnectionGetOptionValue(connection, "host");
-                char *nodePort = ConnectionGetOptionValue(connection, "port");
-
                 ereport(WARNING,
                         (errmsg("modified " INT64_FORMAT " tuples of shard "
                                 UINT64_FORMAT ", but expected to modify " INT64_FORMAT,
                                 currentAffectedTupleCount, shardId,
                                 previousAffectedTupleCount),
-                         errdetail("modified placement on %s:%s", nodeName,
-                                   nodePort)));
+                         errdetail("modified placement on %s:%d",
                                    connection->hostname, connection->port)));
             }
         }
 
@@ -1199,143 +1147,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
 }
 
 
-/*
- * GetConnectionForPlacement is the main entry point for acquiring a connection
- * within the router executor. By using placements (rather than node names and
- * ports) to identify connections, the router executor can keep track of shards
- * used by multi-statement transactions and error out if a transaction tries
- * to reach a new node altogether). In the single-statement commands context,
- * GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
- */
-static PGconn *
-GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
-{
-    NodeConnectionKey participantKey;
-    NodeConnectionEntry *participantEntry = NULL;
-    bool entryFound = false;
-
-    /* if not in a transaction, fall through to connection cache */
-    if (xactParticipantHash == NULL)
-    {
-        PGconn *connection = GetOrEstablishConnection(placement->nodeName,
-                                                      placement->nodePort);
-
-        return connection;
-    }
-
-    Assert(IsTransactionBlock());
-
-    MemSet(&participantKey, 0, sizeof(participantKey));
-    strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
-    participantKey.nodePort = placement->nodePort;
-
-    participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
-                                   &entryFound);
-
-    if (entryFound)
-    {
-        if (isModificationQuery)
-        {
-            RecordShardIdParticipant(placement->shardId, participantEntry);
-        }
-
-        return participantEntry->connection->pgConn;
-    }
-    else
-    {
-        ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
-                        errmsg("no transaction participant matches %s:%d",
-                               placement->nodeName, placement->nodePort),
-                        errdetail("Transactions which modify distributed tables may only "
-                                  "target nodes affected by the modification command "
-                                  "which began the transaction.")));
-    }
-}
-
-
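[Editor's note — not part of the patch] The two purge helpers removed below have no direct replacement: instead of closing and un-caching a failed connection, the new code records the failure on the connection's remote transaction and lets the placement connection framework settle placement health at commit time. A sketch of the new failure path, as a hypothetical helper using only calls that appear in this patch:

    /* hypothetical helper mirroring the patch's send-failure handling */
    static bool
    SendModifyCommand(MultiConnection *connection, const char *queryString)
    {
        int querySent = PQsendQuery(connection->pgConn, queryString);

        if (querySent == 0)
        {
            /* record the failure; placement health is derived from this at commit */
            MarkRemoteTransactionFailed(connection, false);
            ReportConnectionError(connection, WARNING);

            return false;
        }

        return true;
    }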
-/*
- * PurgeConnectionForPlacement provides a way to purge an invalid connection
- * from all relevant connection hashes using the placement involved in the
- * query at the time of the error. If a transaction is ongoing, this function
- * ensures the right node's connection is set to NULL in the participant map
- * for the transaction in addition to purging the connection cache's entry.
- */
-static void
-PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement)
-{
-    CloseConnectionByPGconn(connection);
-
-    /*
-     * The following is logically identical to RemoveXactConnection, but since
-     * we have a ShardPlacement to help build a NodeConnectionKey, we avoid
-     * any penalty incurred by calling BuildKeyForConnection, which must ex-
-     * tract host, port, and user from the connection options list.
-     */
-    if (xactParticipantHash != NULL)
-    {
-        NodeConnectionEntry *participantEntry = NULL;
-        bool entryFound = false;
-        NodeConnectionKey nodeKey;
-        char *currentUser = CurrentUserName();
-
-        MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
-        strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
-        nodeKey.nodePort = placement->nodePort;
-        strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
-        Assert(IsTransactionBlock());
-
-        /* the participant hash doesn't use the user field */
-        MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
-        participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
-                                       &entryFound);
-
-        Assert(entryFound);
-
-        participantEntry->connection = NULL;
-    }
-}
-
-
-/*
- * Removes a given connection from the transaction participant hash, based on
- * the host and port of the provided connection. If the hash is not NULL, it
- * MUST contain the provided connection, or a FATAL error is raised.
- */
-static void
-RemoveXactConnection(PGconn *connection)
-{
-    NodeConnectionKey nodeKey;
-    NodeConnectionEntry *participantEntry = NULL;
-    bool entryFound = false;
-
-    if (xactParticipantHash == NULL)
-    {
-        return;
-    }
-
-    BuildKeyForConnection(connection, &nodeKey);
-
-    /* the participant hash doesn't use the user field */
-    MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
-    participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
-                                   &entryFound);
-
-    if (!entryFound)
-    {
-        ereport(FATAL, (errmsg("could not find specified transaction connection")));
-    }
-
-    participantEntry->connection = NULL;
-}
-
-
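[Editor's note — not part of the patch] For readers unfamiliar with libpq's single-row mode, used by the function below: PQsetSingleRowMode() must be called immediately after the asynchronous send, before any result is consumed; each row then arrives as its own PGRES_SINGLE_TUPLE result, so large result sets never have to be buffered in full. A minimal, hypothetical demonstration using only standard libpq calls:

    /* hypothetical demonstration of libpq single-row mode */
    static void
    ConsumeInSingleRowMode(PGconn *pgConn, const char *queryString)
    {
        if (PQsendQuery(pgConn, queryString) == 0 ||
            PQsetSingleRowMode(pgConn) == 0)
        {
            return;                 /* caller reports the error */
        }

        for (;;)
        {
            PGresult *result = PQgetResult(pgConn);

            if (result == NULL)
            {
                break;              /* query fully consumed */
            }

            if (PQresultStatus(result) == PGRES_SINGLE_TUPLE)
            {
                /* exactly one row; process it without buffering the whole set */
            }

            /* a successful set ends with a zero-row PGRES_TUPLES_OK result */
            PQclear(result);
        }
    }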
 /*
  * SendQueryInSingleRowMode sends the given query on the connection in an
  * asynchronous way. The function also sets the single-row mode on the
  * connection so that we receive results a row at a time.
  */
 static bool
-SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo)
+SendQueryInSingleRowMode(MultiConnection *connection, char *query,
+                         ParamListInfo paramListInfo)
 {
     int querySent = 0;
     int singleRowMode = 0;
@@ -1349,24 +1168,27 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis
 
         ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
                                            &parameterValues);
 
-        querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes,
-                                      parameterValues, NULL, NULL, 0);
+        querySent = PQsendQueryParams(connection->pgConn, query,
+                                      parameterCount, parameterTypes, parameterValues,
+                                      NULL, NULL, 0);
     }
     else
     {
-        querySent = PQsendQuery(connection, query);
+        querySent = PQsendQuery(connection->pgConn, query);
     }
 
     if (querySent == 0)
     {
-        WarnRemoteError(connection, NULL);
+        MarkRemoteTransactionFailed(connection, false);
+        ReportConnectionError(connection, WARNING);
+
         return false;
     }
 
-    singleRowMode = PQsetSingleRowMode(connection);
+    singleRowMode = PQsetSingleRowMode(connection->pgConn);
     if (singleRowMode == 0)
     {
-        WarnRemoteError(connection, NULL);
+        MarkRemoteTransactionFailed(connection, false);
+        ReportConnectionError(connection, WARNING);
+
         return false;
     }
@@ -1451,7 +1273,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
  * the connection.
  */
 static bool
-StoreQueryResult(MaterialState *routerState, PGconn *connection,
+StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
                  TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
 {
     AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
@@ -1486,7 +1308,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
         uint32 columnCount = 0;
         ExecStatusType resultStatus = 0;
 
-        PGresult *result = PQgetResult(connection);
+        PGresult *result = PQgetResult(connection->pgConn);
         if (result == NULL)
         {
             break;
@@ -1499,6 +1321,8 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
             int category = 0;
             bool isConstraintViolation = false;
 
+            MarkRemoteTransactionFailed(connection, false);
+
             /*
              * If the error code is in constraint violation class, we want to
              * fail fast because we must get the same error from all shard
@@ -1509,12 +1333,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 
             if (isConstraintViolation || failOnError)
             {
-                RemoveXactConnection(connection);
-                ReraiseRemoteError(connection, result);
+                ReportResultError(connection, result, ERROR);
             }
             else
             {
-                WarnRemoteError(connection, result);
+                ReportResultError(connection, result, WARNING);
             }
 
             PQclear(result);
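[Editor's note — not part of the patch] StoreQueryResult() above and ConsumeQueryResult() below now share the same triage: the failure is first recorded via MarkRemoteTransactionFailed(); constraint violations then ERROR out immediately (every placement must report the same violation, so trying other placements is pointless), while other errors only WARN so the caller can move on to the next placement. A sketch of that classification; SqlStateMatchesCategory is an assumed helper name, while the libpq and errcode calls are standard:

    /* sketch of the shared error triage; SqlStateMatchesCategory is assumed */
    static void
    SketchHandleRemoteError(MultiConnection *connection, PGresult *result,
                            bool failOnError)
    {
        char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
        int category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
        bool isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

        /* remember the failure for commit-time placement invalidation */
        MarkRemoteTransactionFailed(connection, false);

        if (isConstraintViolation || failOnError)
        {
            ReportResultError(connection, result, ERROR);    /* fail fast */
        }
        else
        {
            ReportResultError(connection, result, WARNING);  /* try next placement */
        }
    }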
@@ -1579,7 +1402,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
  * has been an error.
  */
 static bool
-ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
+ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
 {
     bool commandFailed = false;
     bool gotResponse = false;
@@ -1593,7 +1416,7 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
      */
     while (true)
     {
-        PGresult *result = PQgetResult(connection);
+        PGresult *result = PQgetResult(connection->pgConn);
         ExecStatusType status = PGRES_COMMAND_OK;
 
         if (result == NULL)
         {
@@ -1611,6 +1434,8 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
             int category = 0;
             bool isConstraintViolation = false;
 
+            MarkRemoteTransactionFailed(connection, false);
+
             /*
              * If the error code is in constraint violation class, we want to
              * fail fast because we must get the same error from all shard
@@ -1621,13 +1446,13 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 
             if (isConstraintViolation || failOnError)
             {
-                RemoveXactConnection(connection);
-                ReraiseRemoteError(connection, result);
+                ReportResultError(connection, result, ERROR);
             }
             else
             {
-                WarnRemoteError(connection, result);
+                ReportResultError(connection, result, WARNING);
             }
+
             PQclear(result);
 
             commandFailed = true;
@@ -1667,50 +1492,6 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 }
 
 
-/*
- * RecordShardIdParticipant registers a connection as being involved with a
- * particular shard during a multi-statement transaction.
- */
-static void
-RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry)
-{
-    XactShardConnSet *shardConnSetMatch = NULL;
-    ListCell *listCell = NULL;
-    MemoryContext oldContext = NULL;
-    List *connectionEntryList = NIL;
-
-    /* check whether an entry already exists for this shard */
-    foreach(listCell, xactShardConnSetList)
-    {
-        XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell);
-
-        if (shardConnSet->shardId == affectedShardId)
-        {
-            shardConnSetMatch = shardConnSet;
-        }
-    }
-
-    /* entries must last through the whole top-level transaction */
-    oldContext = MemoryContextSwitchTo(TopTransactionContext);
-
-    /* if no entry found, make one */
-    if (shardConnSetMatch == NULL)
-    {
-        shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet));
-        shardConnSetMatch->shardId = affectedShardId;
-
-        xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch);
-    }
-
-    /* add connection, avoiding duplicates */
-    connectionEntryList = shardConnSetMatch->connectionEntryList;
-    shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList,
-                                                                    participantEntry);
-
-    MemoryContextSwitchTo(oldContext);
-}
-
-
 /*
  * RouterExecutorFinish cleans up after a distributed execution.
  */
@@ -1745,114 +1526,3 @@ RouterExecutorEnd(QueryDesc *queryDesc)
     queryDesc->estate = NULL;
     queryDesc->totaltime = NULL;
 }
-
-
-/*
- * RouterExecutorPreCommitCheck() gets called after remote transactions have
- * committed, so it can invalidate failed shards and perform related checks.
- */
-void
-RouterExecutorPreCommitCheck(void)
-{
-    /* no transactional router modification were issued, nothing to do */
-    if (xactParticipantHash == NULL)
-    {
-        return;
-    }
-
-    MarkRemainingInactivePlacements();
-}
-
-
-/*
- * Cleanup callback called after a transaction commits or aborts.
- */
-void
-RouterExecutorPostCommit(void)
-{
-    /* reset transaction state */
-    xactParticipantHash = NULL;
-    xactShardConnSetList = NIL;
-}
-
-
-/*
- * MarkRemainingInactivePlacements takes care of marking placements of a shard
- * inactive after some of the placements rejected the final COMMIT phase of a
- * transaction.
- *
- * Failures are detected by checking the connection & transaction state for
- * each of the entries in the connection set for each shard.
- */
-static void
-MarkRemainingInactivePlacements(void)
-{
-    ListCell *shardConnSetCell = NULL;
-    int totalSuccesses = 0;
-
-    if (xactParticipantHash == NULL)
-    {
-        return;
-    }
-
-    foreach(shardConnSetCell, xactShardConnSetList)
-    {
-        XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell);
-        List *participantList = shardConnSet->connectionEntryList;
-        ListCell *participantCell = NULL;
-        int successes = list_length(participantList); /* assume full success */
-
-        /* determine how many actual successes there were: subtract failures */
-        foreach(participantCell, participantList)
-        {
-            NodeConnectionEntry *participant =
-                (NodeConnectionEntry *) lfirst(participantCell);
-            MultiConnection *connection = participant->connection;
-
-            /*
-             * Fail if the connection has been set to NULL after an error, or
-             * if the transaction failed for other reasons (e.g. COMMIT
-             * failed).
-             */
-            if (connection == NULL || connection->remoteTransaction.transactionFailed)
-            {
-                successes--;
-            }
-        }
-
-        /* if no nodes succeeded for this shard, don't do anything */
-        if (successes == 0)
-        {
-            continue;
-        }
-
-        /* otherwise, ensure failed placements are marked inactive */
-        foreach(participantCell, participantList)
-        {
-            NodeConnectionEntry *participant = NULL;
-            participant = (NodeConnectionEntry *) lfirst(participantCell);
-
-            if (participant->connection == NULL ||
-                participant->connection->remoteTransaction.transactionFailed)
-            {
-                uint64 shardId = shardConnSet->shardId;
-                NodeConnectionKey *nodeKey = &participant->cacheKey;
-                uint64 shardLength = 0;
-                uint64 placementId = INVALID_PLACEMENT_ID;
-
-                placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
-                                                      nodeKey->nodePort);
-                InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
-                                        nodeKey->nodeName, nodeKey->nodePort);
-            }
-        }
-
-        totalSuccesses++;
-    }
-
-    /* If no shards could be modified at all, error out. */
-    if (totalSuccesses == 0)
-    {
-        ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
-    }
-}
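[Editor's note — not part of the patch] With the router executor's pre/post-commit hooks removed above, placement invalidation now happens inside the coordinated-transaction callback changed below (and in the placement connection framework it calls). For readers unfamiliar with the mechanism, Citus hooks into commit and abort via PostgreSQL's standard transaction-callback API; a hypothetical skeleton:

    #include "access/xact.h"

    /* hypothetical skeleton of a transaction callback, standard xact.h API */
    static void
    SketchXactCallback(XactEvent event, void *arg)
    {
        switch (event)
        {
            case XACT_EVENT_COMMIT:
                /* local commit succeeded: finish remote work, check placements */
                break;

            case XACT_EVENT_ABORT:
                /* roll back the remote transactions */
                break;

            default:
                break;
        }
    }

    void
    SketchInstallCallback(void)
    {
        RegisterXactCallback(SketchXactCallback, NULL);
    }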
diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index 92ba56631..40a0e4651 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -21,7 +21,6 @@
 #include "access/xact.h"
 #include "distributed/connection_management.h"
 #include "distributed/hash_helpers.h"
-#include "distributed/multi_router_executor.h"
 #include "distributed/multi_shard_transaction.h"
 #include "distributed/transaction_management.h"
 #include "distributed/placement_connection.h"
@@ -150,7 +149,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
              * callbacks still can perform work if needed.
              */
             ResetShardPlacementTransactionState();
-            RouterExecutorPostCommit();
 
             if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
             {
@@ -187,7 +185,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
              * callbacks still can perform work if needed.
              */
             ResetShardPlacementTransactionState();
-            RouterExecutorPostCommit();
 
             /* handles both already prepared and open transactions */
             if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
@@ -268,14 +265,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
                 CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
             }
 
-            /*
-             * Call other parts of citus that need to integrate into
-             * transaction management. Call them *after* committing/preparing
-             * the remote transactions, to allow marking shards as invalid
-             * (e.g. if the remote commit failed).
-             */
-            RouterExecutorPreCommitCheck();
-
             /*
              *
              * Check again whether shards/placement successfully
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index f584abdc8..bb16f5e15 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -37,8 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList
 extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
 extern void RouterExecutorFinish(QueryDesc *queryDesc);
 extern void RouterExecutorEnd(QueryDesc *queryDesc);
-extern void RouterExecutorPreCommitCheck(void);
-extern void RouterExecutorPostCommit(void);
 
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
 
diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out
index 6dffd0583..aecaf0191 100644
--- a/src/test/regress/expected/multi_modifying_xacts.out
+++ b/src/test/regress/expected/multi_modifying_xacts.out
@@ -140,8 +140,8 @@ INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
 ERROR:  no transaction participant matches localhost:57638
 DETAIL:  Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
 COMMIT;
--- this logic even applies to router SELECTs occurring after a modification:
--- selecting from the modified node is fine...
+-- SELECTs may occur after a modification: first check that selecting
+-- from the modified node works.
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 SELECT count(*) FROM researchers WHERE lab_id = 6;
@@ -151,7 +151,7 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
 (1 row)
 
 ABORT;
--- but if a SELECT needs to go to new node, that's a problem...
+-- then check that a SELECT going to a new node is still fine
 BEGIN;
 UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
 FROM pg_dist_shard AS s
@@ -161,8 +161,11 @@ AND sp.nodeport = :worker_1_port
 AND s.logicalrelid = 'researchers'::regclass;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 SELECT count(*) FROM researchers WHERE lab_id = 6;
-ERROR:  no transaction participant matches localhost:57638
-DETAIL:  Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
+ count
+-------
+     0
+(1 row)
+
 ABORT;
 -- applies to DDL, too
 BEGIN;
@@ -494,7 +497,9 @@
 WARNING:  illegal value
 WARNING:  failed to commit transaction on localhost:57637
 WARNING:  illegal value
 WARNING:  failed to commit transaction on localhost:57638
-ERROR:  could not commit transaction on any active nodes
+WARNING:  could not commit transaction for shard 1200002 on any active node
+WARNING:  could not commit transaction for shard 1200003 on any active node
+ERROR:  could not commit transaction on any active node
 -- data should NOT be persisted
 SELECT * FROM objects WHERE id = 1;
  id | name
@@ -530,6 +535,7 @@ INSERT INTO labs VALUES (9, 'BAD');
 COMMIT;
 WARNING:  illegal value
 WARNING:  failed to commit transaction on localhost:57637
+WARNING:  could not commit transaction for shard 1200002 on any active node
 \set VERBOSITY default
 -- data to objects should be persisted, but labs should not...
 SELECT * FROM objects WHERE id = 1;
diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql
index cc10b385c..8a2497e57 100644
--- a/src/test/regress/sql/multi_modifying_xacts.sql
+++ b/src/test/regress/sql/multi_modifying_xacts.sql
@@ -111,15 +111,14 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
 INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
 COMMIT;
--- this logic even applies to router SELECTs occurring after a modification:
--- selecting from the modified node is fine...
+-- SELECTs may occur after a modification: first check that selecting
+-- from the modified node works.
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 SELECT count(*) FROM researchers WHERE lab_id = 6;
 ABORT;
--- but if a SELECT needs to go to new node, that's a problem...
-
+-- then check that a SELECT going to a new node is still fine
 BEGIN;
 UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
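[Editor's note — not part of the patch] The regression-test changes capture the two user-visible effects of this rework: a router SELECT may now read from a node the transaction has not yet modified (it simply returns the rows visible there, hence the new zero-row result instead of an error), and commit failures are now reported per shard placement ("could not commit transaction for shard ... on any active node") ahead of the transaction-level error, because placement health is tracked by the placement connection framework rather than by the router executor's participant hash.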