mirror of https://github.com/citusdata/citus.git
Fix unique-violation-in-xact segfault
An interaction between ReraiseRemoteError and DML transaction support causes segfaults: * ReraiseRemoteError calls PurgeConnection, freeing a connection... * That connection is still in the xactParticipantHash At transaction end, the memory in the freed connection might happen to pass the "is this connection OK?" check, causing us to try to send an ABORT over that connection. By removing it from the transaction hash before calling ReraiseRemoteError, we avoid this possibility.pull/817/head
parent
b8bb22e5cc
commit
6a7e930b42
|
@ -100,6 +100,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip
|
||||||
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
||||||
bool isModificationQuery);
|
bool isModificationQuery);
|
||||||
static void PurgeConnectionForPlacement(ShardPlacement *placement);
|
static void PurgeConnectionForPlacement(ShardPlacement *placement);
|
||||||
|
static void RemoveXactConnection(PGconn *connection);
|
||||||
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
|
@ -731,6 +732,12 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
|
||||||
|
|
||||||
PurgeConnectionByKey(&nodeKey);
|
PurgeConnectionByKey(&nodeKey);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The following is logically identical to RemoveXactConnection, but since
|
||||||
|
* we have a ShardPlacement to help build a NodeConnectionKey, we avoid
|
||||||
|
* any penalty incurred by calling BuildKeyForConnection, which must ex-
|
||||||
|
* tract host, port, and user from the connection options list.
|
||||||
|
*/
|
||||||
if (xactParticipantHash != NULL)
|
if (xactParticipantHash != NULL)
|
||||||
{
|
{
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
@ -750,6 +757,39 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Removes a given connection from the transaction participant hash, based on
|
||||||
|
* the host and port of the provided connection. If the hash is not NULL, it
|
||||||
|
* MUST contain the provided connection, or a FATAL error is raised.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RemoveXactConnection(PGconn *connection)
|
||||||
|
{
|
||||||
|
NodeConnectionKey nodeKey;
|
||||||
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
bool entryFound = false;
|
||||||
|
|
||||||
|
if (xactParticipantHash == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BuildKeyForConnection(connection, &nodeKey);
|
||||||
|
|
||||||
|
/* the participant hash doesn't use the user field */
|
||||||
|
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
||||||
|
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
||||||
|
&entryFound);
|
||||||
|
|
||||||
|
if (!entryFound)
|
||||||
|
{
|
||||||
|
ereport(FATAL, (errmsg("could not find specified transaction connection")));
|
||||||
|
}
|
||||||
|
|
||||||
|
participantEntry->connection = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendQueryInSingleRowMode sends the given query on the connection in an
|
* SendQueryInSingleRowMode sends the given query on the connection in an
|
||||||
* asynchronous way. The function also sets the single-row mode on the
|
* asynchronous way. The function also sets the single-row mode on the
|
||||||
|
@ -918,6 +958,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
|
|
||||||
if (raiseError)
|
if (raiseError)
|
||||||
{
|
{
|
||||||
|
RemoveXactConnection(connection);
|
||||||
ReraiseRemoteError(connection, result);
|
ReraiseRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1029,6 +1070,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
|
||||||
|
|
||||||
if (raiseError)
|
if (raiseError)
|
||||||
{
|
{
|
||||||
|
RemoveXactConnection(connection);
|
||||||
ReraiseRemoteError(connection, result);
|
ReraiseRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -125,10 +125,34 @@ void
|
||||||
PurgeConnection(PGconn *connection)
|
PurgeConnection(PGconn *connection)
|
||||||
{
|
{
|
||||||
NodeConnectionKey nodeConnectionKey;
|
NodeConnectionKey nodeConnectionKey;
|
||||||
|
PGconn *purgedConnection = NULL;
|
||||||
|
|
||||||
|
BuildKeyForConnection(connection, &nodeConnectionKey);
|
||||||
|
purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* It's possible the provided connection matches the host and port for
|
||||||
|
* an entry in the hash without being precisely the same connection. In
|
||||||
|
* that case, we will want to close the provided connection in addition
|
||||||
|
* to the one from the hash (which was closed by PurgeConnectionByKey).
|
||||||
|
*/
|
||||||
|
if (purgedConnection != connection)
|
||||||
|
{
|
||||||
|
PQfinish(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Utility method to simplify populating a connection cache key with relevant
|
||||||
|
* fields from a provided connection.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey)
|
||||||
|
{
|
||||||
char *nodeNameString = NULL;
|
char *nodeNameString = NULL;
|
||||||
char *nodePortString = NULL;
|
char *nodePortString = NULL;
|
||||||
char *nodeUserString = NULL;
|
char *nodeUserString = NULL;
|
||||||
PGconn *purgedConnection = NULL;
|
|
||||||
|
|
||||||
nodeNameString = ConnectionGetOptionValue(connection, "host");
|
nodeNameString = ConnectionGetOptionValue(connection, "host");
|
||||||
if (nodeNameString == NULL)
|
if (nodeNameString == NULL)
|
||||||
|
@ -151,27 +175,14 @@ PurgeConnection(PGconn *connection)
|
||||||
errmsg("connection is missing user option")));
|
errmsg("connection is missing user option")));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
|
MemSet(connectionKey, 0, sizeof(NodeConnectionKey));
|
||||||
strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1);
|
strlcpy(connectionKey->nodeName, nodeNameString, MAX_NODE_LENGTH + 1);
|
||||||
nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
|
connectionKey->nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
|
||||||
strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN);
|
strlcpy(connectionKey->nodeUser, nodeUserString, NAMEDATALEN);
|
||||||
|
|
||||||
pfree(nodeNameString);
|
pfree(nodeNameString);
|
||||||
pfree(nodePortString);
|
pfree(nodePortString);
|
||||||
pfree(nodeUserString);
|
pfree(nodeUserString);
|
||||||
|
|
||||||
purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* It's possible the provided connection matches the host and port for
|
|
||||||
* an entry in the hash without being precisely the same connection. In
|
|
||||||
* that case, we will want to close the provided connection in addition
|
|
||||||
* to the one from the hash (which was closed by PurgeConnectionByKey).
|
|
||||||
*/
|
|
||||||
if (purgedConnection != connection)
|
|
||||||
{
|
|
||||||
PQfinish(connection);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@ extern bool IsModifyingTransaction;
|
||||||
/* function declarations for obtaining and using a connection */
|
/* function declarations for obtaining and using a connection */
|
||||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||||
extern void PurgeConnection(PGconn *connection);
|
extern void PurgeConnection(PGconn *connection);
|
||||||
|
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
|
||||||
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
|
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
|
||||||
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
||||||
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
||||||
|
|
|
@ -36,6 +36,10 @@ SELECT master_create_worker_shards('labs', 1, 1);
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- might be confusing to have two people in the same lab with the same name
|
||||||
|
CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name);
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
-- add some data
|
-- add some data
|
||||||
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
||||||
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
||||||
|
@ -62,6 +66,13 @@ SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
Donald Knuth
|
Donald Knuth
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- trigger a unique constraint violation
|
||||||
|
BEGIN;
|
||||||
|
UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1;
|
||||||
|
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200000"
|
||||||
|
DETAIL: Key (lab_id, name)=(1, John Backus) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ABORT;
|
||||||
-- creating savepoints should work...
|
-- creating savepoints should work...
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
||||||
|
|
|
@ -24,6 +24,9 @@ SELECT master_create_worker_shards('researchers', 2, 2);
|
||||||
SELECT master_create_distributed_table('labs', 'id', 'hash');
|
SELECT master_create_distributed_table('labs', 'id', 'hash');
|
||||||
SELECT master_create_worker_shards('labs', 1, 1);
|
SELECT master_create_worker_shards('labs', 1, 1);
|
||||||
|
|
||||||
|
-- might be confusing to have two people in the same lab with the same name
|
||||||
|
CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name);
|
||||||
|
|
||||||
-- add some data
|
-- add some data
|
||||||
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
||||||
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
||||||
|
@ -45,6 +48,11 @@ ABORT;
|
||||||
|
|
||||||
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
|
|
||||||
|
-- trigger a unique constraint violation
|
||||||
|
BEGIN;
|
||||||
|
UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
-- creating savepoints should work...
|
-- creating savepoints should work...
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
||||||
|
|
Loading…
Reference in New Issue