Merge pull request #5466 from citusdata/improve_error_msgs

pull/5800/head
Marco Slot 2022-03-14 15:23:41 +01:00 committed by GitHub
commit 5d07273ca6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 116 deletions

View File

@ -870,7 +870,19 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
int eventMask = MultiConnectionStateEventMask(connectionState); int eventMask = MultiConnectionStateEventMask(connectionState);
AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState); int waitEventSetIndex =
CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock,
NULL, (void *) connectionState);
if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d failed",
connectionState->connection->hostname,
connectionState->connection->port),
errhint("Check both the local and remote server logs for the "
"connection establishment errors.")));
}
numEventsAdded++; numEventsAdded++;
if (waitCount) if (waitCount)
@ -1020,7 +1032,19 @@ FinishConnectionListEstablishment(List *multiConnectionList)
{ {
/* connection state changed, reset the event mask */ /* connection state changed, reset the event mask */
uint32 eventMask = MultiConnectionStateEventMask(connectionState); uint32 eventMask = MultiConnectionStateEventMask(connectionState);
ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL); bool success =
CitusModifyWaitEvent(waitEventSet, event->pos,
eventMask, NULL);
if (!success)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d "
"failed", connection->hostname,
connection->port),
errhint("Check both the local and remote server "
"logs for the connection establishment "
"errors.")));
}
} }
/* /*
@ -1521,3 +1545,95 @@ MarkConnectionConnected(MultiConnection *connection)
INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd); INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
} }
} }
/*
 * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet().
 *
 * AddWaitEventToSet() may throw hard errors. For example, when the
 * underlying socket for a connection is closed by the remote server
 * and already reflected by the OS, however Citus hasn't had a chance
 * to get this information. In that case, if replication factor is >1,
 * Citus can failover to other nodes for executing the query. Even if
 * replication factor = 1, Citus can give much nicer errors.
 *
 * So CitusAddWaitEventSetToSet simply puts AddWaitEventToSet into a
 * PG_TRY/PG_CATCH block in order to catch any hard errors, and
 * returns this information to the caller.
 *
 * Returns the wait event index assigned by AddWaitEventToSet() on
 * success, or WAIT_EVENT_SET_INDEX_FAILED when a hard error was caught.
 */
int
CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
						  Latch *latch, void *user_data)
{
	/*
	 * volatile: assigned inside PG_TRY and read after a possible
	 * longjmp from PG_CATCH, so it must not live in a register.
	 */
	volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
	MemoryContext savedContext = CurrentMemoryContext;

	PG_TRY();
	{
		waitEventSetIndex =
			AddWaitEventToSet(set, events, fd, latch, (void *) user_data);
	}
	PG_CATCH();
	{
		/*
		 * We might be in an arbitrary memory context when the
		 * error is thrown and we should get back to one we had
		 * at PG_TRY() time, especially because we are not
		 * re-throwing the error.
		 */
		MemoryContextSwitchTo(savedContext);

		/* drop the caught error instead of re-throwing it */
		FlushErrorState();

		/* let the callers know about the failure */
		waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
	}
	PG_END_TRY();

	return waitEventSetIndex;
}
/*
 * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent().
 *
 * ModifyWaitEvent may throw hard errors. For example, when the underlying
 * socket for a connection is closed by the remote server and already
 * reflected by the OS, however Citus hasn't had a chance to get this
 * information. In that case, if replication factor is >1, Citus can
 * failover to other nodes for executing the query. Even if replication
 * factor = 1, Citus can give much nicer errors.
 *
 * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH
 * block in order to catch any hard errors, and returns this information to the
 * caller.
 *
 * Returns true when ModifyWaitEvent() succeeded, false when a hard error
 * was caught (the error itself is discarded, not re-thrown).
 */
