From 3dac0a4d1476757a80902159d0fbd875fd7437ab Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 31 May 2016 12:19:09 -0700 Subject: [PATCH] Rely less on remote_task_check_interval. When executing queries with citus.task_executor = 'real-time', query execution could, so far, spend a significant amount of time sleeping. That's because we were a) sleeping after several phases of query execution, even if we're not waiting for network IO b) sleeping for a fixed amount of time when waiting for network IO; often a lot longer than actually required. Just reducing the amount of time slept isn't a real solution, because that just increases CPU usage. Instead have the real-time executor's ManageTaskExecution return whether a task is currently being processed, waiting for reads or writes, or failed. When all tasks are waiting for IO use poll() to wait for IO readyness. That requires to slightly redefine how connection timeouts are handled: before we counted the number of times ManageTaskExecution() was called, and compared that with the timeout divided by the task check interval. That, if processing of tasks took a while, could significantly increase the time till a timeout occurred. Because it was based on the ManageTaskExecution() being called on a constant interval, this approach isn't feasible anymore. Instead measure the actual time since connection establishment was started. That could in theory, if task processing takes a very long time, lead to few passes over PQconnectPoll(). The problem of sleeping too much also exists for the 'task-tracker' executor, but is generally less problematic there, as processing the individual tasks usually will take longer. That said, for e.g. the regression tests it'd be helpful to use a similar approach. --- .../executor/multi_client_executor.c | 169 +++++++++++++++++- .../executor/multi_real_time_executor.c | 81 +++++++-- .../executor/multi_server_executor.c | 2 +- .../executor/multi_task_tracker_executor.c | 7 +- .../distributed/multi_client_executor.h | 35 +++- .../distributed/multi_physical_planner.h | 1 + .../distributed/multi_server_executor.h | 2 +- 7 files changed, 274 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 76c53de1e..0cccff038 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -21,6 +21,7 @@ #include "distributed/metadata_cache.h" #include "distributed/connection_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_server_executor.h" #include #include @@ -226,9 +227,12 @@ MultiClientConnectPoll(int32 connectionId) if (readReady) { ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + connectStatus = CLIENT_CONNECTION_BUSY; + } + else + { + connectStatus = CLIENT_CONNECTION_BUSY_READ; } - - connectStatus = CLIENT_CONNECTION_BUSY; } else if (pollingStatus == PGRES_POLLING_WRITING) { @@ -236,9 +240,12 @@ MultiClientConnectPoll(int32 connectionId) if (writeReady) { ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + connectStatus = CLIENT_CONNECTION_BUSY; + } + else + { + connectStatus = CLIENT_CONNECTION_BUSY_WRITE; } - - connectStatus = CLIENT_CONNECTION_BUSY; } else if (pollingStatus == PGRES_POLLING_FAILED) { @@ -691,6 +698,160 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) } +/* + * MultiClientCreateWaitInfo creates a WaitInfo structure, capable of keeping + * track of what maxConnections connections are waiting for; to allow + * efficiently waiting for all of them at once. + * + * Connections can be added using MultiClientRegisterWait(). All added + * connections can then be waited upon together using MultiClientWait(). + */ +WaitInfo * +MultiClientCreateWaitInfo(int maxConnections) +{ + WaitInfo *waitInfo = palloc(sizeof(WaitInfo)); + + waitInfo->maxWaiters = maxConnections; + waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd)); + + /* initialize remaining fields */ + MultiClientResetWaitInfo(waitInfo); + + return waitInfo; +} + + +/* MultiClientResetWaitInfo clears all pending waits from a WaitInfo. */ +void +MultiClientResetWaitInfo(WaitInfo *waitInfo) +{ + waitInfo->registeredWaiters = 0; + waitInfo->haveReadyWaiter = false; + waitInfo->haveFailedWaiter = false; +} + + +/* MultiClientFreeWaitInfo frees a resources associated with a waitInfo struct. */ +void +MultiClientFreeWaitInfo(WaitInfo *waitInfo) +{ + pfree(waitInfo->pollfds); + pfree(waitInfo); +} + + +/* + * MultiClientRegisterWait adds a connection to be waited upon, waiting for + * executionStatus. + */ +void +MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, + int32 connectionId) +{ + PGconn *connection = NULL; + struct pollfd *pollfd = NULL; + + Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters); + + if (executionStatus == TASK_STATUS_READY) + { + waitInfo->haveReadyWaiter = true; + return; + } + else if (executionStatus == TASK_STATUS_ERROR) + { + waitInfo->haveFailedWaiter = true; + return; + } + + connection = ClientConnectionArray[connectionId]; + pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters]; + pollfd->fd = PQsocket(connection); + if (executionStatus == TASK_STATUS_SOCKET_READ) + { + pollfd->events = POLLERR | POLLIN; + } + else if (executionStatus == TASK_STATUS_SOCKET_WRITE) + { + pollfd->events = POLLERR | POLLOUT; + } + waitInfo->registeredWaiters++; +} + + +/* + * MultiClientWait waits until at least one connection added with + * MultiClientRegisterWait is ready to be processed again. + */ +void +MultiClientWait(WaitInfo *waitInfo) +{ + long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L; + + /* + * If we had a failure, we always want to sleep for a bit, to prevent + * flooding the other system, probably making the situation worse. + */ + if (waitInfo->haveFailedWaiter) + { + pg_usleep(sleepIntervalPerCycle); + return; + } + + /* if there are tasks that already need attention again, don't wait */ + if (waitInfo->haveReadyWaiter) + { + return; + } + + while (true) + { + /* + * Wait for activity on any of the sockets. Limit the maximum time + * spent waiting in one wait cycle, as insurance against edge + * cases. For efficiency we don't want wake up quite as often as + * citus.remote_task_check_interval, so rather arbitrarily sleep ten + * times as long. + */ + int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters, + sleepIntervalPerCycle * 10); + + if (rc < 0) + { + /* + * Signals that arrive can interrupt our poll(). In that case just + * check for interrupts, and try again. Every other error is + * unexpected and treated as such. + */ + if (errno == EAGAIN || errno == EINTR) + { + CHECK_FOR_INTERRUPTS(); + + /* maximum wait starts at max again, but that's ok, it's just a stopgap */ + continue; + } + else + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("poll failed: %m"))); + } + } + else if (rc == 0) + { + ereport(DEBUG2, + (errmsg("waiting for activity on tasks took longer than %ld ms", + (long) RemoteTaskCheckInterval * 10))); + } + + /* + * At least one fd changed received a readiness notification, time to + * process tasks again. + */ + return; + } +} + + /* * ClearRemainingResults reads result objects from the connection until we get * null, and clears these results. This is the last step in completing an async diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 665276ca8..230da5955 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -21,6 +21,7 @@ #include #include +#include #include "commands/dbcommands.h" #include "distributed/multi_client_executor.h" @@ -28,10 +29,12 @@ #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" #include "storage/fd.h" +#include "utils/timestamp.h" /* Local functions forward declarations */ -static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution); +static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution, + TaskExecutionStatus *executionStatus); static bool TaskExecutionReadyToStart(TaskExecution *taskExecution); static bool TaskExecutionCompleted(TaskExecution *taskExecution); static void CancelTaskExecutionIfActive(TaskExecution *taskExecution); @@ -76,6 +79,7 @@ MultiRealTimeExecute(Job *job) List *workerNodeList = NIL; HTAB *workerHash = NULL; const char *workerHashName = "Worker node hash"; + WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList)); workerNodeList = WorkerNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); @@ -99,12 +103,15 @@ MultiRealTimeExecute(Job *job) ListCell *taskCell = NULL; ListCell *taskExecutionCell = NULL; + MultiClientResetWaitInfo(waitInfo); + forboth(taskCell, taskList, taskExecutionCell, taskExecutionList) { Task *task = (Task *) lfirst(taskCell); TaskExecution *taskExecution = (TaskExecution *) lfirst(taskExecutionCell); ConnectAction connectAction = CONNECT_ACTION_NONE; WorkerNodeState *workerNodeState = NULL; + TaskExecutionStatus executionStatus; workerNodeState = LookupWorkerForTask(workerHash, task, taskExecution); @@ -117,7 +124,7 @@ MultiRealTimeExecute(Job *job) } /* call the function that performs the core task execution logic */ - connectAction = ManageTaskExecution(task, taskExecution); + connectAction = ManageTaskExecution(task, taskExecution, &executionStatus); /* update the connection counter for throttling */ UpdateConnectionCounter(workerNodeState, connectAction); @@ -139,20 +146,38 @@ MultiRealTimeExecute(Job *job) { completedTaskCount++; } + else + { + uint32 currentIndex = taskExecution->currentNodeIndex; + int32 *connectionIdArray = taskExecution->connectionIdArray; + int32 connectionId = connectionIdArray[currentIndex]; + + /* + * If not done with the task yet, make note of what this task + * and its associated connection is waiting for. + */ + MultiClientRegisterWait(waitInfo, executionStatus, connectionId); + } } - /* check if all tasks completed; otherwise sleep to avoid tight loop */ + /* + * Check if all tasks completed; otherwise wait as appropriate to + * avoid a tight loop. That means we immediately continue if tasks are + * ready to be processed further, and block when we're waiting for + * network IO. + */ if (completedTaskCount == taskCount) { allTasksCompleted = true; } else { - long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L; - pg_usleep(sleepIntervalPerCycle); + MultiClientWait(waitInfo); } } + MultiClientFreeWaitInfo(waitInfo); + /* * We prevent cancel/die interrupts until we clean up connections to worker * nodes. Note that for the above while loop, if the user Ctrl+C's a query @@ -172,6 +197,9 @@ MultiRealTimeExecute(Job *job) /* * If cancel might have been sent, give remote backends some time to flush * their responses. This avoids some broken pipe logs on the backend-side. + * + * FIXME: This shouldn't be dependant on RemoteTaskCheckInterval; they're + * unrelated type of delays. */ if (taskFailed || QueryCancelPending) { @@ -213,10 +241,12 @@ MultiRealTimeExecute(Job *job) * Note that this function directly manages a task's execution by opening up a * separate connection to the worker node for each execution. The function * returns a ConnectAction enum indicating whether a connection has been opened - * or closed in this call. + * or closed in this call. Via the executionStatus parameter this function returns + * what a Task is blocked on. */ static ConnectAction -ManageTaskExecution(Task *task, TaskExecution *taskExecution) +ManageTaskExecution(Task *task, TaskExecution *taskExecution, + TaskExecutionStatus *executionStatus) { TaskExecStatus *taskStatusArray = taskExecution->taskStatusArray; int32 *connectionIdArray = taskExecution->connectionIdArray; @@ -229,6 +259,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) uint32 nodePort = taskPlacement->nodePort; ConnectAction connectAction = CONNECT_ACTION_NONE; + /* as most state transitions don't require blocking, default to not waiting */ + *executionStatus = TASK_STATUS_READY; + switch (currentStatus) { case EXEC_TASK_CONNECT_START: @@ -246,12 +279,14 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) if (connectionId != INVALID_CONNECTION_ID) { taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL; - taskExecution->connectPollCount = 0; + taskExecution->connectStartTime = GetCurrentTimestamp(); connectAction = CONNECT_ACTION_OPENED; } else { + *executionStatus = TASK_STATUS_ERROR; AdjustStateForFailure(taskExecution); + break; } break; @@ -273,6 +308,17 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) } else if (pollStatus == CLIENT_CONNECTION_BUSY) { + /* immediately retry */ + taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL; + } + else if (pollStatus == CLIENT_CONNECTION_BUSY_READ) + { + *executionStatus = TASK_STATUS_SOCKET_READ; + taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL; + } + else if (pollStatus == CLIENT_CONNECTION_BUSY_WRITE) + { + *executionStatus = TASK_STATUS_SOCKET_WRITE; taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL; } else if (pollStatus == CLIENT_CONNECTION_BAD) @@ -281,12 +327,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) } /* now check if we have been trying to connect for too long */ - taskExecution->connectPollCount++; - if (pollStatus == CLIENT_CONNECTION_BUSY) + if (pollStatus == CLIENT_CONNECTION_BUSY_READ || + pollStatus == CLIENT_CONNECTION_BUSY_WRITE) { - uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval; - uint32 currentCount = taskExecution->connectPollCount; - if (currentCount >= maxCount) + if (TimestampDifferenceExceeds(taskExecution->connectStartTime, + GetCurrentTimestamp(), + REMOTE_NODE_CONNECT_TIMEOUT)) { ereport(WARNING, (errmsg("could not establish asynchronous " "connection after %u ms", @@ -317,6 +363,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) /* try next worker node */ AdjustStateForFailure(taskExecution); + /* + * Add a delay, to avoid potentially excerbating problems by + * looping quickly + */ + *executionStatus = TASK_STATUS_ERROR; + break; } @@ -372,6 +424,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) /* check if query results are in progress or unavailable */ if (resultStatus == CLIENT_RESULT_BUSY) { + *executionStatus = TASK_STATUS_SOCKET_READ; taskStatusArray[currentIndex] = EXEC_FETCH_TASK_RUNNING; break; } @@ -446,6 +499,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) if (resultStatus == CLIENT_RESULT_BUSY) { taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_RUNNING; + *executionStatus = TASK_STATUS_SOCKET_READ; break; } else if (resultStatus == CLIENT_RESULT_UNAVAILABLE) @@ -511,6 +565,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution) if (copyStatus == CLIENT_COPY_MORE) { taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_COPYING; + *executionStatus = TASK_STATUS_SOCKET_READ; } else if (copyStatus == CLIENT_COPY_DONE) { diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 443ca813e..48127ed92 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -226,7 +226,7 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus) taskExecution->jobId = task->jobId; taskExecution->taskId = task->taskId; taskExecution->nodeCount = nodeCount; - taskExecution->connectPollCount = 0; + taskExecution->connectStartTime = 0; taskExecution->currentNodeIndex = 0; taskExecution->dataFetchTaskIndex = -1; taskExecution->failureCount = 0; diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index fd0638741..0a8f6681c 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -850,7 +850,9 @@ TrackerConnectPoll(TaskTracker *taskTracker) { taskTracker->trackerStatus = TRACKER_CONNECTED; } - else if (pollStatus == CLIENT_CONNECTION_BUSY) + else if (pollStatus == CLIENT_CONNECTION_BUSY || + pollStatus == CLIENT_CONNECTION_BUSY_READ || + pollStatus == CLIENT_CONNECTION_BUSY_WRITE) { taskTracker->trackerStatus = TRACKER_CONNECT_POLL; } @@ -864,7 +866,8 @@ TrackerConnectPoll(TaskTracker *taskTracker) /* now check if we have been trying to connect for too long */ taskTracker->connectPollCount++; - if (pollStatus == CLIENT_CONNECTION_BUSY) + if (pollStatus == CLIENT_CONNECTION_BUSY_READ || + pollStatus == CLIENT_CONNECTION_BUSY_WRITE) { uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval; uint32 currentCount = taskTracker->connectPollCount; diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index 1486cd692..ab5b9bc38 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -14,7 +14,6 @@ #ifndef MULTI_CLIENT_EXECUTOR_H #define MULTI_CLIENT_EXECUTOR_H - #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */ #define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */ #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */ @@ -28,7 +27,9 @@ typedef enum CLIENT_INVALID_CONNECT = 0, CLIENT_CONNECTION_BAD = 1, CLIENT_CONNECTION_BUSY = 2, - CLIENT_CONNECTION_READY = 3 + CLIENT_CONNECTION_BUSY_READ = 3, + CLIENT_CONNECTION_BUSY_WRITE = 4, + CLIENT_CONNECTION_READY = 5 } ConnectStatus; @@ -72,6 +73,29 @@ typedef enum } BatchQueryStatus; +/* Enumeration to track whether a task is ready to run and, if not, what it's blocked on*/ +typedef enum TaskExecutionStatus +{ + TASK_STATUS_INVALID = 0, + TASK_STATUS_ERROR, /* error occured */ + TASK_STATUS_READY, /* task ready to be processed further */ + TASK_STATUS_SOCKET_READ, /* waiting for connection to become ready for reads */ + TASK_STATUS_SOCKET_WRITE /* waiting for connection to become ready for writes */ +} TaskExecutionStatus; + + +struct pollfd; /* forward declared, to avoid having to include poll.h */ + +typedef struct WaitInfo +{ + int maxWaiters; + struct pollfd *pollfds; + int registeredWaiters; + bool haveReadyWaiter; + bool haveFailedWaiter; +} WaitInfo; + + /* Function declarations for executing client-side (libpq) logic. */ extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase, const char *nodeUser); @@ -91,6 +115,13 @@ extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryR int *rowCount, int *columnCount); extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex); extern void MultiClientClearResult(void *queryResult); +extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections); + +extern void MultiClientResetWaitInfo(WaitInfo *waitInfo); +extern void MultiClientFreeWaitInfo(WaitInfo *waitInfo); +extern void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus waitStatus, + int32 connectionId); +extern void MultiClientWait(WaitInfo *waitInfo); #endif /* MULTI_CLIENT_EXECUTOR_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index fa5f03c24..ff769d18e 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -14,6 +14,7 @@ #ifndef MULTI_PHYSICAL_PLANNER_H #define MULTI_PHYSICAL_PLANNER_H +#include "datatype/timestamp.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" #include "distributed/multi_logical_planner.h" diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 96647de64..741829857 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -121,7 +121,7 @@ struct TaskExecution TransmitExecStatus *transmitStatusArray; int32 *connectionIdArray; int32 *fileDescriptorArray; - uint32 connectPollCount; + TimestampTz connectStartTime; uint32 nodeCount; uint32 currentNodeIndex; uint32 querySourceNodeIndex; /* only applies to map fetch tasks */