mirror of https://github.com/citusdata/citus.git
Add citus.node_connection_timeout GUC
parent
64c140e78e
commit
296e0bd33a
|
@ -28,6 +28,7 @@
|
|||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
int NodeConnectionTimeout = 5000;
|
||||
HTAB *ConnectionHash = NULL;
|
||||
MemoryContext ConnectionContext = NULL;
|
||||
|
||||
|
@ -455,10 +456,10 @@ FinishConnectionEstablishment(MultiConnection *connection)
|
|||
|
||||
if (TimestampDifferenceExceeds(connection->connectionStart,
|
||||
GetCurrentTimestamp(),
|
||||
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))
|
||||
NodeConnectionTimeout))
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not establish connection after %u ms",
|
||||
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)));
|
||||
NodeConnectionTimeout)));
|
||||
|
||||
/* close connection, otherwise we take up resource on the other side */
|
||||
PQfinish(connection->pgConn);
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include <poll.h>
|
||||
|
||||
#include "commands/dbcommands.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
@ -332,11 +333,11 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
{
|
||||
if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
|
||||
GetCurrentTimestamp(),
|
||||
REMOTE_NODE_CONNECT_TIMEOUT))
|
||||
NodeConnectionTimeout))
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not establish asynchronous "
|
||||
"connection after %u ms",
|
||||
REMOTE_NODE_CONNECT_TIMEOUT)));
|
||||
NodeConnectionTimeout)));
|
||||
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
||||
}
|
||||
|
|
|
@ -22,9 +22,11 @@
|
|||
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
#include <math.h>
|
||||
|
||||
#include "commands/dbcommands.h"
|
||||
#include "distributed/citus_nodes.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
@ -879,13 +881,14 @@ TrackerConnectPoll(TaskTracker *taskTracker)
|
|||
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
|
||||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
|
||||
{
|
||||
uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
|
||||
uint32 maxCount =
|
||||
ceil(NodeConnectionTimeout * 1.0f / RemoteTaskCheckInterval);
|
||||
uint32 currentCount = taskTracker->connectPollCount;
|
||||
if (currentCount >= maxCount)
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not establish asynchronous "
|
||||
"connection after %u ms",
|
||||
REMOTE_NODE_CONNECT_TIMEOUT)));
|
||||
NodeConnectionTimeout)));
|
||||
|
||||
taskTracker->trackerStatus = TRACKER_CONNECTION_FAILED;
|
||||
|
||||
|
@ -2779,7 +2782,6 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
uint64 jobId = jobCleanupTask->jobId;
|
||||
List *taskTrackerList = NIL;
|
||||
List *remainingTaskTrackerList = NIL;
|
||||
const long timeoutDuration = 4000; /* milliseconds */
|
||||
const long statusCheckInterval = 10000; /* microseconds */
|
||||
bool timedOut = false;
|
||||
TimestampTz startTime = 0;
|
||||
|
@ -2855,7 +2857,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
|
||||
pg_usleep(statusCheckInterval);
|
||||
currentTime = GetCurrentTimestamp();
|
||||
timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration);
|
||||
timedOut = TimestampDifferenceExceeds(startTime, currentTime,
|
||||
NodeConnectionTimeout);
|
||||
|
||||
foreach(activeTaskTrackerCell, activeTackTrackerList)
|
||||
{
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "commands/explain.h"
|
||||
#include "executor/executor.h"
|
||||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
|
@ -203,6 +204,16 @@ CreateRequiredDirectories(void)
|
|||
static void
|
||||
RegisterCitusConfigVariables(void)
|
||||
{
|
||||
DefineCustomIntVariable(
|
||||
"citus.node_connection_timeout",
|
||||
gettext_noop("Sets the maximum duration to connect to worker nodes."),
|
||||
NULL,
|
||||
&NodeConnectionTimeout,
|
||||
5000, 10, 60 * 60 * 1000,
|
||||
PGC_USERSET,
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
/* keeping temporarily for updates from pre-6.0 versions */
|
||||
DefineCustomStringVariable(
|
||||
"citus.worker_list_file",
|
||||
|
@ -422,7 +433,7 @@ RegisterCitusConfigVariables(void)
|
|||
"progress. This configuration value sets the time "
|
||||
"interval between two consequent checks."),
|
||||
&RemoteTaskCheckInterval,
|
||||
10, 1, REMOTE_NODE_CONNECT_TIMEOUT,
|
||||
10, 1, INT_MAX,
|
||||
PGC_USERSET,
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
/* maximum (textual) lengths of hostname and port */
|
||||
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
|
||||
|
||||
#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5
|
||||
|
||||
/* forward declare, to avoid forcing large headers on everyone */
|
||||
struct pg_conn; /* target of the PGconn typedef */
|
||||
struct MemoryContextData;
|
||||
|
@ -98,6 +96,9 @@ typedef struct ConnectionHashEntry
|
|||
dlist_head *connections;
|
||||
} ConnectionHashEntry;
|
||||
|
||||
/* maximum duration to wait for connection */
|
||||
extern int NodeConnectionTimeout;
|
||||
|
||||
/* the hash table */
|
||||
extern HTAB *ConnectionHash;
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
|
||||
#define MAX_TASK_EXECUTION_FAILURES 3 /* allowed failure count for one task */
|
||||
#define MAX_TRACKER_FAILURE_COUNT 3 /* allowed failure count for one tracker */
|
||||
#define REMOTE_NODE_CONNECT_TIMEOUT 4000 /* async connect timeout in ms */
|
||||
#define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */
|
||||
|
||||
/* copy out query results */
|
||||
|
|
Loading…
Reference in New Issue