diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index f858bb660..cbfb62027 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -827,7 +827,7 @@ MasterPartitionMethod(RangeVar *relation)
 	}
 	else
 	{
-		ReportRemoteError(masterConnection, queryResult);
+		ReportRemoteError(masterConnection, queryResult, false);
 		ereport(ERROR, (errmsg("could not get the partition method of the "
 							   "distributed table")));
 	}
@@ -924,7 +924,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 		result = PQexec(connection, "BEGIN");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 			failedPlacementList = lappend(failedPlacementList, placement);

 			PQclear(result);
@@ -937,7 +937,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 		result = PQexec(connection, copyCommand->data);
 		if (PQresultStatus(result) != PGRES_COPY_IN)
 		{
-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 			failedPlacementList = lappend(failedPlacementList, placement);

 			PQclear(result);
@@ -1504,7 +1504,7 @@ RemoteCreateEmptyShard(char *relationName)
 	}
 	else
 	{
-		ReportRemoteError(masterConnection, queryResult);
+		ReportRemoteError(masterConnection, queryResult, false);
 		ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
 	}
diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c
index 0cccff038..241e62daa 100644
--- a/src/backend/distributed/executor/multi_client_executor.c
+++ b/src/backend/distributed/executor/multi_client_executor.c
@@ -141,7 +141,7 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
 	}
 	else
 	{
-		ReportRemoteError(connection, NULL);
+		ReportRemoteError(connection, NULL, false);
 		PQfinish(connection);

 		connectionId = INVALID_CONNECTION_ID;
@@ -194,7 +194,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
 	}
 	else
 	{
-		ReportRemoteError(connection, NULL);
+		ReportRemoteError(connection, NULL, false);
 		PQfinish(connection);

 		connectionId = INVALID_CONNECTION_ID;
@@ -249,7 +249,7 @@ MultiClientConnectPoll(int32 connectionId)
 	}
 	else if (pollingStatus == PGRES_POLLING_FAILED)
 	{
-		ReportRemoteError(connection, NULL);
+		ReportRemoteError(connection, NULL, false);

 		connectStatus = CLIENT_CONNECTION_BAD;
 	}
@@ -433,7 +433,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
 	}
 	else
 	{
-		ReportRemoteError(connection, result);
+		ReportRemoteError(connection, result, false);
 		PQclear(result);
 	}
@@ -500,7 +500,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
 	}
 	else
 	{
-		ReportRemoteError(connection, result);
+		ReportRemoteError(connection, result, false);
 		PQclear(result);
 		queryStatus = CLIENT_BATCH_QUERY_FAILED;
 	}
@@ -585,7 +585,7 @@ MultiClientQueryStatus(int32 connectionId)
 			copyResults = true;
 		}

-		ReportRemoteError(connection, result);
+		ReportRemoteError(connection, result, false);
 	}

 	/* clear the result object */
@@ -675,7 +675,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
 		{
 			copyStatus = CLIENT_COPY_FAILED;

-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 		}

 		PQclear(result);
@@ -685,7 +685,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
 		/* received an error */
 		copyStatus = CLIENT_COPY_FAILED;

-		ReportRemoteError(connection, NULL);
+		ReportRemoteError(connection, NULL, false);
 	}

 	/* if copy out completed, make sure we drain all results from libpq */
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 46b7b9b7d..5720ab63d 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -267,7 +267,19 @@ ExecuteDistributedModify(Task *task)
 		result = PQexec(connection, task->queryString);
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+			int category = 0;
+			bool raiseError = false;
+
+			/*
+			 * If the error code is in constraint violation class, we want to
+			 * fail fast because we must get the same error from all shard
+			 * placements.
+			 */
+			category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
+			raiseError = SqlStateMatchesCategory(sqlStateString, category);
+
+			ReportRemoteError(connection, result, raiseError);
 			PQclear(result);

 			failedPlacementList = lappend(failedPlacementList, taskPlacement);
@@ -451,14 +463,14 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
 	querySent = PQsendQuery(connection, query);
 	if (querySent == 0)
 	{
-		ReportRemoteError(connection, NULL);
+		ReportRemoteError(connection, NULL, false);
 		return false;
 	}

 	singleRowMode = PQsetSingleRowMode(connection);
 	if (singleRowMode == 0)
 	{
-		ReportRemoteError(connection, NULL);
+		ReportRemoteError(connection, NULL, false);
 		return false;
 	}
@@ -505,7 +517,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
 		resultStatus = PQresultStatus(result);
 		if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK))
 		{
-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 			PQclear(result);

 			return false;
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 65aef5aa8..3933eaae1 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -371,14 +371,14 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
 		result = PQexec(connection, "BEGIN");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 			ereport(ERROR, (errmsg("could not send query to shard placement")));
 		}

 		result = PQexec(connection, shardQueryString);
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 			ereport(ERROR, (errmsg("could not send query to shard placement")));
 		}
diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c
index 8ce7a2cce..1e99836a1 100644
--- a/src/backend/distributed/test/connection_cache.c
+++ b/src/backend/distributed/test/connection_cache.c
@@ -59,7 +59,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS)
 	result = PQexec(connection, POPULATE_TEMP_TABLE);
 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
 	{
-		ReportRemoteError(connection, result);
+		ReportRemoteError(connection, result, false);
 	}

 	PQclear(result);
@@ -90,7 +90,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
 	result = PQexec(connection, COUNT_TEMP_TABLE);
 	if (PQresultStatus(result) != PGRES_TUPLES_OK)
 	{
-		ReportRemoteError(connection, result);
+		ReportRemoteError(connection, result, false);
 	}
 	else
 	{
diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c
index ce592d8b5..eeca2d8f7 100644
--- a/src/backend/distributed/utils/connection_cache.c
+++ b/src/backend/distributed/utils/connection_cache.c
@@ -190,12 +190,36 @@ PurgeConnection(PGconn *connection)
 }

+/*
+ * SqlStateMatchesCategory returns true if the given sql state is in the given
+ * error category. Note that we use ERRCODE_TO_CATEGORY macro to determine error
+ * category of the sql state and expect the caller to use the same macro for the
+ * error category.
+ */
+bool
+SqlStateMatchesCategory(char *sqlStateString, int category)
+{
+	bool sqlStateMatchesCategory = false;
+	int sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
+								 sqlStateString[3], sqlStateString[4]);
+
+	int sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
+	if (sqlStateCategory == category)
+	{
+		sqlStateMatchesCategory = true;
+	}
+
+	return sqlStateMatchesCategory;
+}
+
+
 /*
  * ReportRemoteError retrieves various error fields from the a remote result and
- * produces an error report at the WARNING level.
+ * produces an error report at the WARNING level or at the ERROR level if raise
+ * error is set.
  */
 void
-ReportRemoteError(PGconn *connection, PGresult *result)
+ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
 {
 	char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
 	char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
@@ -206,6 +230,7 @@ ReportRemoteError(PGconn *connection, PGresult *result)
 	char *nodeName = ConnectionGetOptionValue(connection, "host");
 	char *nodePort = ConnectionGetOptionValue(connection, "port");
 	int sqlState = ERRCODE_CONNECTION_FAILURE;
+	int errorLevel = WARNING;

 	if (sqlStateString != NULL)
 	{
@@ -213,6 +238,11 @@ ReportRemoteError(PGconn *connection, PGresult *result)
 								 sqlStateString[3], sqlStateString[4]);
 	}

+	if (raiseError)
+	{
+		errorLevel = ERROR;
+	}
+
 	/*
 	 * If the PGresult did not contain a message, the connection may provide a
 	 * suitable top level one. At worst, this is an empty string.
@@ -233,18 +263,18 @@ ReportRemoteError(PGconn *connection, PGresult *result)

 	if (sqlState == ERRCODE_CONNECTION_FAILURE)
 	{
-		ereport(WARNING, (errcode(sqlState),
-						  errmsg("connection failed to %s:%s", nodeName, nodePort),
-						  errdetail("%s", messagePrimary)));
+		ereport(errorLevel, (errcode(sqlState),
+							 errmsg("connection failed to %s:%s", nodeName, nodePort),
+							 errdetail("%s", messagePrimary)));
 	}
 	else
 	{
-		ereport(WARNING, (errcode(sqlState), errmsg("%s", messagePrimary),
-						  messageDetail ? errdetail("%s", messageDetail) : 0,
-						  messageHint ? errhint("%s", messageHint) : 0,
-						  messageContext ? errcontext("%s", messageContext) : 0,
-						  errcontext("Error occurred on remote connection to %s:%s.",
-									 nodeName, nodePort)));
+		ereport(errorLevel, (errcode(sqlState), errmsg("%s", messagePrimary),
+							 messageDetail ? errdetail("%s", messageDetail) : 0,
+							 messageHint ? errhint("%s", messageHint) : 0,
+							 messageContext ? errcontext("%s", messageContext) : 0,
+							 errcontext("Error occurred on remote connection to %s:%s.",
+										nodeName, nodePort)));
 	}
 }
@@ -316,7 +346,7 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
 		/* warn if still erroring on final attempt */
 		if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
 		{
-			ReportRemoteError(connection, NULL);
+			ReportRemoteError(connection, NULL, false);
 		}

 		PQfinish(connection);
diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c
index f9885f4e0..d9e48b270 100644
--- a/src/backend/distributed/utils/multi_transaction.c
+++ b/src/backend/distributed/utils/multi_transaction.c
@@ -75,7 +75,7 @@ PrepareRemoteTransactions(List *connectionList)
 			/* a failure to prepare is an implicit rollback */
 			transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;

-			ReportRemoteError(connection, result);
+			ReportRemoteError(connection, result, false);
 			PQclear(result);

 			ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h
index fb4f01a89..9d8492794 100644
--- a/src/include/distributed/connection_cache.h
+++ b/src/include/distributed/connection_cache.h
@@ -56,7 +56,8 @@ typedef struct NodeConnectionEntry
 /* function declarations for obtaining and using a connection */
 extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
 extern void PurgeConnection(PGconn *connection);
-extern void ReportRemoteError(PGconn *connection, PGresult *result);
+extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
+extern void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
 extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);
 extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
 #endif /* CONNECTION_CACHE_H */
diff --git a/src/test/regress/expected/multi_create_insert_proxy.out b/src/test/regress/expected/multi_create_insert_proxy.out
index 93faa5e71..a18205995 100644
--- a/src/test/regress/expected/multi_create_insert_proxy.out
+++ b/src/test/regress/expected/multi_create_insert_proxy.out
@@ -68,7 +68,7 @@ INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
 -- test copy with bad row in middle
 \set VERBOSITY terse
 COPY pg_temp.:"proxy_tablename" FROM stdin;
-ERROR: could not modify any active placements
+ERROR: null value in column "data" violates not-null constraint
 \set VERBOSITY default
 -- verify rows were copied to distributed table
 SELECT * FROM insert_target ORDER BY id ASC;
diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out
index 5d0dcdaee..52d725c31 100644
--- a/src/test/regress/expected/multi_modifications.out
+++ b/src/test/regress/expected/multi_modifications.out
@@ -149,10 +149,14 @@ ERROR: cannot plan INSERT using row with NULL value in partition column
 -- INSERT violating column constraint
 INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', -5.00);
-ERROR: could not modify any active placements
+ERROR: new row for relation "limit_orders_750000" violates check constraint "limit_orders_limit_price_check"
+DETAIL: Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00).
+CONTEXT: Error occurred on remote connection to localhost:57637.
 -- INSERT violating primary key constraint
 INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
-ERROR: could not modify any active placements
+ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001"
+DETAIL: Key (id)=(32743) already exists.
+CONTEXT: Error occurred on remote connection to localhost:57638.
 SET client_min_messages TO DEFAULT;
 -- commands with non-constant partition values are unsupported
 INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
@@ -261,26 +265,25 @@ SELECT kind, limit_price FROM limit_orders WHERE id = 246;
  buy  | 0.00
 (1 row)

--- Test that shards which miss a modification are marked unhealthy
--- First: Mark all placements for a node as inactive
-UPDATE pg_dist_shard_placement
-SET shardstate = 3
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
--- Second: Perform an INSERT to the remaining node
+-- Test that on unique constraint violations, we fail fast
 INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
--- Third: Mark the original placements as healthy again
-UPDATE pg_dist_shard_placement
-SET shardstate = 1
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
--- Fourth: Perform the same INSERT (primary key violation)
 INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
-WARNING: duplicate key value violates unique constraint "limit_orders_pkey_750001"
+ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001"
 DETAIL: Key (id)=(275) already exists.
 CONTEXT: Error occurred on remote connection to localhost:57638.
--- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
-SELECT count(*) FROM limit_orders WHERE id = 275;
+-- Test that shards which miss a modification are marked unhealthy
+-- First: Connect to the second worker node
+\c - - - :worker_2_port
+-- Second: Drop limit_orders shard on the second worker node
+DROP TABLE limit_orders_750000;
+-- Third: Connect back to master node
+\c - - - :master_port
+-- Fourth: Perform an INSERT on the remaining node
+INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+WARNING: relation "limit_orders_750000" does not exist
+CONTEXT: Error occurred on remote connection to localhost:57638.
+-- Last: Verify the insert worked but the deleted placement is now unhealthy
+SELECT count(*) FROM limit_orders WHERE id = 276;
  count
 -------
      1
diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql
index 238fa57d2..15434dbb5 100644
--- a/src/test/regress/sql/multi_modifications.sql
+++ b/src/test/regress/sql/multi_modifications.sql
@@ -190,28 +190,26 @@ SELECT bidder_id FROM limit_orders WHERE id = 246;
 UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246;
 SELECT kind, limit_price FROM limit_orders WHERE id = 246;

+-- Test that on unique constraint violations, we fail fast
+INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+
 -- Test that shards which miss a modification are marked unhealthy
--- First: Mark all placements for a node as inactive
-UPDATE pg_dist_shard_placement
-SET shardstate = 3
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
+-- First: Connect to the second worker node
+\c - - - :worker_2_port

--- Second: Perform an INSERT to the remaining node
-INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+-- Second: Drop limit_orders shard on the second worker node
+DROP TABLE limit_orders_750000;

--- Third: Mark the original placements as healthy again
-UPDATE pg_dist_shard_placement
-SET shardstate = 1
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
+-- Third: Connect back to master node
+\c - - - :master_port

--- Fourth: Perform the same INSERT (primary key violation)
-INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+-- Fourth: Perform an INSERT on the remaining node
+INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);

--- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
-SELECT count(*) FROM limit_orders WHERE id = 275;
+-- Last: Verify the insert worked but the deleted placement is now unhealthy
+SELECT count(*) FROM limit_orders WHERE id = 276;
 SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s
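
Note (not part of the patch): the fail-fast decision in ExecuteDistributedModify comes down to reducing a five-character SQLSTATE to its two-character class and comparing it against the integrity_constraint_violation class. The standalone C sketch below re-implements PostgreSQL's PGSIXBIT, MAKE_SQLSTATE, and ERRCODE_TO_CATEGORY macros (mirroring their definitions in utils/elog.h) only so the example compiles on its own; the sample SQLSTATE values are illustrative, not taken from the patch.

/*
 * sqlstate_category_demo.c -- standalone sketch, not part of the patch.
 * Shows why "23505" (unique_violation) matches the integrity constraint
 * violation category "23000" and therefore triggers an ERROR-level report,
 * while an unrelated SQLSTATE keeps the old WARNING-and-mark-unhealthy path.
 */
#include <stdbool.h>
#include <stdio.h>

/* these three macros mirror PostgreSQL's definitions in utils/elog.h */
#define PGSIXBIT(ch) (((ch) - '0') & 0x3F)
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5) \
	(PGSIXBIT(ch1) + (PGSIXBIT(ch2) << 6) + (PGSIXBIT(ch3) << 12) + \
	 (PGSIXBIT(ch4) << 18) + (PGSIXBIT(ch5) << 24))
/* a category keeps only the first two characters of the SQLSTATE */
#define ERRCODE_TO_CATEGORY(ec) ((ec) & ((1 << 12) - 1))

/* same shape as the SqlStateMatchesCategory added in connection_cache.c */
static bool
SqlStateMatchesCategory(const char *sqlStateString, int category)
{
	int sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1],
								 sqlStateString[2], sqlStateString[3],
								 sqlStateString[4]);

	return ERRCODE_TO_CATEGORY(sqlState) == category;
}

int
main(void)
{
	/* class 23: integrity_constraint_violation */
	int constraintCategory = ERRCODE_TO_CATEGORY(MAKE_SQLSTATE('2', '3', '0', '0', '0'));

	/* "23505" = unique_violation -> prints 1, i.e. raiseError would be true */
	printf("23505 fails fast: %d\n",
		   SqlStateMatchesCategory("23505", constraintCategory));

	/* "42703" = undefined_column -> prints 0, i.e. keep the WARNING path */
	printf("42703 fails fast: %d\n",
		   SqlStateMatchesCategory("42703", constraintCategory));

	return 0;
}

Under these assumptions, a constraint-class SQLSTATE makes ReportRemoteError run at ERROR so the statement aborts immediately on the first placement, which is safe because every placement of the shard would raise the same violation; any other SQLSTATE keeps the WARNING-level report and the placement-health bookkeeping shown in the regression test above.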