bool
CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
{
	/*
	 * volatile: assigned inside PG_CATCH and read after a possible
	 * longjmp, so it must not live in a register.
	 */
	volatile bool success = true;
	MemoryContext savedContext = CurrentMemoryContext;

	PG_TRY();
	{
		ModifyWaitEvent(set, pos, events, latch);
	}
	PG_CATCH();
	{
		/*
		 * We might be in an arbitrary memory context when the
		 * error is thrown and we should get back to one we had
		 * at PG_TRY() time, especially because we are not
		 * re-throwing the error.
		 */
		MemoryContextSwitchTo(savedContext);

		/* drop the caught error instead of re-throwing it */
		FlushErrorState();

		/* let the callers know about the failure */
		success = false;
	}
	PG_END_TRY();

	return success;
}

View File

@ -906,8 +906,20 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
else if (sendStatus == 0) else if (sendStatus == 0)
{ {
/* done writing, only wait for read events */ /* done writing, only wait for read events */
ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE, bool success =
NULL); CitusModifyWaitEvent(waitEventSet, event->pos,
WL_SOCKET_READABLE, NULL);
if (!success)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for "
"node %s:%d failed",
connection->hostname,
connection->port),
errhint("Check both the local and remote "
"server logs for the connection "
"establishment errors.")));
}
} }
} }
@ -1052,8 +1064,17 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
* and writeability (server is ready to receive bytes). * and writeability (server is ready to receive bytes).
*/ */
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
int waitEventSetIndex =
AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection); CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock,
NULL, (void *) connection);
if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d failed",
connection->hostname, connection->port),
errhint("Check both the local and remote server logs for the "
"connection establishment errors.")));
}
} }
/* /*

View File

@ -178,8 +178,6 @@
#include "utils/timestamp.h" #include "utils/timestamp.h"
#define SLOW_START_DISABLED 0 #define SLOW_START_DISABLED 0
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2
/* /*
@ -678,10 +676,6 @@ static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution); static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList); static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
Latch *latch, void *user_data);
static bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
Latch *latch);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@ -5367,6 +5361,19 @@ BuildWaitEventSet(List *sessionList)
CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock, CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
NULL, (void *) session); NULL, (void *) session);
session->waitEventSetIndex = waitEventSetIndex; session->waitEventSetIndex = waitEventSetIndex;
/*
 * If adding to the wait event set failed, only emit a debug message:
 * the failure details are too low-level to surface to users.
 */
if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{
ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Adding wait event for node %s:%d failed. "
"The socket was: %d",
session->workerPool->nodeName,
session->workerPool->nodePort, sock)));
}
} }
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
@ -5378,64 +5385,6 @@ BuildWaitEventSet(List *sessionList)
} }
/*
 * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet().
 *
 * AddWaitEventToSet() may throw hard errors. For example, when the
 * underlying socket for a connection is closed by the remote server
 * and already reflected by the OS, however Citus hasn't had a chance
 * to get this information. In that case, if replication factor is >1,
 * Citus can failover to other nodes for executing the query. Even if
 * replication factor = 1, Citus can give much nicer errors.
 *
 * So CitusAddWaitEventSetToSet simply puts AddWaitEventToSet into a
 * PG_TRY/PG_CATCH block in order to catch any hard errors, and
 * returns this information to the caller.
 *
 * Returns the wait event index assigned by AddWaitEventToSet() on
 * success, or WAIT_EVENT_SET_INDEX_FAILED when a hard error was caught.
 */
