diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 6e9714101..df2a96d9f 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -28,6 +28,7 @@ #include "utils/memutils.h" +int NodeConnectionTimeout = 5000; HTAB *ConnectionHash = NULL; MemoryContext ConnectionContext = NULL; @@ -455,10 +456,10 @@ FinishConnectionEstablishment(MultiConnection *connection) if (TimestampDifferenceExceeds(connection->connectionStart, GetCurrentTimestamp(), - CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)) + NodeConnectionTimeout)) { ereport(WARNING, (errmsg("could not establish connection after %u ms", - CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))); + NodeConnectionTimeout))); /* close connection, otherwise we take up resource on the other side */ PQfinish(connection->pgConn); diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 86a069584..5e0f54db8 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -24,6 +24,7 @@ #include #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" @@ -332,11 +333,11 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, { if (TimestampDifferenceExceeds(taskExecution->connectStartTime, GetCurrentTimestamp(), - REMOTE_NODE_CONNECT_TIMEOUT)) + NodeConnectionTimeout)) { ereport(WARNING, (errmsg("could not establish asynchronous " "connection after %u ms", - REMOTE_NODE_CONNECT_TIMEOUT))); + NodeConnectionTimeout))); taskStatusArray[currentIndex] = EXEC_TASK_FAILED; } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 8bdcdacd3..9e32a9446 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -22,9 +22,11 @@ #include #include +#include #include "commands/dbcommands.h" #include "distributed/citus_nodes.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" @@ -879,13 +881,14 @@ TrackerConnectPoll(TaskTracker *taskTracker) if (pollStatus == CLIENT_CONNECTION_BUSY_READ || pollStatus == CLIENT_CONNECTION_BUSY_WRITE) { - uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval; + uint32 maxCount = + ceil(NodeConnectionTimeout * 1.0f / RemoteTaskCheckInterval); uint32 currentCount = taskTracker->connectPollCount; if (currentCount >= maxCount) { ereport(WARNING, (errmsg("could not establish asynchronous " "connection after %u ms", - REMOTE_NODE_CONNECT_TIMEOUT))); + NodeConnectionTimeout))); taskTracker->trackerStatus = TRACKER_CONNECTION_FAILED; @@ -2779,7 +2782,6 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) uint64 jobId = jobCleanupTask->jobId; List *taskTrackerList = NIL; List *remainingTaskTrackerList = NIL; - const long timeoutDuration = 4000; /* milliseconds */ const long statusCheckInterval = 10000; /* microseconds */ bool timedOut = false; TimestampTz startTime = 0; @@ -2855,7 +2857,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) pg_usleep(statusCheckInterval); currentTime = GetCurrentTimestamp(); - timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration); + timedOut = TimestampDifferenceExceeds(startTime, currentTime, + NodeConnectionTimeout); foreach(activeTaskTrackerCell, activeTackTrackerList) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ac0c81cbd..bc176a110 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -19,6 +19,7 @@ #include "commands/explain.h" #include "executor/executor.h" #include "distributed/citus_nodefuncs.h" +#include "distributed/connection_management.h" #include "distributed/commit_protocol.h" #include "distributed/connection_management.h" #include "distributed/master_protocol.h" @@ -203,6 +204,16 @@ CreateRequiredDirectories(void) static void RegisterCitusConfigVariables(void) { + DefineCustomIntVariable( + "citus.node_connection_timeout", + gettext_noop("Sets the maximum duration to connect to worker nodes."), + NULL, + &NodeConnectionTimeout, + 5000, 10, 60 * 60 * 1000, + PGC_USERSET, + GUC_UNIT_MS, + NULL, NULL, NULL); + /* keeping temporarily for updates from pre-6.0 versions */ DefineCustomStringVariable( "citus.worker_list_file", @@ -422,7 +433,7 @@ RegisterCitusConfigVariables(void) "progress. This configuration value sets the time " "interval between two consequent checks."), &RemoteTaskCheckInterval, - 10, 1, REMOTE_NODE_CONNECT_TIMEOUT, + 10, 1, INT_MAX, PGC_USERSET, GUC_UNIT_MS, NULL, NULL, NULL); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index ddc0b26e1..969b652c9 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -20,8 +20,6 @@ /* maximum (textual) lengths of hostname and port */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */ -#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5 - /* forward declare, to avoid forcing large headers on everyone */ struct pg_conn; /* target of the PGconn typedef */ struct MemoryContextData; @@ -98,6 +96,9 @@ typedef struct ConnectionHashEntry dlist_head *connections; } ConnectionHashEntry; +/* maximum duration to wait for connection */ +extern int NodeConnectionTimeout; + /* the hash table */ extern HTAB *ConnectionHash; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 9067125bf..fd4713cbe 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -21,7 +21,6 @@ #define MAX_TASK_EXECUTION_FAILURES 3 /* allowed failure count for one task */ #define MAX_TRACKER_FAILURE_COUNT 3 /* allowed failure count for one tracker */ -#define REMOTE_NODE_CONNECT_TIMEOUT 4000 /* async connect timeout in ms */ #define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */ /* copy out query results */