mirror of https://github.com/citusdata/citus.git
Refactor ReportRemoteError to remove boolean arg
Broke it into two explicitly-named functions instead: WarnRemoteError and ReraiseRemoteError.pull/552/head
parent
7d0c90b398
commit
9ba02928ac
|
@ -827,7 +827,7 @@ MasterPartitionMethod(RangeVar *relation)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ReportRemoteError(masterConnection, queryResult, false);
|
WarnRemoteError(masterConnection, queryResult);
|
||||||
ereport(ERROR, (errmsg("could not get the partition method of the "
|
ereport(ERROR, (errmsg("could not get the partition method of the "
|
||||||
"distributed table")));
|
"distributed table")));
|
||||||
}
|
}
|
||||||
|
@ -924,7 +924,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
|
||||||
result = PQexec(connection, "BEGIN");
|
result = PQexec(connection, "BEGIN");
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
failedPlacementList = lappend(failedPlacementList, placement);
|
failedPlacementList = lappend(failedPlacementList, placement);
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
@ -937,7 +937,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
|
||||||
result = PQexec(connection, copyCommand->data);
|
result = PQexec(connection, copyCommand->data);
|
||||||
if (PQresultStatus(result) != PGRES_COPY_IN)
|
if (PQresultStatus(result) != PGRES_COPY_IN)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
failedPlacementList = lappend(failedPlacementList, placement);
|
failedPlacementList = lappend(failedPlacementList, placement);
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
@ -1504,7 +1504,7 @@ RemoteCreateEmptyShard(char *relationName)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ReportRemoteError(masterConnection, queryResult, false);
|
WarnRemoteError(masterConnection, queryResult);
|
||||||
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
|
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -141,7 +141,7 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
|
|
||||||
PQfinish(connection);
|
PQfinish(connection);
|
||||||
connectionId = INVALID_CONNECTION_ID;
|
connectionId = INVALID_CONNECTION_ID;
|
||||||
|
@ -194,7 +194,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
|
|
||||||
PQfinish(connection);
|
PQfinish(connection);
|
||||||
connectionId = INVALID_CONNECTION_ID;
|
connectionId = INVALID_CONNECTION_ID;
|
||||||
|
@ -249,7 +249,7 @@ MultiClientConnectPoll(int32 connectionId)
|
||||||
}
|
}
|
||||||
else if (pollingStatus == PGRES_POLLING_FAILED)
|
else if (pollingStatus == PGRES_POLLING_FAILED)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
|
|
||||||
connectStatus = CLIENT_CONNECTION_BAD;
|
connectStatus = CLIENT_CONNECTION_BAD;
|
||||||
}
|
}
|
||||||
|
@ -433,7 +433,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,7 +500,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
queryStatus = CLIENT_BATCH_QUERY_FAILED;
|
queryStatus = CLIENT_BATCH_QUERY_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -585,7 +585,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
copyResults = true;
|
copyResults = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* clear the result object */
|
/* clear the result object */
|
||||||
|
@ -675,7 +675,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
||||||
{
|
{
|
||||||
copyStatus = CLIENT_COPY_FAILED;
|
copyStatus = CLIENT_COPY_FAILED;
|
||||||
|
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
@ -685,7 +685,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
||||||
/* received an error */
|
/* received an error */
|
||||||
copyStatus = CLIENT_COPY_FAILED;
|
copyStatus = CLIENT_COPY_FAILED;
|
||||||
|
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if copy out completed, make sure we drain all results from libpq */
|
/* if copy out completed, make sure we drain all results from libpq */
|
||||||
|
|
|
@ -279,7 +279,14 @@ ExecuteDistributedModify(Task *task)
|
||||||
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
|
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
|
||||||
raiseError = SqlStateMatchesCategory(sqlStateString, category);
|
raiseError = SqlStateMatchesCategory(sqlStateString, category);
|
||||||
|
|
||||||
ReportRemoteError(connection, result, raiseError);
|
if (raiseError)
|
||||||
|
{
|
||||||
|
ReraiseRemoteError(connection, result);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
WarnRemoteError(connection, result);
|
||||||
|
}
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
||||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||||
|
@ -463,14 +470,14 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
|
||||||
querySent = PQsendQuery(connection, query);
|
querySent = PQsendQuery(connection, query);
|
||||||
if (querySent == 0)
|
if (querySent == 0)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
singleRowMode = PQsetSingleRowMode(connection);
|
singleRowMode = PQsetSingleRowMode(connection);
|
||||||
if (singleRowMode == 0)
|
if (singleRowMode == 0)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -517,7 +524,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
|
||||||
resultStatus = PQresultStatus(result);
|
resultStatus = PQresultStatus(result);
|
||||||
if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK))
|
if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK))
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -371,14 +371,14 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
||||||
result = PQexec(connection, "BEGIN");
|
result = PQexec(connection, "BEGIN");
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
ereport(ERROR, (errmsg("could not send query to shard placement")));
|
ereport(ERROR, (errmsg("could not send query to shard placement")));
|
||||||
}
|
}
|
||||||
|
|
||||||
result = PQexec(connection, shardQueryString);
|
result = PQexec(connection, shardQueryString);
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
ereport(ERROR, (errmsg("could not send query to shard placement")));
|
ereport(ERROR, (errmsg("could not send query to shard placement")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS)
|
||||||
result = PQexec(connection, POPULATE_TEMP_TABLE);
|
result = PQexec(connection, POPULATE_TEMP_TABLE);
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
@ -90,7 +90,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
|
||||||
result = PQexec(connection, COUNT_TEMP_TABLE);
|
result = PQexec(connection, COUNT_TEMP_TABLE);
|
||||||
if (PQresultStatus(result) != PGRES_TUPLES_OK)
|
if (PQresultStatus(result) != PGRES_TUPLES_OK)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -41,6 +41,7 @@ static HTAB *NodeConnectionHash = NULL;
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static HTAB * CreateNodeConnectionHash(void);
|
static HTAB * CreateNodeConnectionHash(void);
|
||||||
|
static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -214,11 +215,35 @@ SqlStateMatchesCategory(char *sqlStateString, int category)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReportRemoteError retrieves various error fields from the a remote result and
|
* WarnRemoteError retrieves error fields from a remote result and produces an
|
||||||
* produces an error report at the WARNING level or at the ERROR level if raise
|
* error report at the WARNING level after amending the error with a CONTEXT
|
||||||
* error is set.
|
* field containing the remote node host and port information.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
|
WarnRemoteError(PGconn *connection, PGresult *result)
|
||||||
|
{
|
||||||
|
ReportRemoteError(connection, result, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReraiseRemoteError retrieves error fields from a remote result and re-raises
|
||||||
|
* the error after amending it with a CONTEXT field containing the remote node
|
||||||
|
* host and port information.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ReraiseRemoteError(PGconn *connection, PGresult *result)
|
||||||
|
{
|
||||||
|
ReportRemoteError(connection, result, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReportRemoteError is an internal helper function which implements logic
|
||||||
|
* needed by both WarnRemoteError and ReraiseRemoteError. They wrap this
|
||||||
|
* function to provide explicit names for the possible behaviors.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
|
ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
|
||||||
{
|
{
|
||||||
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
||||||
|
@ -346,7 +371,7 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
|
||||||
/* warn if still erroring on final attempt */
|
/* warn if still erroring on final attempt */
|
||||||
if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
|
if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
|
||||||
{
|
{
|
||||||
ReportRemoteError(connection, NULL, false);
|
WarnRemoteError(connection, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
PQfinish(connection);
|
PQfinish(connection);
|
||||||
|
|
|
@ -75,7 +75,7 @@ PrepareRemoteTransactions(List *connectionList)
|
||||||
/* a failure to prepare is an implicit rollback */
|
/* a failure to prepare is an implicit rollback */
|
||||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||||
|
|
||||||
ReportRemoteError(connection, result, false);
|
WarnRemoteError(connection, result);
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
|
|
|
@ -57,7 +57,8 @@ typedef struct NodeConnectionEntry
|
||||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||||
extern void PurgeConnection(PGconn *connection);
|
extern void PurgeConnection(PGconn *connection);
|
||||||
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
||||||
extern void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
|
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
||||||
|
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
|
||||||
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);
|
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);
|
||||||
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||||
#endif /* CONNECTION_CACHE_H */
|
#endif /* CONNECTION_CACHE_H */
|
||||||
|
|
Loading…
Reference in New Issue