mirror of https://github.com/citusdata/citus.git
Fail fast on constraint violations in router executor
parent
15eed396b3
commit
7d0c90b398
|
@ -827,7 +827,7 @@ MasterPartitionMethod(RangeVar *relation)
|
|||
}
|
||||
else
|
||||
{
|
||||
ReportRemoteError(masterConnection, queryResult);
|
||||
ReportRemoteError(masterConnection, queryResult, false);
|
||||
ereport(ERROR, (errmsg("could not get the partition method of the "
|
||||
"distributed table")));
|
||||
}
|
||||
|
@ -924,7 +924,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
|
|||
result = PQexec(connection, "BEGIN");
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
failedPlacementList = lappend(failedPlacementList, placement);
|
||||
|
||||
PQclear(result);
|
||||
|
@ -937,7 +937,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
|
|||
result = PQexec(connection, copyCommand->data);
|
||||
if (PQresultStatus(result) != PGRES_COPY_IN)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
failedPlacementList = lappend(failedPlacementList, placement);
|
||||
|
||||
PQclear(result);
|
||||
|
@ -1504,7 +1504,7 @@ RemoteCreateEmptyShard(char *relationName)
|
|||
}
|
||||
else
|
||||
{
|
||||
ReportRemoteError(masterConnection, queryResult);
|
||||
ReportRemoteError(masterConnection, queryResult, false);
|
||||
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
|
||||
}
|
||||
|
||||
|
|
|
@ -141,7 +141,7 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
|
|||
}
|
||||
else
|
||||
{
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
|
||||
PQfinish(connection);
|
||||
connectionId = INVALID_CONNECTION_ID;
|
||||
|
@ -194,7 +194,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
|
|||
}
|
||||
else
|
||||
{
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
|
||||
PQfinish(connection);
|
||||
connectionId = INVALID_CONNECTION_ID;
|
||||
|
@ -249,7 +249,7 @@ MultiClientConnectPoll(int32 connectionId)
|
|||
}
|
||||
else if (pollingStatus == PGRES_POLLING_FAILED)
|
||||
{
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
|
||||
connectStatus = CLIENT_CONNECTION_BAD;
|
||||
}
|
||||
|
@ -433,7 +433,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
|||
}
|
||||
else
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
PQclear(result);
|
||||
}
|
||||
|
||||
|
@ -500,7 +500,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
|
|||
}
|
||||
else
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
PQclear(result);
|
||||
queryStatus = CLIENT_BATCH_QUERY_FAILED;
|
||||
}
|
||||
|
@ -585,7 +585,7 @@ MultiClientQueryStatus(int32 connectionId)
|
|||
copyResults = true;
|
||||
}
|
||||
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
}
|
||||
|
||||
/* clear the result object */
|
||||
|
@ -675,7 +675,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
|||
{
|
||||
copyStatus = CLIENT_COPY_FAILED;
|
||||
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
@ -685,7 +685,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
|||
/* received an error */
|
||||
copyStatus = CLIENT_COPY_FAILED;
|
||||
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
}
|
||||
|
||||
/* if copy out completed, make sure we drain all results from libpq */
|
||||
|
|
|
@ -267,7 +267,19 @@ ExecuteDistributedModify(Task *task)
|
|||
result = PQexec(connection, task->queryString);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
||||
int category = 0;
|
||||
bool raiseError = false;
|
||||
|
||||
/*
|
||||
* If the error code is in constraint violation class, we want to
|
||||
* fail fast because we must get the same error from all shard
|
||||
* placements.
|
||||
*/
|
||||
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
|
||||
raiseError = SqlStateMatchesCategory(sqlStateString, category);
|
||||
|
||||
ReportRemoteError(connection, result, raiseError);
|
||||
PQclear(result);
|
||||
|
||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||
|
@ -451,14 +463,14 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
|
|||
querySent = PQsendQuery(connection, query);
|
||||
if (querySent == 0)
|
||||
{
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
return false;
|
||||
}
|
||||
|
||||
singleRowMode = PQsetSingleRowMode(connection);
|
||||
if (singleRowMode == 0)
|
||||
{
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -505,7 +517,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
|
|||
resultStatus = PQresultStatus(result);
|
||||
if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK))
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
PQclear(result);
|
||||
|
||||
return false;
|
||||
|
|
|
@ -371,14 +371,14 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
|||
result = PQexec(connection, "BEGIN");
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
ereport(ERROR, (errmsg("could not send query to shard placement")));
|
||||
}
|
||||
|
||||
result = PQexec(connection, shardQueryString);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
ereport(ERROR, (errmsg("could not send query to shard placement")));
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS)
|
|||
result = PQexec(connection, POPULATE_TEMP_TABLE);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
@ -90,7 +90,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
|
|||
result = PQexec(connection, COUNT_TEMP_TABLE);
|
||||
if (PQresultStatus(result) != PGRES_TUPLES_OK)
|
||||
{
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -190,12 +190,36 @@ PurgeConnection(PGconn *connection)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SqlStateMatchesCategory returns true if the given sql state is in the given
|
||||
* error category. Note that we use ERRCODE_TO_CATEGORY macro to determine error
|
||||
* category of the sql state and expect the caller to use the same macro for the
|
||||
* error category.
|
||||
*/
|
||||
bool
|
||||
SqlStateMatchesCategory(char *sqlStateString, int category)
|
||||
{
|
||||
bool sqlStateMatchesCategory = false;
|
||||
int sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
|
||||
sqlStateString[3], sqlStateString[4]);
|
||||
|
||||
int sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
|
||||
if (sqlStateCategory == category)
|
||||
{
|
||||
sqlStateMatchesCategory = true;
|
||||
}
|
||||
|
||||
return sqlStateMatchesCategory;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReportRemoteError retrieves various error fields from the a remote result and
|
||||
* produces an error report at the WARNING level.
|
||||
* produces an error report at the WARNING level or at the ERROR level if raise
|
||||
* error is set.
|
||||
*/
|
||||
void
|
||||
ReportRemoteError(PGconn *connection, PGresult *result)
|
||||
ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
|
||||
{
|
||||
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
||||
char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
|
||||
|
@ -206,6 +230,7 @@ ReportRemoteError(PGconn *connection, PGresult *result)
|
|||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
int sqlState = ERRCODE_CONNECTION_FAILURE;
|
||||
int errorLevel = WARNING;
|
||||
|
||||
if (sqlStateString != NULL)
|
||||
{
|
||||
|
@ -213,6 +238,11 @@ ReportRemoteError(PGconn *connection, PGresult *result)
|
|||
sqlStateString[3], sqlStateString[4]);
|
||||
}
|
||||
|
||||
if (raiseError)
|
||||
{
|
||||
errorLevel = ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the PGresult did not contain a message, the connection may provide a
|
||||
* suitable top level one. At worst, this is an empty string.
|
||||
|
@ -233,18 +263,18 @@ ReportRemoteError(PGconn *connection, PGresult *result)
|
|||
|
||||
if (sqlState == ERRCODE_CONNECTION_FAILURE)
|
||||
{
|
||||
ereport(WARNING, (errcode(sqlState),
|
||||
errmsg("connection failed to %s:%s", nodeName, nodePort),
|
||||
errdetail("%s", messagePrimary)));
|
||||
ereport(errorLevel, (errcode(sqlState),
|
||||
errmsg("connection failed to %s:%s", nodeName, nodePort),
|
||||
errdetail("%s", messagePrimary)));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(WARNING, (errcode(sqlState), errmsg("%s", messagePrimary),
|
||||
messageDetail ? errdetail("%s", messageDetail) : 0,
|
||||
messageHint ? errhint("%s", messageHint) : 0,
|
||||
messageContext ? errcontext("%s", messageContext) : 0,
|
||||
errcontext("Error occurred on remote connection to %s:%s.",
|
||||
nodeName, nodePort)));
|
||||
ereport(errorLevel, (errcode(sqlState), errmsg("%s", messagePrimary),
|
||||
messageDetail ? errdetail("%s", messageDetail) : 0,
|
||||
messageHint ? errhint("%s", messageHint) : 0,
|
||||
messageContext ? errcontext("%s", messageContext) : 0,
|
||||
errcontext("Error occurred on remote connection to %s:%s.",
|
||||
nodeName, nodePort)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -316,7 +346,7 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
|
|||
/* warn if still erroring on final attempt */
|
||||
if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
|
||||
{
|
||||
ReportRemoteError(connection, NULL);
|
||||
ReportRemoteError(connection, NULL, false);
|
||||
}
|
||||
|
||||
PQfinish(connection);
|
||||
|
|
|
@ -75,7 +75,7 @@ PrepareRemoteTransactions(List *connectionList)
|
|||
/* a failure to prepare is an implicit rollback */
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
|
||||
ReportRemoteError(connection, result);
|
||||
ReportRemoteError(connection, result, false);
|
||||
PQclear(result);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||
|
|
|
@ -56,7 +56,8 @@ typedef struct NodeConnectionEntry
|
|||
/* function declarations for obtaining and using a connection */
|
||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||
extern void PurgeConnection(PGconn *connection);
|
||||
extern void ReportRemoteError(PGconn *connection, PGresult *result);
|
||||
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
||||
extern void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
|
||||
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);
|
||||
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||
#endif /* CONNECTION_CACHE_H */
|
||||
|
|
|
@ -68,7 +68,7 @@ INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
|||
-- test copy with bad row in middle
|
||||
\set VERBOSITY terse
|
||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
||||
ERROR: could not modify any active placements
|
||||
ERROR: null value in column "data" violates not-null constraint
|
||||
\set VERBOSITY default
|
||||
-- verify rows were copied to distributed table
|
||||
SELECT * FROM insert_target ORDER BY id ASC;
|
||||
|
|
|
@ -149,10 +149,14 @@ ERROR: cannot plan INSERT using row with NULL value in partition column
|
|||
-- INSERT violating column constraint
|
||||
INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
|
||||
-5.00);
|
||||
ERROR: could not modify any active placements
|
||||
ERROR: new row for relation "limit_orders_750000" violates check constraint "limit_orders_limit_price_check"
|
||||
DETAIL: Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00).
|
||||
CONTEXT: Error occurred on remote connection to localhost:57637.
|
||||
-- INSERT violating primary key constraint
|
||||
INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
|
||||
ERROR: could not modify any active placements
|
||||
ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001"
|
||||
DETAIL: Key (id)=(32743) already exists.
|
||||
CONTEXT: Error occurred on remote connection to localhost:57638.
|
||||
SET client_min_messages TO DEFAULT;
|
||||
-- commands with non-constant partition values are unsupported
|
||||
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
||||
|
@ -261,26 +265,25 @@ SELECT kind, limit_price FROM limit_orders WHERE id = 246;
|
|||
buy | 0.00
|
||||
(1 row)
|
||||
|
||||
-- Test that shards which miss a modification are marked unhealthy
|
||||
-- First: Mark all placements for a node as inactive
|
||||
UPDATE pg_dist_shard_placement
|
||||
SET shardstate = 3
|
||||
WHERE nodename = 'localhost' AND
|
||||
nodeport = :worker_1_port;
|
||||
-- Second: Perform an INSERT to the remaining node
|
||||
-- Test that on unique contraint violations, we fail fast
|
||||
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
-- Third: Mark the original placements as healthy again
|
||||
UPDATE pg_dist_shard_placement
|
||||
SET shardstate = 1
|
||||
WHERE nodename = 'localhost' AND
|
||||
nodeport = :worker_1_port;
|
||||
-- Fourth: Perform the same INSERT (primary key violation)
|
||||
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
WARNING: duplicate key value violates unique constraint "limit_orders_pkey_750001"
|
||||
ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001"
|
||||
DETAIL: Key (id)=(275) already exists.
|
||||
CONTEXT: Error occurred on remote connection to localhost:57638.
|
||||
-- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
|
||||
SELECT count(*) FROM limit_orders WHERE id = 275;
|
||||
-- Test that shards which miss a modification are marked unhealthy
|
||||
-- First: Connect to the second worker node
|
||||
\c - - - :worker_2_port
|
||||
-- Second: Drop limit_orders shard on the second worker node
|
||||
DROP TABLE limit_orders_750000;
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
-- Fourth: Perform an INSERT on the remaining node
|
||||
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
WARNING: relation "limit_orders_750000" does not exist
|
||||
CONTEXT: Error occurred on remote connection to localhost:57638.
|
||||
-- Last: Verify the insert worked but the deleted placement is now unhealthy
|
||||
SELECT count(*) FROM limit_orders WHERE id = 276;
|
||||
count
|
||||
-------
|
||||
1
|
||||
|
|
|
@ -190,28 +190,26 @@ SELECT bidder_id FROM limit_orders WHERE id = 246;
|
|||
UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246;
|
||||
SELECT kind, limit_price FROM limit_orders WHERE id = 246;
|
||||
|
||||
-- Test that on unique contraint violations, we fail fast
|
||||
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
|
||||
-- Test that shards which miss a modification are marked unhealthy
|
||||
|
||||
-- First: Mark all placements for a node as inactive
|
||||
UPDATE pg_dist_shard_placement
|
||||
SET shardstate = 3
|
||||
WHERE nodename = 'localhost' AND
|
||||
nodeport = :worker_1_port;
|
||||
-- First: Connect to the second worker node
|
||||
\c - - - :worker_2_port
|
||||
|
||||
-- Second: Perform an INSERT to the remaining node
|
||||
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
-- Second: Drop limit_orders shard on the second worker node
|
||||
DROP TABLE limit_orders_750000;
|
||||
|
||||
-- Third: Mark the original placements as healthy again
|
||||
UPDATE pg_dist_shard_placement
|
||||
SET shardstate = 1
|
||||
WHERE nodename = 'localhost' AND
|
||||
nodeport = :worker_1_port;
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
|
||||
-- Fourth: Perform the same INSERT (primary key violation)
|
||||
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
-- Fourth: Perform an INSERT on the remaining node
|
||||
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
|
||||
-- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
|
||||
SELECT count(*) FROM limit_orders WHERE id = 275;
|
||||
-- Last: Verify the insert worked but the deleted placement is now unhealthy
|
||||
SELECT count(*) FROM limit_orders WHERE id = 276;
|
||||
SELECT count(*)
|
||||
FROM pg_dist_shard_placement AS sp,
|
||||
pg_dist_shard AS s
|
||||
|
|
Loading…
Reference in New Issue