diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 7dbc8f0f1..cdbe46309 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -100,6 +100,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip static PGconn * GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery); static void PurgeConnectionForPlacement(ShardPlacement *placement); +static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -731,6 +732,12 @@ PurgeConnectionForPlacement(ShardPlacement *placement) PurgeConnectionByKey(&nodeKey); + /* + * The following is logically identical to RemoveXactConnection, but since + * we have a ShardPlacement to help build a NodeConnectionKey, we avoid + * any penalty incurred by calling BuildKeyForConnection, which must ex- + * tract host, port, and user from the connection options list. + */ if (xactParticipantHash != NULL) { NodeConnectionEntry *participantEntry = NULL; @@ -750,6 +757,39 @@ PurgeConnectionForPlacement(ShardPlacement *placement) } +/* + * Removes a given connection from the transaction participant hash, based on + * the host and port of the provided connection. If the hash is not NULL, it + * MUST contain the provided connection, or a FATAL error is raised. + */ +static void +RemoveXactConnection(PGconn *connection) +{ + NodeConnectionKey nodeKey; + NodeConnectionEntry *participantEntry = NULL; + bool entryFound = false; + + if (xactParticipantHash == NULL) + { + return; + } + + BuildKeyForConnection(connection, &nodeKey); + + /* the participant hash doesn't use the user field */ + MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); + participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, + &entryFound); + + if (!entryFound) + { + ereport(FATAL, (errmsg("could not find specified transaction connection"))); + } + + participantEntry->connection = NULL; +} + + /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the @@ -918,6 +958,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, if (raiseError) { + RemoveXactConnection(connection); ReraiseRemoteError(connection, result); } else @@ -1029,6 +1070,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) if (raiseError) { + RemoveXactConnection(connection); ReraiseRemoteError(connection, result); } else diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 766a5ade2..d9a036338 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -125,10 +125,34 @@ void PurgeConnection(PGconn *connection) { NodeConnectionKey nodeConnectionKey; + PGconn *purgedConnection = NULL; + + BuildKeyForConnection(connection, &nodeConnectionKey); + purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); + + /* + * It's possible the provided connection matches the host and port for + * an entry in the hash without being precisely the same connection. In + * that case, we will want to close the provided connection in addition + * to the one from the hash (which was closed by PurgeConnectionByKey). + */ + if (purgedConnection != connection) + { + PQfinish(connection); + } +} + + +/* + * Utility method to simplify populating a connection cache key with relevant + * fields from a provided connection. + */ +void +BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) +{ char *nodeNameString = NULL; char *nodePortString = NULL; char *nodeUserString = NULL; - PGconn *purgedConnection = NULL; nodeNameString = ConnectionGetOptionValue(connection, "host"); if (nodeNameString == NULL) @@ -151,27 +175,14 @@ PurgeConnection(PGconn *connection) errmsg("connection is missing user option"))); } - memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1); - nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0); - strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN); + MemSet(connectionKey, 0, sizeof(NodeConnectionKey)); + strlcpy(connectionKey->nodeName, nodeNameString, MAX_NODE_LENGTH + 1); + connectionKey->nodePort = pg_atoi(nodePortString, sizeof(int32), 0); + strlcpy(connectionKey->nodeUser, nodeUserString, NAMEDATALEN); pfree(nodeNameString); pfree(nodePortString); pfree(nodeUserString); - - purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); - - /* - * It's possible the provided connection matches the host and port for - * an entry in the hash without being precisely the same connection. In - * that case, we will want to close the provided connection in addition - * to the one from the hash (which was closed by PurgeConnectionByKey). - */ - if (purgedConnection != connection) - { - PQfinish(connection); - } } diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index f335e0735..02df857ea 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -60,6 +60,7 @@ extern bool IsModifyingTransaction; /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); +extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index e2b736f2f..247452f8d 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -36,6 +36,10 @@ SELECT master_create_worker_shards('labs', 1, 1); (1 row) +-- might be confusing to have two people in the same lab with the same name +CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -- add some data INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); @@ -62,6 +66,13 @@ SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; Donald Knuth (1 row) +-- trigger a unique constraint violation +BEGIN; +UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1; +ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200000" +DETAIL: Key (lab_id, name)=(1, John Backus) already exists. +CONTEXT: while executing command on localhost:57637 +ABORT; -- creating savepoints should work... BEGIN; INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie'); diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index c875e4c15..e273632b6 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -24,6 +24,9 @@ SELECT master_create_worker_shards('researchers', 2, 2); SELECT master_create_distributed_table('labs', 'id', 'hash'); SELECT master_create_worker_shards('labs', 1, 1); +-- might be confusing to have two people in the same lab with the same name +CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name); + -- add some data INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); @@ -45,6 +48,11 @@ ABORT; SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; +-- trigger a unique constraint violation +BEGIN; +UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1; +ABORT; + -- creating savepoints should work... BEGIN; INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');