mirror of https://github.com/citusdata/citus.git
Merge pull request #912 from citusdata/add_timeout_guc
Add citus.node_connection_timeout GUCpull/1018/head
commit
a71b79983b
|
@ -28,6 +28,7 @@
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
|
int NodeConnectionTimeout = 5000;
|
||||||
HTAB *ConnectionHash = NULL;
|
HTAB *ConnectionHash = NULL;
|
||||||
MemoryContext ConnectionContext = NULL;
|
MemoryContext ConnectionContext = NULL;
|
||||||
|
|
||||||
|
@ -455,10 +456,10 @@ FinishConnectionEstablishment(MultiConnection *connection)
|
||||||
|
|
||||||
if (TimestampDifferenceExceeds(connection->connectionStart,
|
if (TimestampDifferenceExceeds(connection->connectionStart,
|
||||||
GetCurrentTimestamp(),
|
GetCurrentTimestamp(),
|
||||||
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))
|
NodeConnectionTimeout))
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("could not establish connection after %u ms",
|
ereport(WARNING, (errmsg("could not establish connection after %u ms",
|
||||||
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)));
|
NodeConnectionTimeout)));
|
||||||
|
|
||||||
/* close connection, otherwise we take up resource on the other side */
|
/* close connection, otherwise we take up resource on the other side */
|
||||||
PQfinish(connection->pgConn);
|
PQfinish(connection->pgConn);
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
|
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
@ -332,11 +333,11 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
{
|
{
|
||||||
if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
|
if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
|
||||||
GetCurrentTimestamp(),
|
GetCurrentTimestamp(),
|
||||||
REMOTE_NODE_CONNECT_TIMEOUT))
|
NodeConnectionTimeout))
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("could not establish asynchronous "
|
ereport(WARNING, (errmsg("could not establish asynchronous "
|
||||||
"connection after %u ms",
|
"connection after %u ms",
|
||||||
REMOTE_NODE_CONNECT_TIMEOUT)));
|
NodeConnectionTimeout)));
|
||||||
|
|
||||||
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,11 @@
|
||||||
|
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "distributed/citus_nodes.h"
|
#include "distributed/citus_nodes.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
@ -879,13 +881,14 @@ TrackerConnectPoll(TaskTracker *taskTracker)
|
||||||
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
|
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
|
||||||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
|
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
|
||||||
{
|
{
|
||||||
uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
|
uint32 maxCount =
|
||||||
|
ceil(NodeConnectionTimeout * 1.0f / RemoteTaskCheckInterval);
|
||||||
uint32 currentCount = taskTracker->connectPollCount;
|
uint32 currentCount = taskTracker->connectPollCount;
|
||||||
if (currentCount >= maxCount)
|
if (currentCount >= maxCount)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("could not establish asynchronous "
|
ereport(WARNING, (errmsg("could not establish asynchronous "
|
||||||
"connection after %u ms",
|
"connection after %u ms",
|
||||||
REMOTE_NODE_CONNECT_TIMEOUT)));
|
NodeConnectionTimeout)));
|
||||||
|
|
||||||
taskTracker->trackerStatus = TRACKER_CONNECTION_FAILED;
|
taskTracker->trackerStatus = TRACKER_CONNECTION_FAILED;
|
||||||
|
|
||||||
|
@ -2779,7 +2782,6 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
uint64 jobId = jobCleanupTask->jobId;
|
uint64 jobId = jobCleanupTask->jobId;
|
||||||
List *taskTrackerList = NIL;
|
List *taskTrackerList = NIL;
|
||||||
List *remainingTaskTrackerList = NIL;
|
List *remainingTaskTrackerList = NIL;
|
||||||
const long timeoutDuration = 4000; /* milliseconds */
|
|
||||||
const long statusCheckInterval = 10000; /* microseconds */
|
const long statusCheckInterval = 10000; /* microseconds */
|
||||||
bool timedOut = false;
|
bool timedOut = false;
|
||||||
TimestampTz startTime = 0;
|
TimestampTz startTime = 0;
|
||||||
|
@ -2855,7 +2857,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
|
|
||||||
pg_usleep(statusCheckInterval);
|
pg_usleep(statusCheckInterval);
|
||||||
currentTime = GetCurrentTimestamp();
|
currentTime = GetCurrentTimestamp();
|
||||||
timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration);
|
timedOut = TimestampDifferenceExceeds(startTime, currentTime,
|
||||||
|
NodeConnectionTimeout);
|
||||||
|
|
||||||
foreach(activeTaskTrackerCell, activeTackTrackerList)
|
foreach(activeTaskTrackerCell, activeTackTrackerList)
|
||||||
{
|
{
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "commands/explain.h"
|
#include "commands/explain.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/commit_protocol.h"
|
#include "distributed/commit_protocol.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
|
@ -203,6 +204,16 @@ CreateRequiredDirectories(void)
|
||||||
static void
|
static void
|
||||||
RegisterCitusConfigVariables(void)
|
RegisterCitusConfigVariables(void)
|
||||||
{
|
{
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.node_connection_timeout",
|
||||||
|
gettext_noop("Sets the maximum duration to connect to worker nodes."),
|
||||||
|
NULL,
|
||||||
|
&NodeConnectionTimeout,
|
||||||
|
5000, 10, 60 * 60 * 1000,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_UNIT_MS,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
/* keeping temporarily for updates from pre-6.0 versions */
|
/* keeping temporarily for updates from pre-6.0 versions */
|
||||||
DefineCustomStringVariable(
|
DefineCustomStringVariable(
|
||||||
"citus.worker_list_file",
|
"citus.worker_list_file",
|
||||||
|
@ -422,7 +433,7 @@ RegisterCitusConfigVariables(void)
|
||||||
"progress. This configuration value sets the time "
|
"progress. This configuration value sets the time "
|
||||||
"interval between two consequent checks."),
|
"interval between two consequent checks."),
|
||||||
&RemoteTaskCheckInterval,
|
&RemoteTaskCheckInterval,
|
||||||
10, 1, REMOTE_NODE_CONNECT_TIMEOUT,
|
10, 1, INT_MAX,
|
||||||
PGC_USERSET,
|
PGC_USERSET,
|
||||||
GUC_UNIT_MS,
|
GUC_UNIT_MS,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
|
@ -20,8 +20,6 @@
|
||||||
/* maximum (textual) lengths of hostname and port */
|
/* maximum (textual) lengths of hostname and port */
|
||||||
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
|
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
|
||||||
|
|
||||||
#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5
|
|
||||||
|
|
||||||
/* forward declare, to avoid forcing large headers on everyone */
|
/* forward declare, to avoid forcing large headers on everyone */
|
||||||
struct pg_conn; /* target of the PGconn typedef */
|
struct pg_conn; /* target of the PGconn typedef */
|
||||||
struct MemoryContextData;
|
struct MemoryContextData;
|
||||||
|
@ -98,6 +96,9 @@ typedef struct ConnectionHashEntry
|
||||||
dlist_head *connections;
|
dlist_head *connections;
|
||||||
} ConnectionHashEntry;
|
} ConnectionHashEntry;
|
||||||
|
|
||||||
|
/* maximum duration to wait for connection */
|
||||||
|
extern int NodeConnectionTimeout;
|
||||||
|
|
||||||
/* the hash table */
|
/* the hash table */
|
||||||
extern HTAB *ConnectionHash;
|
extern HTAB *ConnectionHash;
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
|
|
||||||
#define MAX_TASK_EXECUTION_FAILURES 3 /* allowed failure count for one task */
|
#define MAX_TASK_EXECUTION_FAILURES 3 /* allowed failure count for one task */
|
||||||
#define MAX_TRACKER_FAILURE_COUNT 3 /* allowed failure count for one tracker */
|
#define MAX_TRACKER_FAILURE_COUNT 3 /* allowed failure count for one tracker */
|
||||||
#define REMOTE_NODE_CONNECT_TIMEOUT 4000 /* async connect timeout in ms */
|
|
||||||
#define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */
|
#define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */
|
||||||
|
|
||||||
/* copy out query results */
|
/* copy out query results */
|
||||||
|
|
Loading…
Reference in New Issue