static int
CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
						  Latch *latch, void *user_data)
{
	/*
	 * volatile: assigned inside PG_TRY and read after a possible
	 * longjmp from PG_CATCH, so it must not live in a register.
	 */
	volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
	MemoryContext savedContext = CurrentMemoryContext;

	PG_TRY();
	{
		waitEventSetIndex =
			AddWaitEventToSet(set, events, fd, latch, (void *) user_data);
	}
	PG_CATCH();
	{
		/*
		 * We might be in an arbitrary memory context when the
		 * error is thrown and we should get back to one we had
		 * at PG_TRY() time, especially because we are not
		 * re-throwing the error.
		 */
		MemoryContextSwitchTo(savedContext);

		/* drop the caught error instead of re-throwing it */
		FlushErrorState();

		if (user_data != NULL)
		{
			/*
			 * NOTE(review): assumes user_data, when non-NULL, is a
			 * WorkerSession — holds for the visible call sites, which
			 * pass either a session pointer or NULL. Confirm if new
			 * callers are added.
			 */
			WorkerSession *workerSession = (WorkerSession *) user_data;

			ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("Adding wait event for node %s:%d failed. "
									"The socket was: %d",
									workerSession->workerPool->nodeName,
									workerSession->workerPool->nodePort, fd)));
		}

		/* let the callers know about the failure */
		waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
	}
	PG_END_TRY();

	return waitEventSetIndex;
}
/* /*
* GetEventSetSize returns the event set size for a list of sessions. * GetEventSetSize returns the event set size for a list of sessions.
*/ */
@ -5485,7 +5434,7 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
if (!success) if (!success)
{ {
ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Modifying wait event for node %s:%d failed. " errmsg("modifying wait event for node %s:%d failed. "
"The wait event index was: %d", "The wait event index was: %d",
connection->hostname, connection->port, connection->hostname, connection->port,
waitEventSetIndex))); waitEventSetIndex)));
@ -5496,51 +5445,6 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
} }
/*
 * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent().
 *
 * ModifyWaitEvent may throw hard errors. For example, when the underlying
 * socket for a connection is closed by the remote server and already
 * reflected by the OS, however Citus hasn't had a chance to get this
 * information. In that case, if replication factor is >1, Citus can
 * failover to other nodes for executing the query. Even if replication
 * factor = 1, Citus can give much nicer errors.
 *
 * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH
 * block in order to catch any hard errors, and returns this information to the
 * caller.
 *
 * Returns true when ModifyWaitEvent() succeeded, false when a hard error
 * was caught (the error itself is discarded, not re-thrown).
 */
static bool
CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
{
	/*
	 * volatile: assigned inside PG_CATCH and read after a possible
	 * longjmp, so it must not live in a register.
	 */
	volatile bool success = true;
	MemoryContext savedContext = CurrentMemoryContext;

	PG_TRY();
	{
		ModifyWaitEvent(set, pos, events, latch);
	}
	PG_CATCH();
	{
		/*
		 * We might be in an arbitrary memory context when the
		 * error is thrown and we should get back to one we had
		 * at PG_TRY() time, especially because we are not
		 * re-throwing the error.
		 */
		MemoryContextSwitchTo(savedContext);

		/* drop the caught error instead of re-throwing it */
		FlushErrorState();

		/* let the callers know about the failure */
		success = false;
	}
	PG_END_TRY();

	return success;
}
/* /*
* SetLocalForceMaxQueryParallelization is simply a C interface for setting * SetLocalForceMaxQueryParallelization is simply a C interface for setting
* the following: * the following:

View File

@ -18,6 +18,7 @@
#include "lib/ilist.h" #include "lib/ilist.h"
#include "pg_config.h" #include "pg_config.h"
#include "portability/instr_time.h" #include "portability/instr_time.h"
#include "storage/latch.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/timestamp.h" #include "utils/timestamp.h"
@ -34,6 +35,10 @@
/* application name used for internal connections in rebalancer */ /* application name used for internal connections in rebalancer */
#define CITUS_REBALANCER_NAME "citus_rebalancer" #define CITUS_REBALANCER_NAME "citus_rebalancer"
/* deal with waiteventset errors */
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2
/* forward declare, to avoid forcing large headers on everyone */ /* forward declare, to avoid forcing large headers on everyone */
struct pg_conn; /* target of the PGconn typedef */ struct pg_conn; /* target of the PGconn typedef */
struct MemoryContextData; struct MemoryContextData;
@ -284,6 +289,13 @@ extern bool IsCitusInternalBackend(void);
extern bool IsRebalancerInternalBackend(void); extern bool IsRebalancerInternalBackend(void);
extern void MarkConnectionConnected(MultiConnection *connection); extern void MarkConnectionConnected(MultiConnection *connection);
/* waiteventset utilities */
extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
Latch *latch, void *user_data);
extern bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
Latch *latch);
/* time utilities */ /* time utilities */
extern double MillisecondsPassedSince(instr_time moment); extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart); extern long MillisecondsToTimeout(instr_time start, long msAfterStart);