Merge pull request #391 from citusdata/feature/rely-less-on-remote-task-check-interval

Prototype: Rely less on remote_task_check_interval.

cr: @jasonmp85
Jason Petersen 2016-06-02 12:34:25 -06:00
commit 027a7a717d
7 changed files with 274 additions and 23 deletions
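For orientation, a minimal sketch of the change in waiting strategy (illustrative, not code from this diff): instead of unconditionally sleeping for citus.remote_task_check_interval between passes over the task list, the executor now collects the sockets of in-flight connections and blocks in poll() until one of them becomes ready or a safety timeout expires.

/* before: wake up every RemoteTaskCheckInterval milliseconds, whether or not anything changed */
pg_usleep(RemoteTaskCheckInterval * 1000L);

/* after: block until a registered socket becomes readable or writable */
int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters,
              RemoteTaskCheckInterval * 10); /* timeout in milliseconds, as insurance */
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
{
    CHECK_FOR_INTERRUPTS(); /* poll() was interrupted by a signal; just retry */
}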

View File

@ -21,6 +21,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/connection_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include <errno.h>
#include <unistd.h>
@ -226,9 +227,12 @@ MultiClientConnectPoll(int32 connectionId)
if (readReady)
{
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
connectStatus = CLIENT_CONNECTION_BUSY;
}
else
{
connectStatus = CLIENT_CONNECTION_BUSY_READ;
}
}
else if (pollingStatus == PGRES_POLLING_WRITING)
{
@ -236,9 +240,12 @@ MultiClientConnectPoll(int32 connectionId)
if (writeReady)
{
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
connectStatus = CLIENT_CONNECTION_BUSY;
}
else
{
connectStatus = CLIENT_CONNECTION_BUSY_WRITE;
}
}
else if (pollingStatus == PGRES_POLLING_FAILED)
{
@ -691,6 +698,160 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
}
/*
* MultiClientCreateWaitInfo creates a WaitInfo structure capable of keeping
* track of what up to maxConnections connections are waiting for, so that all
* of them can be waited on efficiently at once.
*
* Connections can be added using MultiClientRegisterWait(). All added
* connections can then be waited upon together using MultiClientWait().
*/
WaitInfo *
MultiClientCreateWaitInfo(int maxConnections)
{
WaitInfo *waitInfo = palloc(sizeof(WaitInfo));
waitInfo->maxWaiters = maxConnections;
waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd));
/* initialize remaining fields */
MultiClientResetWaitInfo(waitInfo);
return waitInfo;
}
/* MultiClientResetWaitInfo clears all pending waits from a WaitInfo. */
void
MultiClientResetWaitInfo(WaitInfo *waitInfo)
{
waitInfo->registeredWaiters = 0;
waitInfo->haveReadyWaiter = false;
waitInfo->haveFailedWaiter = false;
}
/* MultiClientFreeWaitInfo frees the resources associated with a WaitInfo struct. */
void
MultiClientFreeWaitInfo(WaitInfo *waitInfo)
{
pfree(waitInfo->pollfds);
pfree(waitInfo);
}
/*
* MultiClientRegisterWait adds a connection to the set being waited on, based
* on what executionStatus indicates the connection is blocked on.
*/
void
MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
int32 connectionId)
{
PGconn *connection = NULL;
struct pollfd *pollfd = NULL;
Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters);
if (executionStatus == TASK_STATUS_READY)
{
waitInfo->haveReadyWaiter = true;
return;
}
else if (executionStatus == TASK_STATUS_ERROR)
{
waitInfo->haveFailedWaiter = true;
return;
}
connection = ClientConnectionArray[connectionId];
pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
pollfd->fd = PQsocket(connection);
if (executionStatus == TASK_STATUS_SOCKET_READ)
{
pollfd->events = POLLERR | POLLIN;
}
else if (executionStatus == TASK_STATUS_SOCKET_WRITE)
{
pollfd->events = POLLERR | POLLOUT;
}
waitInfo->registeredWaiters++;
}
/*
* MultiClientWait waits until at least one connection added with
* MultiClientRegisterWait is ready to be processed again.
*/
void
MultiClientWait(WaitInfo *waitInfo)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
/*
* If we had a failure, we always want to sleep for a bit, to avoid
* flooding the remote system and probably making the situation worse.
*/
if (waitInfo->haveFailedWaiter)
{
pg_usleep(sleepIntervalPerCycle);
return;
}
/* if there are tasks that already need attention again, don't wait */
if (waitInfo->haveReadyWaiter)
{
return;
}
while (true)
{
/*
* Wait for activity on any of the sockets. Limit the maximum time
* spent waiting in one wait cycle, as insurance against edge
* cases. For efficiency we don't want to wake up quite as often as
* citus.remote_task_check_interval, so rather arbitrarily sleep ten
* times as long.
*/
int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters,
RemoteTaskCheckInterval * 10); /* poll() takes milliseconds */
if (rc < 0)
{
/*
* Signals that arrive can interrupt our poll(). In that case just
* check for interrupts, and try again. Every other error is
* unexpected and treated as such.
*/
if (errno == EAGAIN || errno == EINTR)
{
CHECK_FOR_INTERRUPTS();
/* the maximum wait time starts over, but that's ok; it's just a stopgap */
continue;
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("poll failed: %m")));
}
}
else if (rc == 0)
{
ereport(DEBUG2,
(errmsg("waiting for activity on tasks took longer than %ld ms",
(long) RemoteTaskCheckInterval * 10)));
}
/*
* At least one fd received a readiness notification; time to process
* tasks again.
*/
return;
}
}
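A hedged usage sketch of the new wait API, following the register/wait pattern described in the comment above MultiClientCreateWaitInfo(). The helpers AdvanceTask() and TaskConnectionId() are hypothetical stand-ins for the executor's task state machine; they are not part of this diff.

WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));

while (!allTasksCompleted)
{
    ListCell *taskCell = NULL;

    MultiClientResetWaitInfo(waitInfo);

    foreach(taskCell, taskList)
    {
        Task *task = (Task *) lfirst(taskCell);
        TaskExecutionStatus executionStatus = TASK_STATUS_READY;

        /* hypothetical: advance the task's state machine, learn what blocks it */
        AdvanceTask(task, &executionStatus);

        /* register the task's connection and what it is waiting for */
        MultiClientRegisterWait(waitInfo, executionStatus, TaskConnectionId(task));
    }

    /* returns immediately if any task is ready or failed; otherwise blocks in poll() */
    MultiClientWait(waitInfo);
}

MultiClientFreeWaitInfo(waitInfo);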
/*
* ClearRemainingResults reads result objects from the connection until we get
* null, and clears these results. This is the last step in completing an async

View File

@ -21,6 +21,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include <poll.h>
#include "commands/dbcommands.h"
#include "distributed/multi_client_executor.h"
@ -28,10 +29,12 @@
#include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h"
#include "storage/fd.h"
#include "utils/timestamp.h"
/* Local functions forward declarations */
static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecutionStatus *executionStatus);
static bool TaskExecutionReadyToStart(TaskExecution *taskExecution);
static bool TaskExecutionCompleted(TaskExecution *taskExecution);
static void CancelTaskExecutionIfActive(TaskExecution *taskExecution);
@ -76,6 +79,7 @@ MultiRealTimeExecute(Job *job)
List *workerNodeList = NIL;
HTAB *workerHash = NULL;
const char *workerHashName = "Worker node hash";
WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
workerNodeList = WorkerNodeList();
workerHash = WorkerHash(workerHashName, workerNodeList);
@ -99,12 +103,15 @@ MultiRealTimeExecute(Job *job)
ListCell *taskCell = NULL;
ListCell *taskExecutionCell = NULL;
MultiClientResetWaitInfo(waitInfo);
forboth(taskCell, taskList, taskExecutionCell, taskExecutionList)
{
Task *task = (Task *) lfirst(taskCell);
TaskExecution *taskExecution = (TaskExecution *) lfirst(taskExecutionCell);
ConnectAction connectAction = CONNECT_ACTION_NONE;
WorkerNodeState *workerNodeState = NULL;
TaskExecutionStatus executionStatus;
workerNodeState = LookupWorkerForTask(workerHash, task, taskExecution);
@ -117,7 +124,7 @@ MultiRealTimeExecute(Job *job)
}
/* call the function that performs the core task execution logic */
connectAction = ManageTaskExecution(task, taskExecution, &executionStatus);
/* update the connection counter for throttling */
UpdateConnectionCounter(workerNodeState, connectAction);
@ -139,20 +146,38 @@ MultiRealTimeExecute(Job *job)
{
completedTaskCount++;
}
else
{
uint32 currentIndex = taskExecution->currentNodeIndex;
int32 *connectionIdArray = taskExecution->connectionIdArray;
int32 connectionId = connectionIdArray[currentIndex];
/*
* If not done with the task yet, make note of what this task and its
* associated connection are waiting for.
*/
MultiClientRegisterWait(waitInfo, executionStatus, connectionId);
}
}
/*
* Check if all tasks completed; otherwise wait as appropriate to
* avoid a tight loop. That means we immediately continue if tasks are
* ready to be processed further, and block when we're waiting for
* network IO.
*/
if (completedTaskCount == taskCount)
{
allTasksCompleted = true;
}
else
{
MultiClientWait(waitInfo);
}
}
MultiClientFreeWaitInfo(waitInfo);
/*
* We prevent cancel/die interrupts until we clean up connections to worker
* nodes. Note that for the above while loop, if the user Ctrl+C's a query
@ -172,6 +197,9 @@ MultiRealTimeExecute(Job *job)
/*
* If cancel might have been sent, give remote backends some time to flush
* their responses. This avoids some broken pipe logs on the backend-side.
*
* FIXME: This shouldn't be dependent on RemoteTaskCheckInterval; these are
* unrelated types of delays.
*/
if (taskFailed || QueryCancelPending)
{
@ -213,10 +241,12 @@ MultiRealTimeExecute(Job *job)
* Note that this function directly manages a task's execution by opening up a
* separate connection to the worker node for each execution. The function
* returns a ConnectAction enum indicating whether a connection has been opened
* or closed in this call. Via the executionStatus parameter, the function also
* reports what the task is currently blocked on.
*/
static ConnectAction
ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecutionStatus *executionStatus)
{
TaskExecStatus *taskStatusArray = taskExecution->taskStatusArray;
int32 *connectionIdArray = taskExecution->connectionIdArray;
@ -229,6 +259,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
uint32 nodePort = taskPlacement->nodePort;
ConnectAction connectAction = CONNECT_ACTION_NONE;
/* as most state transitions don't require blocking, default to not waiting */
*executionStatus = TASK_STATUS_READY;
switch (currentStatus)
{
case EXEC_TASK_CONNECT_START:
@ -246,12 +279,14 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
if (connectionId != INVALID_CONNECTION_ID)
{
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
taskExecution->connectStartTime = GetCurrentTimestamp();
connectAction = CONNECT_ACTION_OPENED;
}
else
{
*executionStatus = TASK_STATUS_ERROR;
AdjustStateForFailure(taskExecution);
break;
}
break;
@ -273,6 +308,17 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
}
else if (pollStatus == CLIENT_CONNECTION_BUSY)
{
/* immediately retry */
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
}
else if (pollStatus == CLIENT_CONNECTION_BUSY_READ)
{
*executionStatus = TASK_STATUS_SOCKET_READ;
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
}
else if (pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
*executionStatus = TASK_STATUS_SOCKET_WRITE;
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
}
else if (pollStatus == CLIENT_CONNECTION_BAD)
@ -281,12 +327,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
}
/* now check if we have been trying to connect for too long */
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
GetCurrentTimestamp(),
REMOTE_NODE_CONNECT_TIMEOUT))
{
ereport(WARNING, (errmsg("could not establish asynchronous "
"connection after %u ms",
@ -317,6 +363,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
/* try next worker node */
AdjustStateForFailure(taskExecution);
/*
* Add a delay, to avoid potentially exacerbating problems by
* looping too quickly.
*/
*executionStatus = TASK_STATUS_ERROR;
break;
}
@ -372,6 +424,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
/* check if query results are in progress or unavailable */
if (resultStatus == CLIENT_RESULT_BUSY)
{
*executionStatus = TASK_STATUS_SOCKET_READ;
taskStatusArray[currentIndex] = EXEC_FETCH_TASK_RUNNING;
break;
}
@ -446,6 +499,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
if (resultStatus == CLIENT_RESULT_BUSY)
{
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_RUNNING;
*executionStatus = TASK_STATUS_SOCKET_READ;
break;
}
else if (resultStatus == CLIENT_RESULT_UNAVAILABLE)
@ -511,6 +565,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
if (copyStatus == CLIENT_COPY_MORE)
{
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_COPYING;
*executionStatus = TASK_STATUS_SOCKET_READ;
}
else if (copyStatus == CLIENT_COPY_DONE)
{

View File

@ -226,7 +226,7 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus)
taskExecution->jobId = task->jobId;
taskExecution->taskId = task->taskId;
taskExecution->nodeCount = nodeCount;
taskExecution->connectStartTime = 0;
taskExecution->currentNodeIndex = 0;
taskExecution->dataFetchTaskIndex = -1;
taskExecution->failureCount = 0;

View File

@ -850,7 +850,9 @@ TrackerConnectPoll(TaskTracker *taskTracker)
{
taskTracker->trackerStatus = TRACKER_CONNECTED;
}
else if (pollStatus == CLIENT_CONNECTION_BUSY ||
pollStatus == CLIENT_CONNECTION_BUSY_READ ||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
taskTracker->trackerStatus = TRACKER_CONNECT_POLL;
}
@ -864,7 +866,8 @@ TrackerConnectPoll(TaskTracker *taskTracker)
/* now check if we have been trying to connect for too long */
taskTracker->connectPollCount++;
if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
uint32 currentCount = taskTracker->connectPollCount;

View File

@ -14,7 +14,6 @@
#ifndef MULTI_CLIENT_EXECUTOR_H
#define MULTI_CLIENT_EXECUTOR_H
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
@ -28,7 +27,9 @@ typedef enum
CLIENT_INVALID_CONNECT = 0,
CLIENT_CONNECTION_BAD = 1,
CLIENT_CONNECTION_BUSY = 2,
CLIENT_CONNECTION_BUSY_READ = 3,
CLIENT_CONNECTION_BUSY_WRITE = 4,
CLIENT_CONNECTION_READY = 5
} ConnectStatus;
@ -72,6 +73,29 @@ typedef enum
} BatchQueryStatus;
/* Enumeration to track whether a task is ready to run and, if not, what it's blocked on */
typedef enum TaskExecutionStatus
{
TASK_STATUS_INVALID = 0,
TASK_STATUS_ERROR, /* error occurred */
TASK_STATUS_READY, /* task ready to be processed further */
TASK_STATUS_SOCKET_READ, /* waiting for connection to become ready for reads */
TASK_STATUS_SOCKET_WRITE /* waiting for connection to become ready for writes */
} TaskExecutionStatus;
struct pollfd; /* forward declared, to avoid having to include poll.h */
typedef struct WaitInfo
{
int maxWaiters;
struct pollfd *pollfds;
int registeredWaiters;
bool haveReadyWaiter;
bool haveFailedWaiter;
} WaitInfo;
/* Function declarations for executing client-side (libpq) logic. */
extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
@ -91,6 +115,13 @@ extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryR
int *rowCount, int *columnCount);
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
extern void MultiClientClearResult(void *queryResult);
extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
extern void MultiClientResetWaitInfo(WaitInfo *waitInfo);
extern void MultiClientFreeWaitInfo(WaitInfo *waitInfo);
extern void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus waitStatus,
int32 connectionId);
extern void MultiClientWait(WaitInfo *waitInfo);
#endif /* MULTI_CLIENT_EXECUTOR_H */
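To tie the two new enums in this header together, a condensed sketch (assembled from the executor diff above, not verbatim): a connection that PQconnectPoll() reports as busy surfaces as CLIENT_CONNECTION_BUSY_READ or CLIENT_CONNECTION_BUSY_WRITE, the executor translates that into TASK_STATUS_SOCKET_READ or TASK_STATUS_SOCKET_WRITE, and MultiClientRegisterWait() finally maps those onto POLLIN or POLLOUT events.

ConnectStatus pollStatus = MultiClientConnectPoll(connectionId);
TaskExecutionStatus executionStatus = TASK_STATUS_READY;

if (pollStatus == CLIENT_CONNECTION_BUSY_READ)
{
    executionStatus = TASK_STATUS_SOCKET_READ;   /* RegisterWait sets POLLIN */
}
else if (pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
{
    executionStatus = TASK_STATUS_SOCKET_WRITE;  /* RegisterWait sets POLLOUT */
}
else if (pollStatus == CLIENT_CONNECTION_BAD)
{
    executionStatus = TASK_STATUS_ERROR;         /* MultiClientWait backs off briefly */
}

MultiClientRegisterWait(waitInfo, executionStatus, connectionId);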

View File

@ -14,6 +14,7 @@
#ifndef MULTI_PHYSICAL_PLANNER_H
#define MULTI_PHYSICAL_PLANNER_H
#include "datatype/timestamp.h"
#include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_logical_planner.h"

View File

@ -121,7 +121,7 @@ struct TaskExecution
TransmitExecStatus *transmitStatusArray;
int32 *connectionIdArray;
int32 *fileDescriptorArray;
TimestampTz connectStartTime;
uint32 nodeCount;
uint32 currentNodeIndex;
uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
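Replacing connectPollCount with connectStartTime means the connect timeout is now measured in wall-clock time rather than in the number of poll iterations, so it no longer assumes each iteration sleeps exactly citus.remote_task_check_interval. The pattern, condensed from the executor diff above:

/* when the asynchronous connect is started */
taskExecution->connectStartTime = GetCurrentTimestamp();

/* on each subsequent pass: give up once the wall-clock budget is exhausted */
if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
                               GetCurrentTimestamp(),
                               REMOTE_NODE_CONNECT_TIMEOUT))
{
    ereport(WARNING, (errmsg("could not establish asynchronous "
                             "connection after %u ms",
                             REMOTE_NODE_CONNECT_TIMEOUT)));
}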