diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index cbfb62027..1dd545c97 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -827,7 +827,7 @@ MasterPartitionMethod(RangeVar *relation) } else { - ReportRemoteError(masterConnection, queryResult, false); + WarnRemoteError(masterConnection, queryResult); ereport(ERROR, (errmsg("could not get the partition method of the " "distributed table"))); } @@ -924,7 +924,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections result = PQexec(connection, "BEGIN"); if (PQresultStatus(result) != PGRES_COMMAND_OK) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); failedPlacementList = lappend(failedPlacementList, placement); PQclear(result); @@ -937,7 +937,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections result = PQexec(connection, copyCommand->data); if (PQresultStatus(result) != PGRES_COPY_IN) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); failedPlacementList = lappend(failedPlacementList, placement); PQclear(result); @@ -1504,7 +1504,7 @@ RemoteCreateEmptyShard(char *relationName) } else { - ReportRemoteError(masterConnection, queryResult, false); + WarnRemoteError(masterConnection, queryResult); ereport(ERROR, (errmsg("could not create a new empty shard on the remote node"))); } diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 241e62daa..c51c4d79b 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -141,7 +141,7 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba } else { - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); PQfinish(connection); connectionId = INVALID_CONNECTION_ID; @@ -194,7 +194,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD } else { - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); PQfinish(connection); connectionId = INVALID_CONNECTION_ID; @@ -249,7 +249,7 @@ MultiClientConnectPoll(int32 connectionId) } else if (pollingStatus == PGRES_POLLING_FAILED) { - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); connectStatus = CLIENT_CONNECTION_BAD; } @@ -433,7 +433,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } else { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); PQclear(result); } @@ -500,7 +500,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, } else { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); PQclear(result); queryStatus = CLIENT_BATCH_QUERY_FAILED; } @@ -585,7 +585,7 @@ MultiClientQueryStatus(int32 connectionId) copyResults = true; } - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); } /* clear the result object */ @@ -675,7 +675,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { copyStatus = CLIENT_COPY_FAILED; - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); } PQclear(result); @@ -685,7 +685,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* received an error */ copyStatus = CLIENT_COPY_FAILED; - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); } /* if copy out completed, make sure we drain all results from libpq */ diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 5720ab63d..3758d3134 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -279,7 +279,14 @@ ExecuteDistributedModify(Task *task) category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); raiseError = SqlStateMatchesCategory(sqlStateString, category); - ReportRemoteError(connection, result, raiseError); + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } PQclear(result); failedPlacementList = lappend(failedPlacementList, taskPlacement); @@ -463,14 +470,14 @@ SendQueryInSingleRowMode(PGconn *connection, char *query) querySent = PQsendQuery(connection, query); if (querySent == 0) { - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); return false; } singleRowMode = PQsetSingleRowMode(connection); if (singleRowMode == 0) { - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); return false; } @@ -517,7 +524,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, resultStatus = PQresultStatus(result); if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK)) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); PQclear(result); return false; diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 3933eaae1..0ca5afc52 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -371,14 +371,14 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections result = PQexec(connection, "BEGIN"); if (PQresultStatus(result) != PGRES_COMMAND_OK) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); ereport(ERROR, (errmsg("could not send query to shard placement"))); } result = PQexec(connection, shardQueryString); if (PQresultStatus(result) != PGRES_COMMAND_OK) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); ereport(ERROR, (errmsg("could not send query to shard placement"))); } diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c index 1e99836a1..3d641d837 100644 --- a/src/backend/distributed/test/connection_cache.c +++ b/src/backend/distributed/test/connection_cache.c @@ -59,7 +59,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS) result = PQexec(connection, POPULATE_TEMP_TABLE); if (PQresultStatus(result) != PGRES_COMMAND_OK) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); } PQclear(result); @@ -90,7 +90,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS) result = PQexec(connection, COUNT_TEMP_TABLE); if (PQresultStatus(result) != PGRES_TUPLES_OK) { - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); } else { diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index eeca2d8f7..07ae1fe2f 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -41,6 +41,7 @@ static HTAB *NodeConnectionHash = NULL; /* local function forward declarations */ static HTAB * CreateNodeConnectionHash(void); +static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); /* @@ -214,11 +215,35 @@ SqlStateMatchesCategory(char *sqlStateString, int category) /* - * ReportRemoteError retrieves various error fields from the a remote result and - * produces an error report at the WARNING level or at the ERROR level if raise - * error is set. + * WarnRemoteError retrieves error fields from a remote result and produces an + * error report at the WARNING level after amending the error with a CONTEXT + * field containing the remote node host and port information. */ void +WarnRemoteError(PGconn *connection, PGresult *result) +{ + ReportRemoteError(connection, result, false); +} + + +/* + * ReraiseRemoteError retrieves error fields from a remote result and re-raises + * the error after amending it with a CONTEXT field containing the remote node + * host and port information. + */ +void +ReraiseRemoteError(PGconn *connection, PGresult *result) +{ + ReportRemoteError(connection, result, true); +} + + +/* + * ReportRemoteError is an internal helper function which implements logic + * needed by both WarnRemoteError and ReraiseRemoteError. They wrap this + * function to provide explicit names for the possible behaviors. + */ +static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) { char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); @@ -346,7 +371,7 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) /* warn if still erroring on final attempt */ if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1) { - ReportRemoteError(connection, NULL, false); + WarnRemoteError(connection, NULL); } PQfinish(connection); diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c index d9e48b270..7204dfd92 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/utils/multi_transaction.c @@ -75,7 +75,7 @@ PrepareRemoteTransactions(List *connectionList) /* a failure to prepare is an implicit rollback */ transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - ReportRemoteError(connection, result, false); + WarnRemoteError(connection, result); PQclear(result); ereport(ERROR, (errcode(ERRCODE_IO_ERROR), diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 9d8492794..326e229c2 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -57,7 +57,8 @@ typedef struct NodeConnectionEntry extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); -extern void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); +extern void WarnRemoteError(PGconn *connection, PGresult *result); +extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword); #endif /* CONNECTION_CACHE_H */