Rely less on remote_task_check_interval.

When executing queries with citus.task_executor = 'real-time', query
execution could, so far, spend a significant amount of time
sleeping. That's because we were
a) sleeping after several phases of query execution, even if we're not
   waiting for network IO
b) sleeping for a fixed amount of time when waiting for network IO;
   often a lot longer than actually required.
Just reducing the amount of time slept isn't a real solution, because
that just increases CPU usage.

Instead have the real-time executor's ManageTaskExecution return whether
a task is currently being processed, waiting for reads or writes, or
failed. When all tasks are waiting for IO use poll() to wait for IO
readyness.

That requires to slightly redefine how connection timeouts are handled:
before we counted the number of times ManageTaskExecution() was called,
and compared that with the timeout divided by the task check
interval. That, if processing of tasks took a while, could significantly
increase the time till a timeout occurred. Because it was based on the
ManageTaskExecution() being called on a constant interval, this approach
isn't feasible anymore.  Instead measure the actual time since
connection establishment was started. That could in theory, if task
processing takes a very long time, lead to few passes over
PQconnectPoll().

The problem of sleeping too much also exists for the 'task-tracker'
executor, but is generally less problematic there, as processing the
individual tasks usually will take longer. That said, for e.g. the
regression tests it'd be helpful to use a similar approach.
pull/391/head
Andres Freund 2016-05-31 12:19:09 -07:00 committed by Jason Petersen
parent ae8ba2ac52
commit 3dac0a4d14
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
7 changed files with 274 additions and 23 deletions

View File

