From 0caf0d95f13793805ae2b7ef902a1a13ef923d53 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Tue, 27 Sep 2016 15:16:50 -0600 Subject: [PATCH] Fix unique-violation-in-xact segfault An interaction between ReraiseRemoteError and DML transaction support causes segfaults: * ReraiseRemoteError calls PurgeConnection, freeing a connection... * That connection is still in the xactParticipantHash At transaction end, the memory in the freed connection might happen to pass the "is this connection OK?" check, causing us to try to send an ABORT over that connection. By removing it from the transaction hash before calling ReraiseRemoteError, we avoid this possibility. --- .../executor/multi_router_executor.c | 42 +++++++++++++++++ .../distributed/utils/connection_cache.c | 47 ++++++++++++------- src/include/distributed/connection_cache.h | 1 + .../expected/multi_modifying_xacts.out | 13 ++++- .../regress/sql/multi_modifying_xacts.sql | 8 ++++ 5 files changed, 91 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index b80de46d6..9dbd053e0 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -100,6 +100,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip static PGconn * GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery); static void PurgeConnectionForPlacement(ShardPlacement *placement); +static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -739,6 +740,12 @@ PurgeConnectionForPlacement(ShardPlacement *placement) PurgeConnectionByKey(&nodeKey); + /* + * The following is logically identical to RemoveXactConnection, but since + * we have a ShardPlacement to help build a NodeConnectionKey, we avoid + * any penalty incurred by calling BuildKeyForConnection, which must ex- + * tract host, port, and user from the connection options list. + */ if (xactParticipantHash != NULL) { NodeConnectionEntry *participantEntry = NULL; @@ -758,6 +765,39 @@ PurgeConnectionForPlacement(ShardPlacement *placement) } +/* + * Removes a given connection from the transaction participant hash, based on + * the host and port of the provided connection. If the hash is not NULL, it + * MUST contain the provided connection, or a FATAL error is raised. + */ +static void +RemoveXactConnection(PGconn *connection) +{ + NodeConnectionKey nodeKey; + NodeConnectionEntry *participantEntry = NULL; + bool entryFound = false; + + if (xactParticipantHash == NULL) + { + return; + } + + BuildKeyForConnection(connection, &nodeKey); + + /* the participant hash doesn't use the user field */ + MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); + participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, + &entryFound); + + if (!entryFound) + { + ereport(FATAL, (errmsg("could not find specified transaction connection"))); + } + + participantEntry->connection = NULL; +} + + /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the @@ -926,6 +966,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, if (raiseError) { + RemoveXactConnection(connection); ReraiseRemoteError(connection, result); } else @@ -1037,6 +1078,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) if (raiseError) { + RemoveXactConnection(connection); ReraiseRemoteError(connection, result); } else diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 191a25c4b..8f852fdd4 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -125,10 +125,34 @@ void PurgeConnection(PGconn *connection) { NodeConnectionKey nodeConnectionKey; + PGconn *purgedConnection = NULL; + + BuildKeyForConnection(connection, &nodeConnectionKey); + purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); + + /* + * It's possible the provided connection matches the host and port for + * an entry in the hash without being precisely the same connection. In + * that case, we will want to close the provided connection in addition + * to the one from the hash (which was closed by PurgeConnectionByKey). + */ + if (purgedConnection != connection) + { + PQfinish(connection); + } +} + + +/* + * Utility method to simplify populating a connection cache key with relevant + * fields from a provided connection. + */ +void +BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) +{ char *nodeNameString = NULL; char *nodePortString = NULL; char *nodeUserString = NULL; - PGconn *purgedConnection = NULL; nodeNameString = ConnectionGetOptionValue(connection, "host"); if (nodeNameString == NULL) @@ -151,27 +175,14 @@ PurgeConnection(PGconn *connection) errmsg("connection is missing user option"))); } - memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1); - nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0); - strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN); + MemSet(connectionKey, 0, sizeof(NodeConnectionKey)); + strlcpy(connectionKey->nodeName, nodeNameString, MAX_NODE_LENGTH + 1); + connectionKey->nodePort = pg_atoi(nodePortString, sizeof(int32), 0); + strlcpy(connectionKey->nodeUser, nodeUserString, NAMEDATALEN); pfree(nodeNameString); pfree(nodePortString); pfree(nodeUserString); - - purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); - - /* - * It's possible the provided connection matches the host and port for - * an entry in the hash without being precisely the same connection. In - * that case, we will want to close the provided connection in addition - * to the one from the hash (which was closed by PurgeConnectionByKey). - */ - if (purgedConnection != connection) - { - PQfinish(connection); - } } diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 0e2c0af76..54fb97a22 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -69,6 +69,7 @@ extern XactModificationType XactModificationLevel; /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); +extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 0bef4ff32..b8b83a5e9 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -36,6 +36,10 @@ SELECT master_create_worker_shards('labs', 1, 1); (1 row) +-- might be confusing to have two people in the same lab with the same name +CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -- add some data INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); @@ -62,6 +66,13 @@ SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; Donald Knuth (1 row) +-- trigger a unique constraint violation +BEGIN; +UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1; +ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200000" +DETAIL: Key (lab_id, name)=(1, John Backus) already exists. +CONTEXT: while executing command on localhost:57637 +ABORT; -- creating savepoints should work... BEGIN; INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie'); @@ -160,8 +171,6 @@ COMMIT; -- whether it occurs first or second BEGIN; ALTER TABLE labs ADD COLUMN motto text; -NOTICE: using one-phase commit for distributed DDL commands -HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' INSERT INTO labs VALUES (6, 'Bell Labs'); ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands COMMIT; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 109d371c2..7c3395705 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -24,6 +24,9 @@ SELECT master_create_worker_shards('researchers', 2, 2); SELECT master_create_distributed_table('labs', 'id', 'hash'); SELECT master_create_worker_shards('labs', 1, 1); +-- might be confusing to have two people in the same lab with the same name +CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name); + -- add some data INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); @@ -45,6 +48,11 @@ ABORT; SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; +-- trigger a unique constraint violation +BEGIN; +UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1; +ABORT; + -- creating savepoints should work... BEGIN; INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');