diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index caf4eeb2a..0f1462032 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -28,7 +28,6 @@ #include "catalog/pg_type.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" -#include "distributed/connection_cache.h" #include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -36,9 +35,11 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/placement_connection.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" +#include "distributed/remote_commands.h" #include "executor/execdesc.h" #include "executor/executor.h" #include "executor/instrument.h" @@ -66,29 +67,9 @@ /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; -/* - * The following static variables are necessary to track the progression of - * multi-statement transactions managed by the router executor. After the first - * modification within a transaction, the executor populates a hash with the - * transaction's initial participants (nodes hit by that initial modification). - * - * To keep track of the reverse mapping (from shards to nodes), we have a list - * of XactShardConnSets, which map a shard identifier to a set of connection - * hash entries. This list is walked by MarkRemainingInactivePlacements to - * ensure we mark placements as failed if they reject a COMMIT. - * - * Beyond that, there's a backend hook to register xact callbacks and a flag to - * track when a user tries to roll back to a savepoint (not allowed). - */ -static HTAB *xactParticipantHash = NULL; -static List *xactShardConnSetList = NIL; -static bool subXactAbortAttempted = false; - /* functions needed during start phase */ -static void InitTransactionStateForTask(Task *task); static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); -static HTAB * CreateXactParticipantHash(void); /* functions needed during run phase */ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, @@ -98,27 +79,14 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); -static PGconn * GetConnectionForPlacement(ShardPlacement *placement, - bool isModificationQuery); -static void PurgeConnectionForPlacement(ShardPlacement *placement); -static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); -static bool SendQueryInSingleRowMode(PGconn *connection, char *query, +static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, ParamListInfo paramListInfo); -static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, +static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection, TupleDesc tupleDescriptor, int64 *rows); -static bool ConsumeQueryResult(PGconn *connection, int64 *rows); -static void RecordShardIdParticipant(uint64 affectedShardId, - NodeConnectionEntry *participantEntry); - -/* functions needed by callbacks and hooks */ -static void RouterTransactionCallback(XactEvent event, void *arg); -static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg); -static void ExecuteTransactionEnd(bool commit); -static void MarkRemainingInactivePlacements(void); +static bool ConsumeQueryResult(MultiConnection *connection, int64 *rows); /* @@ -154,9 +122,9 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) * but some customers already use functions that touch multiple shards * from within a function, so we'll ignore functions for now. */ - if (IsTransactionBlock() && xactParticipantHash == NULL) + if (IsTransactionBlock()) { - InitTransactionStateForTask(task); + BeginOrContinueCoordinatedTransaction(); } } @@ -186,62 +154,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) } -/* - * InitTransactionStateForTask is called during executor start with the first - * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the - * transaction participant hash, opens connections to this task's nodes, and - * populates the hash with those connections after sending BEGIN commands to - * each. If a node fails to respond, its connection is set to NULL to prevent - * further interaction with it during the transaction. - */ -static void -InitTransactionStateForTask(Task *task) -{ - ListCell *placementCell = NULL; - - xactParticipantHash = CreateXactParticipantHash(); - - foreach(placementCell, task->taskPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - NodeConnectionKey participantKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - PGconn *connection = NULL; - - MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, - MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; - - participantEntry = hash_search(xactParticipantHash, &participantKey, - HASH_ENTER, &entryFound); - Assert(!entryFound); - - connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - if (connection != NULL) - { - PGresult *result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - PurgeConnection(connection); - - connection = NULL; - } - - PQclear(result); - } - - participantEntry->connection = connection; - } - - XactModificationLevel = XACT_MODIFICATION_DATA; -} - - /* * CommutativityRuleToLockMode determines the commutativity rule for the given * command and returns the appropriate lock mode to enforce that rule. The @@ -311,33 +223,6 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode) } -/* - * CreateXactParticipantHash initializes the map used to store the connections - * needed to process distributed transactions. Unlike the connection cache, we - * permit NULL connections here to signify that a participant has seen an error - * and is no longer receiving commands during a transaction. This hash should - * be walked at transaction end to send final COMMIT or ABORT commands. - */ -static HTAB * -CreateXactParticipantHash(void) -{ - HTAB *xactParticipantHash = NULL; - HASHCTL info; - int hashFlags = 0; - - MemSet(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hcxt = TopTransactionContext; - hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); - - xactParticipantHash = hash_create("citus xact participant hash", 32, &info, - hashFlags); - - return xactParticipantHash; -} - - /* * RouterExecutorRun actually executes a single task on a worker. */ @@ -422,6 +307,12 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) /* mark underlying query as having executed */ routerState->eof_underlying = true; + + /* have performed modifications now */ + if (isModificationQuery) + { + XactModificationLevel = XACT_MODIFICATION_DATA; + } } /* if the underlying query produced output, return it */ @@ -489,6 +380,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, int64 affectedTupleCount = -1; bool gotResults = false; char *queryString = task->queryString; + int connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | SESSION_LIFESPAN; if (isModificationQuery && task->requiresMasterEvaluation) { @@ -506,6 +398,11 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, elog(DEBUG4, "query after master evaluation: %s", queryString); } + if (isModificationQuery) + { + connectionFlags |= FOR_DML; + } + /* * Try to run the query to completion on one placement. If the query fails * attempt the query on the next placement. @@ -515,8 +412,16 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); bool queryOK = false; int64 currentAffectedTupleCount = 0; - PGconn *connection = GetConnectionForPlacement(taskPlacement, - isModificationQuery); + + /* + * FIXME: It's not actually correct to use only one shard placement + * here for router queries involving multiple relations. We should + * check that this connection is the only modifying one associated + * with all the involved shards. + */ + MultiConnection *connection = GetPlacementConnection(connectionFlags, + taskPlacement); + AdjustRemoteTransactionState(connection); if (connection == NULL) { @@ -527,7 +432,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnectionForPlacement(taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -584,8 +488,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, } else { - PurgeConnectionForPlacement(taskPlacement); - failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; @@ -662,145 +564,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, } -/* - * GetConnectionForPlacement is the main entry point for acquiring a connection - * within the router executor. By using placements (rather than node names and - * ports) to identify connections, the router executor can keep track of shards - * used by multi-statement transactions and error out if a transaction tries - * to reach a new node altogether). In the single-statement commands context, - * GetConnectionForPlacement simply falls through to GetOrEstablishConnection. - */ -static PGconn * -GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) -{ - NodeConnectionKey participantKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - /* if not in a transaction, fall through to connection cache */ - if (xactParticipantHash == NULL) - { - PGconn *connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - - return connection; - } - - Assert(IsTransactionBlock()); - - MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; - - participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND, - &entryFound); - - if (entryFound) - { - if (isModificationQuery) - { - RecordShardIdParticipant(placement->shardId, participantEntry); - } - - return participantEntry->connection; - } - else - { - ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), - errmsg("no transaction participant matches %s:%d", - placement->nodeName, placement->nodePort), - errdetail("Transactions which modify distributed tables may only " - "target nodes affected by the modification command " - "which began the transaction."))); - } -} - - -/* - * PurgeConnectionForPlacement provides a way to purge an invalid connection - * from all relevant connection hashes using the placement involved in the - * query at the time of the error. If a transaction is ongoing, this function - * ensures the right node's connection is set to NULL in the participant map - * for the transaction in addition to purging the connection cache's entry. - */ -static void -PurgeConnectionForPlacement(ShardPlacement *placement) -{ - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - - PurgeConnectionByKey(&nodeKey); - - /* - * The following is logically identical to RemoveXactConnection, but since - * we have a ShardPlacement to help build a NodeConnectionKey, we avoid - * any penalty incurred by calling BuildKeyForConnection, which must ex- - * tract host, port, and user from the connection options list. - */ - if (xactParticipantHash != NULL) - { - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - Assert(IsTransactionBlock()); - - /* the participant hash doesn't use the user field */ - MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); - participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, - &entryFound); - - Assert(entryFound); - - participantEntry->connection = NULL; - } -} - - -/* - * Removes a given connection from the transaction participant hash, based on - * the host and port of the provided connection. If the hash is not NULL, it - * MUST contain the provided connection, or a FATAL error is raised. - */ -static void -RemoveXactConnection(PGconn *connection) -{ - NodeConnectionKey nodeKey; - NodeConnectionEntry *participantEntry = NULL; - bool entryFound = false; - - if (xactParticipantHash == NULL) - { - return; - } - - BuildKeyForConnection(connection, &nodeKey); - - /* the participant hash doesn't use the user field */ - MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); - participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, - &entryFound); - - if (!entryFound) - { - ereport(FATAL, (errmsg("could not find specified transaction connection"))); - } - - participantEntry->connection = NULL; -} - - /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the * connection so that we receive results a row at a time. */ static bool -SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo) +SendQueryInSingleRowMode(MultiConnection *connection, char *query, + ParamListInfo paramListInfo) { int querySent = 0; int singleRowMode = 0; @@ -814,24 +585,25 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, ¶meterValues); - querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes, - parameterValues, NULL, NULL, 0); + querySent = PQsendQueryParams(connection->conn, query, parameterCount, + parameterTypes, parameterValues, + NULL, NULL, 0); } else { - querySent = PQsendQuery(connection, query); + querySent = PQsendQuery(connection->conn, query); } if (querySent == 0) { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); return false; } - singleRowMode = PQsetSingleRowMode(connection); + singleRowMode = PQsetSingleRowMode(connection->conn); if (singleRowMode == 0) { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); return false; } @@ -904,7 +676,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT * the connection. */ static bool -StoreQueryResult(MaterialState *routerState, PGconn *connection, +StoreQueryResult(MaterialState *routerState, MultiConnection *connection, TupleDesc tupleDescriptor, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); @@ -939,7 +711,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, uint32 columnCount = 0; ExecStatusType resultStatus = 0; - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->conn); if (result == NULL) { break; @@ -962,12 +734,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, if (raiseError) { - RemoveXactConnection(connection); - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -1032,7 +803,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, * has been an error. */ static bool -ConsumeQueryResult(PGconn *connection, int64 *rows) +ConsumeQueryResult(MultiConnection *connection, int64 *rows) { bool commandFailed = false; bool gotResponse = false; @@ -1046,7 +817,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) */ while (true) { - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->conn); ExecStatusType status = PGRES_COMMAND_OK; if (result == NULL) @@ -1074,12 +845,11 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) if (raiseError) { - RemoveXactConnection(connection); - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -1117,50 +887,6 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) } -/* - * RecordShardIdParticipant registers a connection as being involved with a - * particular shard during a multi-statement transaction. - */ -static void -RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry) -{ - XactShardConnSet *shardConnSetMatch = NULL; - ListCell *listCell = NULL; - MemoryContext oldContext = NULL; - List *connectionEntryList = NIL; - - /* check whether an entry already exists for this shard */ - foreach(listCell, xactShardConnSetList) - { - XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell); - - if (shardConnSet->shardId == affectedShardId) - { - shardConnSetMatch = shardConnSet; - } - } - - /* entries must last through the whole top-level transaction */ - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - /* if no entry found, make one */ - if (shardConnSetMatch == NULL) - { - shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet)); - shardConnSetMatch->shardId = affectedShardId; - - xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch); - } - - /* add connection, avoiding duplicates */ - connectionEntryList = shardConnSetMatch->connectionEntryList; - shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList, - participantEntry); - - MemoryContextSwitchTo(oldContext); -} - - /* * RouterExecutorFinish cleans up after a distributed execution. */ @@ -1195,221 +921,3 @@ RouterExecutorEnd(QueryDesc *queryDesc) queryDesc->estate = NULL; queryDesc->totaltime = NULL; } - - -/* - * RegisterRouterExecutorXactCallbacks registers this executor's callbacks. - */ -void -RegisterRouterExecutorXactCallbacks(void) -{ - RegisterXactCallback(RouterTransactionCallback, NULL); - RegisterSubXactCallback(RouterSubtransactionCallback, NULL); -} - - -/* - * RouterTransactionCallback handles committing or aborting remote transactions - * after the local one has committed or aborted. It only sends COMMIT or ABORT - * commands to still-healthy remotes; the failed ones are marked as inactive if - * after a successful COMMIT (no need to mark on ABORTs). - */ -static void -RouterTransactionCallback(XactEvent event, void *arg) -{ - if (XactModificationLevel != XACT_MODIFICATION_DATA) - { - return; - } - - switch (event) - { - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - { - break; - } - - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - { - bool commit = false; - - ExecuteTransactionEnd(commit); - - break; - } - - /* no support for prepare with multi-statement transactions */ - case XACT_EVENT_PREPARE: - case XACT_EVENT_PRE_PREPARE: - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified " - "distributed tables"))); - - break; - } - - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - { - bool commit = true; - - if (subXactAbortAttempted) - { - subXactAbortAttempted = false; - - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " - "which modify distributed tables"))); - } - - ExecuteTransactionEnd(commit); - MarkRemainingInactivePlacements(); - - /* leave early to avoid resetting transaction state */ - return; - } - } - - /* reset transaction state */ - xactParticipantHash = NULL; - xactShardConnSetList = NIL; - subXactAbortAttempted = false; -} - - -/* - * RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK - * TO SAVEPOINT, which is not permitted by this executor. At transaction end, - * the executor checks whether such a rollback was attempted and, if so, errors - * out entirely (with an appropriate message). - * - * This implementation permits savepoints so long as no rollbacks occur. - */ -static void -RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg) -{ - if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) - { - subXactAbortAttempted = true; - } -} - - -/* - * ExecuteTransactionEnd ends any remote transactions still taking place on - * remote nodes. It uses xactParticipantHash to know which nodes need any - * final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have - * their connection field set to NULL to permit placement invalidation. - */ -static void -ExecuteTransactionEnd(bool commit) -{ - const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION"; - HASH_SEQ_STATUS scan; - NodeConnectionEntry *participant; - bool completed = !commit; /* aborts are assumed completed */ - - if (xactParticipantHash == NULL) - { - return; - } - - hash_seq_init(&scan, xactParticipantHash); - while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan))) - { - PGconn *connection = participant->connection; - PGresult *result = NULL; - - if (PQstatus(connection) != CONNECTION_OK) - { - continue; - } - - result = PQexec(connection, sqlCommand); - if (PQresultStatus(result) == PGRES_COMMAND_OK) - { - completed = true; - } - else - { - WarnRemoteError(connection, result); - PurgeConnection(participant->connection); - - participant->connection = NULL; - } - - PQclear(result); - } - - if (!completed) - { - ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); - } -} - - -/* - * MarkRemainingInactivePlacements takes care of marking placements of a shard - * inactive after some of the placements rejected the final COMMIT phase of a - * transaction. This step is skipped if all placements reject the COMMIT, since - * in that case no modifications to the placement have persisted. - * - * Failures are detected by checking the connection field of the entries in the - * connection set for each shard: it is always set to NULL after errors. - */ -static void -MarkRemainingInactivePlacements(void) -{ - ListCell *shardConnSetCell = NULL; - - foreach(shardConnSetCell, xactShardConnSetList) - { - XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell); - List *participantList = shardConnSet->connectionEntryList; - ListCell *participantCell = NULL; - int successes = list_length(participantList); /* assume full success */ - - /* determine how many actual successes there were: subtract failures */ - foreach(participantCell, participantList) - { - NodeConnectionEntry *participant = NULL; - participant = (NodeConnectionEntry *) lfirst(participantCell); - - /* other codes sets connection to NULL after errors */ - if (participant->connection == NULL) - { - successes--; - } - } - - /* if no nodes succeeded for this shard, don't do anything */ - if (successes == 0) - { - continue; - } - - /* otherwise, ensure failed placements are marked inactive */ - foreach(participantCell, participantList) - { - NodeConnectionEntry *participant = NULL; - participant = (NodeConnectionEntry *) lfirst(participantCell); - - if (participant->connection == NULL) - { - uint64 shardId = shardConnSet->shardId; - NodeConnectionKey *nodeKey = &participant->cacheKey; - uint64 shardLength = 0; - uint64 placementId = INVALID_PLACEMENT_ID; - - placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName, - nodeKey->nodePort); - InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength, - nodeKey->nodeName, nodeKey->nodePort); - } - } - } -} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 48a634c88..cde09697e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -160,7 +160,6 @@ _PG_init(void) InitPlacementConnectionManagement(); /* initialize transaction callbacks */ - RegisterRouterExecutorXactCallbacks(); RegisterShardPlacementXactCallbacks(); } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 236222f5c..14c4af8e4 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -17,18 +17,6 @@ #include "nodes/pg_list.h" -/* - * XactShardConnSet keeps track of the mapping from shard to the set of nodes - * involved in multi-statement transaction-wrapped modifications of that shard. - * This information is used to mark placements inactive at transaction close. - */ -typedef struct XactShardConnSet -{ - uint64 shardId; /* identifier of the shard that was modified */ - List *connectionEntryList; /* NodeConnectionEntry pointers to participating nodes */ -} XactShardConnSet; - - /* Config variables managed via guc.c */ extern bool AllModificationsCommutative; @@ -37,6 +25,5 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); -extern void RegisterRouterExecutorXactCallbacks(void); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index b8b83a5e9..e3ae86ee0 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -135,8 +135,7 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. +ERROR: cannot open new connections after the first modification command within a transaction COMMIT; -- this logic even applies to router SELECTs occurring after a modification: -- selecting from the modified node is fine... @@ -159,34 +158,8 @@ AND sp.nodeport = :worker_1_port AND s.logicalrelid = 'researchers'::regclass; INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. +ERROR: cannot open new connections after the first modification command within a transaction ABORT; --- applies to DDL, too -BEGIN; -INSERT INTO labs VALUES (6, 'Bell Labs'); -ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications -COMMIT; --- whether it occurs first or second -BEGIN; -ALTER TABLE labs ADD COLUMN motto text; -INSERT INTO labs VALUES (6, 'Bell Labs'); -ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands -COMMIT; --- but the DDL should correctly roll back -\d labs - Table "public.labs" - Column | Type | Modifiers ---------+--------+----------- - id | bigint | not null - name | text | not null - -SELECT * FROM labs WHERE id = 6; - id | name -----+------ -(0 rows) - -- COPY can't happen second, BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); @@ -204,6 +177,7 @@ SELECT name FROM labs WHERE id = 10; (1 row) INSERT INTO labs VALUES (6, 'Bell Labs'); +ERROR: cannot open new connections after the first modification command within a transaction COMMIT; -- but a double-copy isn't allowed (the first will persist) BEGIN; @@ -220,7 +194,7 @@ SELECT name FROM labs WHERE id = 11; -- finally, ALTER and copy aren't compatible BEGIN; -ALTER TABLE labs ADD COLUMN motto text; +ALTER TABLE labs ADD COLUMN motto2 text; \copy labs from stdin delimiter ',' ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications CONTEXT: COPY labs, line 1: "12,fsociety,lol" @@ -241,7 +215,7 @@ SELECT * FROM labs WHERE id = 12; -- and if the copy is before the ALTER... BEGIN; \copy labs from stdin delimiter ',' -ALTER TABLE labs ADD COLUMN motto text; +ALTER TABLE labs ADD COLUMN motto3 text; ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications COMMIT; -- the DDL fails, but copy persists @@ -512,6 +486,7 @@ INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value CONTEXT: while executing command on localhost:57637 +ERROR: could not commit transaction on any active nodes -- data to objects should be persisted, but labs should not... SELECT * FROM objects WHERE id = 1; id | name @@ -536,9 +511,8 @@ ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count --------------+------------+------- labs | 1 | 1 - objects | 1 | 1 - objects | 3 | 1 -(3 rows) + objects | 1 | 2 +(2 rows) -- some append-partitioned tests for good measure CREATE TABLE append_researchers ( LIKE researchers ); diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 7c3395705..1cd72b6ae 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -131,22 +131,6 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; ABORT; --- applies to DDL, too -BEGIN; -INSERT INTO labs VALUES (6, 'Bell Labs'); -ALTER TABLE labs ADD COLUMN motto text; -COMMIT; - --- whether it occurs first or second -BEGIN; -ALTER TABLE labs ADD COLUMN motto text; -INSERT INTO labs VALUES (6, 'Bell Labs'); -COMMIT; - --- but the DDL should correctly roll back -\d labs -SELECT * FROM labs WHERE id = 6; - -- COPY can't happen second, BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); @@ -178,7 +162,7 @@ SELECT name FROM labs WHERE id = 11; -- finally, ALTER and copy aren't compatible BEGIN; -ALTER TABLE labs ADD COLUMN motto text; +ALTER TABLE labs ADD COLUMN motto2 text; \copy labs from stdin delimiter ',' 12,fsociety,lol \. @@ -193,7 +177,7 @@ BEGIN; \copy labs from stdin delimiter ',' 12,fsociety \. -ALTER TABLE labs ADD COLUMN motto text; +ALTER TABLE labs ADD COLUMN motto3 text; COMMIT; -- the DDL fails, but copy persists