@ -21,6 +21,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/connection_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include <errno.h>
#include <unistd.h>
@ -226,9 +227,12 @@ MultiClientConnectPoll(int32 connectionId)
if (readReady)
{
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
connectStatus = CLIENT_CONNECTION_BUSY;
}
else
{
connectStatus = CLIENT_CONNECTION_BUSY_READ;
}
connectStatus = CLIENT_CONNECTION_BUSY;
}
else if (pollingStatus == PGRES_POLLING_WRITING)
{
@ -236,9 +240,12 @@ MultiClientConnectPoll(int32 connectionId)
if (writeReady)
{
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
connectStatus = CLIENT_CONNECTION_BUSY;
}
else
{
connectStatus = CLIENT_CONNECTION_BUSY_WRITE;
}
connectStatus = CLIENT_CONNECTION_BUSY;
}
else if (pollingStatus == PGRES_POLLING_FAILED)
{
@ -691,6 +698,160 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
}
/*
* MultiClientCreateWaitInfo creates a WaitInfo structure, capable of keeping
* track of what maxConnections connections are waiting for; to allow
* efficiently waiting for all of them at once.
*
* Connections can be added using MultiClientRegisterWait(). All added
* connections can then be waited upon together using MultiClientWait().
*/
WaitInfo *
MultiClientCreateWaitInfo(int maxConnections)
{
WaitInfo *waitInfo = palloc(sizeof(WaitInfo));
waitInfo->maxWaiters = maxConnections;
waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd));
/* initialize remaining fields */
MultiClientResetWaitInfo(waitInfo);
return waitInfo;
}
/* MultiClientResetWaitInfo clears all pending waits from a WaitInfo. */
void
MultiClientResetWaitInfo(WaitInfo *waitInfo)
{
waitInfo->registeredWaiters = 0;
waitInfo->haveReadyWaiter = false;
waitInfo->haveFailedWaiter = false;
}
/* MultiClientFreeWaitInfo frees a resources associated with a waitInfo struct. */
void
MultiClientFreeWaitInfo(WaitInfo *waitInfo)
{
pfree(waitInfo->pollfds);
pfree(waitInfo);
}
/*
* MultiClientRegisterWait adds a connection to be waited upon, waiting for
* executionStatus.
*/
void
MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
int32 connectionId)
{
PGconn *connection = NULL;
struct pollfd *pollfd = NULL;
Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters);
if (executionStatus == TASK_STATUS_READY)
{
waitInfo->haveReadyWaiter = true;
return;
}
else if (executionStatus == TASK_STATUS_ERROR)
{
waitInfo->haveFailedWaiter = true;
return;
}
connection = ClientConnectionArray[connectionId];
pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
pollfd->fd = PQsocket(connection);
if (executionStatus == TASK_STATUS_SOCKET_READ)
{
pollfd->events = POLLERR | POLLIN;
}
else if (executionStatus == TASK_STATUS_SOCKET_WRITE)
{
pollfd->events = POLLERR | POLLOUT;
}
waitInfo->registeredWaiters++;
}
/*
* MultiClientWait waits until at least one connection added with
* MultiClientRegisterWait is ready to be processed again.
*/
void
MultiClientWait(WaitInfo *waitInfo)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
/*
* If we had a failure, we always want to sleep for a bit, to prevent
* flooding the other system, probably making the situation worse.
*/
if (waitInfo->haveFailedWaiter)
{
pg_usleep(sleepIntervalPerCycle);
return;
}
/* if there are tasks that already need attention again, don't wait */
if (waitInfo->haveReadyWaiter)
{
return;
}
while (true)
{
/*
* Wait for activity on any of the sockets. Limit the maximum time
* spent waiting in one wait cycle, as insurance against edge
* cases. For efficiency we don't want wake up quite as often as
* citus.remote_task_check_interval, so rather arbitrarily sleep ten
* times as long.
*/
int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters,
sleepIntervalPerCycle * 10);
if (rc < 0)
{
/*
* Signals that arrive can interrupt our poll(). In that case just
* check for interrupts, and try again. Every other error is
* unexpected and treated as such.
*/
if (errno == EAGAIN || errno == EINTR)
{
CHECK_FOR_INTERRUPTS();
/* maximum wait starts at max again, but that's ok, it's just a stopgap */
continue;
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("poll failed: %m")));
}
}
else if (rc == 0)
{
ereport(DEBUG2,
(errmsg("waiting for activity on tasks took longer than %ld ms",
(long) RemoteTaskCheckInterval * 10)));
}
/*
* At least one fd changed received a readiness notification, time to
* process tasks again.
*/
return;
}
}
/*
* ClearRemainingResults reads result objects from the connection until we get
* null, and clears these results. This is the last step in completing an async

View File

@ -21,6 +21,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include <poll.h>
#include "commands/dbcommands.h"
#include "distributed/multi_client_executor.h"
@ -28,10 +29,12 @@
#include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h"
#include "storage/fd.h"
#include "utils/timestamp.h"
/* Local functions forward declarations */
static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution);
static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecutionStatus *executionStatus);
static bool TaskExecutionReadyToStart(TaskExecution *taskExecution);
static bool TaskExecutionCompleted(TaskExecution *taskExecution);
static void CancelTaskExecutionIfActive(TaskExecution *taskExecution);
@ -76,6 +79,7 @@ MultiRealTimeExecute(Job *job)
List *workerNodeList = NIL;
HTAB *workerHash = NULL;
const char *workerHashName = "Worker node hash";
WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
workerNodeList = WorkerNodeList();
workerHash = WorkerHash(workerHashName, workerNodeList);
@ -99,12 +103,15 @@ MultiRealTimeExecute(Job *job)
ListCell *taskCell = NULL;
ListCell *taskExecutionCell = NULL;
MultiClientResetWaitInfo(waitInfo);
forboth(taskCell, taskList, taskExecutionCell, taskExecutionList)
{
Task *task = (Task *) lfirst(taskCell);
TaskExecution *taskExecution = (TaskExecution *) lfirst(taskExecutionCell);
ConnectAction connectAction = CONNECT_ACTION_NONE;
WorkerNodeState *workerNodeState = NULL;
TaskExecutionStatus executionStatus;
workerNodeState = LookupWorkerForTask(workerHash, task, taskExecution);
@ -117,7 +124,7 @@ MultiRealTimeExecute(Job *job)
}
/* call the function that performs the core task execution logic */
connectAction = ManageTaskExecution(task, taskExecution);
connectAction = ManageTaskExecution(task, taskExecution, &executionStatus);
/* update the connection counter for throttling */
UpdateConnectionCounter(workerNodeState, connectAction);
@ -139,20 +146,38 @@ MultiRealTimeExecute(Job *job)
{
completedTaskCount++;
}
else
{
uint32 currentIndex = taskExecution->currentNodeIndex;
int32 *connectionIdArray = taskExecution->connectionIdArray;
int32 connectionId = connectionIdArray[currentIndex];
/*
* If not done with the task yet, make note of what this task
* and its associated connection is waiting for.
*/
MultiClientRegisterWait(waitInfo, executionStatus, connectionId);
}
}
/* check if all tasks completed; otherwise sleep to avoid tight loop */
/*
* Check if all tasks completed; otherwise wait as appropriate to
* avoid a tight loop. That means we immediately continue if tasks are
* ready to be processed further, and block when we're waiting for
* network IO.
*/
if (completedTaskCount == taskCount)
{
allTasksCompleted = true;
}
else
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
MultiClientWait(waitInfo);
}
}
MultiClientFreeWaitInfo(waitInfo);
/*
* We prevent cancel/die interrupts until we clean up connections to worker
* nodes. Note that for the above while loop, if the user Ctrl+C's a query
@ -172,6 +197,9 @@ MultiRealTimeExecute(Job *job)
/*
* If cancel might have been sent, give remote backends some time to flush
* their responses. This avoids some broken pipe logs on the backend-side.
*
* FIXME: This shouldn't be dependant on RemoteTaskCheckInterval; they're
* unrelated type of delays.
*/
if (taskFailed || QueryCancelPending)
{
@ -213,10 +241,12 @@ MultiRealTimeExecute(Job *job)
* Note that this function directly manages a task's execution by opening up a
* separate connection to the worker node for each execution. The function
* returns a ConnectAction enum indicating whether a connection has been opened
* or closed in this call.
* or closed in this call. Via the executionStatus parameter this function returns
* what a Task is blocked on.
*/
static ConnectAction
ManageTaskExecution(Task *task, TaskExecution *taskExecution)
ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecutionStatus *executionStatus)
{
TaskExecStatus *taskStatusArray = taskExecution->taskStatusArray;
int32 *connectionIdArray = taskExecution->connectionIdArray;
@ -229,6 +259,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
uint32 nodePort = taskPlacement->nodePort;
ConnectAction connectAction = CONNECT_ACTION_NONE;
/* as most state transitions don't require blocking, default to not waiting */
*executionStatus = TASK_STATUS_READY;
switch (currentStatus)
{
case EXEC_TASK_CONNECT_START:
@ -246,12 +279,14 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
if (connectionId != INVALID_CONNECTION_ID)
{
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
taskExecution->connectPollCount = 0;
taskExecution->connectStartTime = GetCurrentTimestamp();
connectAction = CONNECT_ACTION_OPENED;
}
else
{
*executionStatus = TASK_STATUS_ERROR;
AdjustStateForFailure(taskExecution);
break;
}
break;
@ -273,6 +308,17 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
}
else if (pollStatus == CLIENT_CONNECTION_BUSY)
{
/* immediately retry */
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
}
else if (pollStatus == CLIENT_CONNECTION_BUSY_READ)
{
*executionStatus = TASK_STATUS_SOCKET_READ;
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
}
else if (pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
*executionStatus = TASK_STATUS_SOCKET_WRITE;
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
}
else if (pollStatus == CLIENT_CONNECTION_BAD)
@ -281,12 +327,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
}
/* now check if we have been trying to connect for too long */
taskExecution->connectPollCount++;
if (pollStatus == CLIENT_CONNECTION_BUSY)
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
uint32 currentCount = taskExecution->connectPollCount;
if (currentCount >= maxCount)
if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
GetCurrentTimestamp(),
REMOTE_NODE_CONNECT_TIMEOUT))
{
ereport(WARNING, (errmsg("could not establish asynchronous "
"connection after %u ms",
@ -317,6 +363,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
/* try next worker node */
AdjustStateForFailure(taskExecution);
/*
* Add a delay, to avoid potentially excerbating problems by
* looping quickly
*/
*executionStatus = TASK_STATUS_ERROR;
break;
}
@ -372,6 +424,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
/* check if query results are in progress or unavailable */
if (resultStatus == CLIENT_RESULT_BUSY)
{
*executionStatus = TASK_STATUS_SOCKET_READ;
taskStatusArray[currentIndex] = EXEC_FETCH_TASK_RUNNING;
break;
}
@ -446,6 +499,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
if (resultStatus == CLIENT_RESULT_BUSY)
{
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_RUNNING;
*executionStatus = TASK_STATUS_SOCKET_READ;
break;
}
else if (resultStatus == CLIENT_RESULT_UNAVAILABLE)
@ -511,6 +565,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
if (copyStatus == CLIENT_COPY_MORE)
{
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_COPYING;
*executionStatus = TASK_STATUS_SOCKET_READ;
}
else if (copyStatus == CLIENT_COPY_DONE)
{

View File

@ -226,7 +226,7 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus)
taskExecution->jobId = task->jobId;
taskExecution->taskId = task->taskId;
taskExecution->nodeCount = nodeCount;
taskExecution->connectPollCount = 0;
taskExecution->connectStartTime = 0;
taskExecution->currentNodeIndex = 0;
taskExecution->dataFetchTaskIndex = -1;
taskExecution->failureCount = 0;

View File

@ -850,7 +850,9 @@ TrackerConnectPoll(TaskTracker *taskTracker)
{
taskTracker->trackerStatus = TRACKER_CONNECTED;
}
else if (pollStatus == CLIENT_CONNECTION_BUSY)
else if (pollStatus == CLIENT_CONNECTION_BUSY ||
pollStatus == CLIENT_CONNECTION_BUSY_READ ||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
taskTracker->trackerStatus = TRACKER_CONNECT_POLL;
}
@ -864,7 +866,8 @@ TrackerConnectPoll(TaskTracker *taskTracker)
/* now check if we have been trying to connect for too long */
taskTracker->connectPollCount++;
if (pollStatus == CLIENT_CONNECTION_BUSY)
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
uint32 currentCount = taskTracker->connectPollCount;

View File

@ -14,7 +14,6 @@
#ifndef MULTI_CLIENT_EXECUTOR_H
#define MULTI_CLIENT_EXECUTOR_H
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
@ -28,7 +27,9 @@ typedef enum
CLIENT_INVALID_CONNECT = 0,
CLIENT_CONNECTION_BAD = 1,
CLIENT_CONNECTION_BUSY = 2,
CLIENT_CONNECTION_READY = 3
CLIENT_CONNECTION_BUSY_READ = 3,
CLIENT_CONNECTION_BUSY_WRITE = 4,
CLIENT_CONNECTION_READY = 5
} ConnectStatus;
@ -72,6 +73,29 @@ typedef enum
} BatchQueryStatus;
/* Enumeration to track whether a task is ready to run and, if not, what it's blocked on*/
typedef enum TaskExecutionStatus
{
TASK_STATUS_INVALID = 0,
TASK_STATUS_ERROR, /* error occured */
TASK_STATUS_READY, /* task ready to be processed further */
TASK_STATUS_SOCKET_READ, /* waiting for connection to become ready for reads */
TASK_STATUS_SOCKET_WRITE /* waiting for connection to become ready for writes */
} TaskExecutionStatus;
struct pollfd; /* forward declared, to avoid having to include poll.h */
typedef struct WaitInfo
{
int maxWaiters;
struct pollfd *pollfds;
int registeredWaiters;
bool haveReadyWaiter;
bool haveFailedWaiter;
} WaitInfo;
/* Function declarations for executing client-side (libpq) logic. */
extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
@ -91,6 +115,13 @@ extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryR
int *rowCount, int *columnCount);
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
extern void MultiClientClearResult(void *queryResult);
extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
extern void MultiClientResetWaitInfo(WaitInfo *waitInfo);
extern void MultiClientFreeWaitInfo(WaitInfo *waitInfo);
extern void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus waitStatus,
int32 connectionId);
extern void MultiClientWait(WaitInfo *waitInfo);
#endif /* MULTI_CLIENT_EXECUTOR_H */

View File

@ -14,6 +14,7 @@
#ifndef MULTI_PHYSICAL_PLANNER_H
#define MULTI_PHYSICAL_PLANNER_H
#include "datatype/timestamp.h"
#include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_logical_planner.h"

View File

@ -121,7 +121,7 @@ struct TaskExecution
TransmitExecStatus *transmitStatusArray;
int32 *connectionIdArray;
int32 *fileDescriptorArray;
uint32 connectPollCount;
TimestampTz connectStartTime;
uint32 nodeCount;
uint32 currentNodeIndex;
uint32 querySourceNodeIndex; /* only applies to map fetch tasks */