mirror of https://github.com/citusdata/citus.git
Merge pull request #812 from citusdata/fix_uniq_constraint_segfault
Fix unique-violation-in-xact segfault cr: @anarazelpull/806/head
commit
e8a942485c
|
@ -100,6 +100,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip
|
||||||
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
||||||
bool isModificationQuery);
|
bool isModificationQuery);
|
||||||
static void PurgeConnectionForPlacement(ShardPlacement *placement);
|
static void PurgeConnectionForPlacement(ShardPlacement *placement);
|
||||||
|
static void RemoveXactConnection(PGconn *connection);
|
||||||
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
|
@ -739,6 +740,12 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
|
||||||
|
|
||||||
PurgeConnectionByKey(&nodeKey);
|
PurgeConnectionByKey(&nodeKey);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The following is logically identical to RemoveXactConnection, but since
|
||||||
|
* we have a ShardPlacement to help build a NodeConnectionKey, we avoid
|
||||||
|
* any penalty incurred by calling BuildKeyForConnection, which must ex-
|
||||||
|
* tract host, port, and user from the connection options list.
|
||||||
|
*/
|
||||||
if (xactParticipantHash != NULL)
|
if (xactParticipantHash != NULL)
|
||||||
{
|
{
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
@ -758,6 +765,39 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Removes a given connection from the transaction participant hash, based on
|
||||||
|
* the host and port of the provided connection. If the hash is not NULL, it
|
||||||
|
* MUST contain the provided connection, or a FATAL error is raised.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RemoveXactConnection(PGconn *connection)
|
||||||
|
{
|
||||||
|
NodeConnectionKey nodeKey;
|
||||||
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
bool entryFound = false;
|
||||||
|
|
||||||
|
if (xactParticipantHash == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BuildKeyForConnection(connection, &nodeKey);
|
||||||
|
|
||||||
|
/* the participant hash doesn't use the user field */
|
||||||
|
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
||||||
|
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
||||||
|
&entryFound);
|
||||||
|
|
||||||
|
if (!entryFound)
|
||||||
|
{
|
||||||
|
ereport(FATAL, (errmsg("could not find specified transaction connection")));
|
||||||
|
}
|
||||||
|
|
||||||
|
participantEntry->connection = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendQueryInSingleRowMode sends the given query on the connection in an
|
* SendQueryInSingleRowMode sends the given query on the connection in an
|
||||||
* asynchronous way. The function also sets the single-row mode on the
|
* asynchronous way. The function also sets the single-row mode on the
|
||||||
|
@ -926,6 +966,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
|
|
||||||
if (raiseError)
|
if (raiseError)
|
||||||
{
|
{
|
||||||
|
RemoveXactConnection(connection);
|
||||||
ReraiseRemoteError(connection, result);
|
ReraiseRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1037,6 +1078,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
|
||||||
|
|
||||||
if (raiseError)
|
if (raiseError)
|
||||||
{
|
{
|
||||||
|
RemoveXactConnection(connection);
|
||||||
ReraiseRemoteError(connection, result);
|
ReraiseRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -125,10 +125,34 @@ void
|
||||||
PurgeConnection(PGconn *connection)
|
PurgeConnection(PGconn *connection)
|
||||||
{
|
{
|
||||||
NodeConnectionKey nodeConnectionKey;
|
NodeConnectionKey nodeConnectionKey;
|
||||||
|
PGconn *purgedConnection = NULL;
|
||||||
|
|
||||||
|
BuildKeyForConnection(connection, &nodeConnectionKey);
|
||||||
|
purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* It's possible the provided connection matches the host and port for
|
||||||
|
* an entry in the hash without being precisely the same connection. In
|
||||||
|
* that case, we will want to close the provided connection in addition
|
||||||
|
* to the one from the hash (which was closed by PurgeConnectionByKey).
|
||||||
|
*/
|
||||||
|
if (purgedConnection != connection)
|
||||||
|
{
|
||||||
|
PQfinish(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Utility method to simplify populating a connection cache key with relevant
|
||||||
|
* fields from a provided connection.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey)
|
||||||
|
{
|
||||||
char *nodeNameString = NULL;
|
char *nodeNameString = NULL;
|
||||||
char *nodePortString = NULL;
|
char *nodePortString = NULL;
|
||||||
char *nodeUserString = NULL;
|
char *nodeUserString = NULL;
|
||||||
PGconn *purgedConnection = NULL;
|
|
||||||
|
|
||||||
nodeNameString = ConnectionGetOptionValue(connection, "host");
|
nodeNameString = ConnectionGetOptionValue(connection, "host");
|
||||||
if (nodeNameString == NULL)
|
if (nodeNameString == NULL)
|
||||||
|
@ -151,27 +175,14 @@ PurgeConnection(PGconn *connection)
|
||||||
errmsg("connection is missing user option")));
|
errmsg("connection is missing user option")));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
|
MemSet(connectionKey, 0, sizeof(NodeConnectionKey));
|
||||||
strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1);
|
strlcpy(connectionKey->nodeName, nodeNameString, MAX_NODE_LENGTH + 1);
|
||||||
nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
|
connectionKey->nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
|
||||||
strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN);
|
strlcpy(connectionKey->nodeUser, nodeUserString, NAMEDATALEN);
|
||||||
|
|
||||||
pfree(nodeNameString);
|
pfree(nodeNameString);
|
||||||
pfree(nodePortString);
|
pfree(nodePortString);
|
||||||
pfree(nodeUserString);
|
pfree(nodeUserString);
|
||||||
|
|
||||||
purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* It's possible the provided connection matches the host and port for
|
|
||||||
* an entry in the hash without being precisely the same connection. In
|
|
||||||
* that case, we will want to close the provided connection in addition
|
|
||||||
* to the one from the hash (which was closed by PurgeConnectionByKey).
|
|
||||||
*/
|
|
||||||
if (purgedConnection != connection)
|
|
||||||
{
|
|
||||||
PQfinish(connection);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -69,6 +69,7 @@ extern XactModificationType XactModificationLevel;
|
||||||
/* function declarations for obtaining and using a connection */
|
/* function declarations for obtaining and using a connection */
|
||||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||||
extern void PurgeConnection(PGconn *connection);
|
extern void PurgeConnection(PGconn *connection);
|
||||||
|
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
|
||||||
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
|
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
|
||||||
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
||||||
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
||||||
|
|
|
@ -36,6 +36,10 @@ SELECT master_create_worker_shards('labs', 1, 1);
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- might be confusing to have two people in the same lab with the same name
|
||||||
|
CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name);
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
-- add some data
|
-- add some data
|
||||||
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
||||||
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
||||||
|
@ -62,6 +66,13 @@ SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
Donald Knuth
|
Donald Knuth
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- trigger a unique constraint violation
|
||||||
|
BEGIN;
|
||||||
|
UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1;
|
||||||
|
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200000"
|
||||||
|
DETAIL: Key (lab_id, name)=(1, John Backus) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ABORT;
|
||||||
-- creating savepoints should work...
|
-- creating savepoints should work...
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
||||||
|
@ -160,8 +171,6 @@ COMMIT;
|
||||||
-- whether it occurs first or second
|
-- whether it occurs first or second
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
NOTICE: using one-phase commit for distributed DDL commands
|
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands
|
ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
|
@ -24,6 +24,9 @@ SELECT master_create_worker_shards('researchers', 2, 2);
|
||||||
SELECT master_create_distributed_table('labs', 'id', 'hash');
|
SELECT master_create_distributed_table('labs', 'id', 'hash');
|
||||||
SELECT master_create_worker_shards('labs', 1, 1);
|
SELECT master_create_worker_shards('labs', 1, 1);
|
||||||
|
|
||||||
|
-- might be confusing to have two people in the same lab with the same name
|
||||||
|
CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name);
|
||||||
|
|
||||||
-- add some data
|
-- add some data
|
||||||
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
||||||
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
||||||
|
@ -45,6 +48,11 @@ ABORT;
|
||||||
|
|
||||||
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
|
|
||||||
|
-- trigger a unique constraint violation
|
||||||
|
BEGIN;
|
||||||
|
UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
-- creating savepoints should work...
|
-- creating savepoints should work...
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
||||||
|
|
Loading…
Reference in New Issue