diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c
index 76c53de1e..0cccff038 100644
--- a/src/backend/distributed/executor/multi_client_executor.c
+++ b/src/backend/distributed/executor/multi_client_executor.c
@@ -21,6 +21,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/connection_cache.h"
 #include "distributed/multi_client_executor.h"
+#include "distributed/multi_server_executor.h"
 
 #include
 #include
@@ -226,9 +227,12 @@ MultiClientConnectPoll(int32 connectionId)
 		if (readReady)
 		{
 			ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
+			connectStatus = CLIENT_CONNECTION_BUSY;
+		}
+		else
+		{
+			connectStatus = CLIENT_CONNECTION_BUSY_READ;
 		}
-
-		connectStatus = CLIENT_CONNECTION_BUSY;
 	}
 	else if (pollingStatus == PGRES_POLLING_WRITING)
 	{
@@ -236,9 +240,12 @@ MultiClientConnectPoll(int32 connectionId)
 		if (writeReady)
 		{
 			ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
+			connectStatus = CLIENT_CONNECTION_BUSY;
+		}
+		else
+		{
+			connectStatus = CLIENT_CONNECTION_BUSY_WRITE;
 		}
-
-		connectStatus = CLIENT_CONNECTION_BUSY;
 	}
 	else if (pollingStatus == PGRES_POLLING_FAILED)
 	{
@@ -691,6 +698,160 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
 }
 
 
+/*
+ * MultiClientCreateWaitInfo creates a WaitInfo structure, capable of keeping
+ * track of what up to maxConnections connections are waiting for, to allow
+ * efficiently waiting for all of them at once.
+ *
+ * Connections can be added using MultiClientRegisterWait(). All added
+ * connections can then be waited upon together using MultiClientWait().
+ */
+WaitInfo *
+MultiClientCreateWaitInfo(int maxConnections)
+{
+	WaitInfo *waitInfo = palloc(sizeof(WaitInfo));
+
+	waitInfo->maxWaiters = maxConnections;
+	waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd));
+
+	/* initialize remaining fields */
+	MultiClientResetWaitInfo(waitInfo);
+
+	return waitInfo;
+}
+
+
+/* MultiClientResetWaitInfo clears all pending waits from a WaitInfo. */
+void
+MultiClientResetWaitInfo(WaitInfo *waitInfo)
+{
+	waitInfo->registeredWaiters = 0;
+	waitInfo->haveReadyWaiter = false;
+	waitInfo->haveFailedWaiter = false;
+}
+
+
+/* MultiClientFreeWaitInfo frees the resources associated with a waitInfo struct. */
+void
+MultiClientFreeWaitInfo(WaitInfo *waitInfo)
+{
+	pfree(waitInfo->pollfds);
+	pfree(waitInfo);
+}
+
+
+/*
+ * MultiClientRegisterWait adds a connection to be waited upon, waiting for
+ * the event indicated by executionStatus.
+ */
+void
+MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
+						int32 connectionId)
+{
+	PGconn *connection = NULL;
+	struct pollfd *pollfd = NULL;
+
+	Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters);
+
+	if (executionStatus == TASK_STATUS_READY)
+	{
+		waitInfo->haveReadyWaiter = true;
+		return;
+	}
+	else if (executionStatus == TASK_STATUS_ERROR)
+	{
+		waitInfo->haveFailedWaiter = true;
+		return;
+	}
+
+	connection = ClientConnectionArray[connectionId];
+	pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
+	pollfd->fd = PQsocket(connection);
+	if (executionStatus == TASK_STATUS_SOCKET_READ)
+	{
+		pollfd->events = POLLERR | POLLIN;
+	}
+	else if (executionStatus == TASK_STATUS_SOCKET_WRITE)
+	{
+		pollfd->events = POLLERR | POLLOUT;
+	}
+	waitInfo->registeredWaiters++;
+}
+
+
+/*
+ * MultiClientWait waits until at least one connection added with
+ * MultiClientRegisterWait is ready to be processed again.
+ */
+void
+MultiClientWait(WaitInfo *waitInfo)
+{
+	long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
+
+	/*
+	 * If we had a failure, we always want to sleep for a bit, to avoid
+	 * flooding the other system and probably making the situation worse.
+	 */
+	if (waitInfo->haveFailedWaiter)
+	{
+		pg_usleep(sleepIntervalPerCycle);
+		return;
+	}
+
+	/* if there are tasks that already need attention again, don't wait */
+	if (waitInfo->haveReadyWaiter)
+	{
+		return;
+	}
+
+	while (true)
+	{
+		/*
+		 * Wait for activity on any of the sockets. Limit the maximum time
+		 * spent waiting in one wait cycle, as insurance against edge
+		 * cases. For efficiency we don't want to wake up quite as often as
+		 * citus.remote_task_check_interval, so rather arbitrarily sleep ten
+		 * times as long. Note that poll()'s timeout is in milliseconds.
+		 */
+		int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters,
+					  RemoteTaskCheckInterval * 10);
+
+		if (rc < 0)
+		{
+			/*
+			 * Signals that arrive can interrupt our poll(). In that case just
+			 * check for interrupts, and try again. Every other error is
+			 * unexpected and treated as such.
+			 */
+			if (errno == EAGAIN || errno == EINTR)
+			{
+				CHECK_FOR_INTERRUPTS();
+
+				/* the maximum wait starts over, but that's ok, it's just a stopgap */
+				continue;
+			}
+			else
+			{
+				ereport(ERROR, (errcode_for_file_access(),
+								errmsg("poll failed: %m")));
+			}
+		}
+		else if (rc == 0)
+		{
+			ereport(DEBUG2,
+					(errmsg("waiting for activity on tasks took longer than %ld ms",
+							(long) RemoteTaskCheckInterval * 10)));
+		}
+
+		/*
+		 * At least one fd received a readiness notification, time to
+		 * process tasks again.
+		 */
+		return;
+	}
+}
+
+
 /*
  * ClearRemainingResults reads result objects from the connection until we get
  * null, and clears these results. This is the last step in completing an async
diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index 665276ca8..230da5955 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -21,6 +21,7 @@
 #include
 #include
+#include
 
 #include "commands/dbcommands.h"
 #include "distributed/multi_client_executor.h"
@@ -28,10 +29,12 @@
 #include "distributed/multi_server_executor.h"
 #include "distributed/worker_protocol.h"
 #include "storage/fd.h"
+#include "utils/timestamp.h"
 
 
 /* Local functions forward declarations */
-static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution);
+static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution,
+										 TaskExecutionStatus *executionStatus);
 static bool TaskExecutionReadyToStart(TaskExecution *taskExecution);
 static bool TaskExecutionCompleted(TaskExecution *taskExecution);
 static void CancelTaskExecutionIfActive(TaskExecution *taskExecution);
@@ -76,6 +79,7 @@ MultiRealTimeExecute(Job *job)
 	List *workerNodeList = NIL;
 	HTAB *workerHash = NULL;
 	const char *workerHashName = "Worker node hash";
+	WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
 
 	workerNodeList = WorkerNodeList();
 	workerHash = WorkerHash(workerHashName, workerNodeList);
@@ -99,12 +103,15 @@ MultiRealTimeExecute(Job *job)
 		ListCell *taskCell = NULL;
 		ListCell *taskExecutionCell = NULL;
 
+		MultiClientResetWaitInfo(waitInfo);
+
 		forboth(taskCell, taskList, taskExecutionCell, taskExecutionList)
 		{
 			Task *task = (Task *) lfirst(taskCell);
 			TaskExecution *taskExecution = (TaskExecution *) lfirst(taskExecutionCell);
 			ConnectAction connectAction = CONNECT_ACTION_NONE;
 			WorkerNodeState *workerNodeState = NULL;
+			TaskExecutionStatus executionStatus;
 
 			workerNodeState = LookupWorkerForTask(workerHash, task, taskExecution);
@@ -117,7 +124,7 @@ MultiRealTimeExecute(Job *job)
 			}
 
 			/* call the function that performs the core task execution logic */
-			connectAction = ManageTaskExecution(task, taskExecution);
+			connectAction = ManageTaskExecution(task, taskExecution, &executionStatus);
 
 			/* update the connection counter for throttling */
 			UpdateConnectionCounter(workerNodeState, connectAction);
@@ -139,20 +146,38 @@ MultiRealTimeExecute(Job *job)
 			{
 				completedTaskCount++;
 			}
+			else
+			{
+				uint32 currentIndex = taskExecution->currentNodeIndex;
+				int32 *connectionIdArray = taskExecution->connectionIdArray;
+				int32 connectionId = connectionIdArray[currentIndex];
+
+				/*
+				 * If not done with the task yet, make note of what this task
+				 * and its associated connection are waiting for.
+				 */
+				MultiClientRegisterWait(waitInfo, executionStatus, connectionId);
+			}
 		}
 
-		/* check if all tasks completed; otherwise sleep to avoid tight loop */
+		/*
+		 * Check if all tasks completed; otherwise wait as appropriate to
+		 * avoid a tight loop. That means we immediately continue if tasks are
+		 * ready to be processed further, and block when we're waiting for
+		 * network IO.
+		 */
 		if (completedTaskCount == taskCount)
 		{
 			allTasksCompleted = true;
 		}
 		else
 		{
-			long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
-			pg_usleep(sleepIntervalPerCycle);
+			MultiClientWait(waitInfo);
 		}
 	}
 
+	MultiClientFreeWaitInfo(waitInfo);
+
 	/*
 	 * We prevent cancel/die interrupts until we clean up connections to worker
 	 * nodes. Note that for the above while loop, if the user Ctrl+C's a query
@@ -172,6 +197,9 @@ MultiRealTimeExecute(Job *job)
 	/*
 	 * If cancel might have been sent, give remote backends some time to flush
 	 * their responses. This avoids some broken pipe logs on the backend-side.
+	 *
+	 * FIXME: This shouldn't be dependent on RemoteTaskCheckInterval; they're
+	 * unrelated types of delay.
 	 */
 	if (taskFailed || QueryCancelPending)
 	{
@@ -213,10 +241,12 @@ MultiRealTimeExecute(Job *job)
  * Note that this function directly manages a task's execution by opening up a
  * separate connection to the worker node for each execution. The function
  * returns a ConnectAction enum indicating whether a connection has been opened
- * or closed in this call.
+ * or closed in this call. Via the executionStatus parameter, this function also
+ * returns what the task is blocked on.
  */
 static ConnectAction
-ManageTaskExecution(Task *task, TaskExecution *taskExecution)
+ManageTaskExecution(Task *task, TaskExecution *taskExecution,
+					TaskExecutionStatus *executionStatus)
 {
 	TaskExecStatus *taskStatusArray = taskExecution->taskStatusArray;
 	int32 *connectionIdArray = taskExecution->connectionIdArray;
@@ -229,6 +259,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 	uint32 nodePort = taskPlacement->nodePort;
 	ConnectAction connectAction = CONNECT_ACTION_NONE;
 
+	/* as most state transitions don't require blocking, default to not waiting */
+	*executionStatus = TASK_STATUS_READY;
+
 	switch (currentStatus)
 	{
 		case EXEC_TASK_CONNECT_START:
@@ -246,12 +279,14 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			if (connectionId != INVALID_CONNECTION_ID)
 			{
 				taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
-				taskExecution->connectPollCount = 0;
+				taskExecution->connectStartTime = GetCurrentTimestamp();
 				connectAction = CONNECT_ACTION_OPENED;
 			}
 			else
 			{
+				*executionStatus = TASK_STATUS_ERROR;
 				AdjustStateForFailure(taskExecution);
+				break;
 			}
 
 			break;
@@ -273,6 +308,17 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			}
 			else if (pollStatus == CLIENT_CONNECTION_BUSY)
 			{
+				/* immediately retry */
+				taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
+			}
+			else if (pollStatus == CLIENT_CONNECTION_BUSY_READ)
+			{
+				*executionStatus = TASK_STATUS_SOCKET_READ;
+				taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
+			}
+			else if (pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
+			{
+				*executionStatus = TASK_STATUS_SOCKET_WRITE;
 				taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_POLL;
 			}
 			else if (pollStatus == CLIENT_CONNECTION_BAD)
 			{
@@ -281,12 +327,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			}
 
 			/* now check if we have been trying to connect for too long */
-			taskExecution->connectPollCount++;
-			if (pollStatus == CLIENT_CONNECTION_BUSY)
+			if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
+				pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
 			{
-				uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
-				uint32 currentCount = taskExecution->connectPollCount;
-				if (currentCount >= maxCount)
+				if (TimestampDifferenceExceeds(taskExecution->connectStartTime,
+											   GetCurrentTimestamp(),
+											   REMOTE_NODE_CONNECT_TIMEOUT))
 				{
 					ereport(WARNING, (errmsg("could not establish asynchronous "
 											 "connection after %u ms",
@@ -317,6 +363,12 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			/* try next worker node */
 			AdjustStateForFailure(taskExecution);
 
+			/*
+			 * Add a delay, to avoid potentially exacerbating problems by
+			 * looping too quickly.
+			 */
+			*executionStatus = TASK_STATUS_ERROR;
+
 			break;
 		}
@@ -372,6 +424,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			/* check if query results are in progress or unavailable */
 			if (resultStatus == CLIENT_RESULT_BUSY)
 			{
+				*executionStatus = TASK_STATUS_SOCKET_READ;
 				taskStatusArray[currentIndex] = EXEC_FETCH_TASK_RUNNING;
 				break;
 			}
@@ -446,6 +499,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			if (resultStatus == CLIENT_RESULT_BUSY)
 			{
 				taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_RUNNING;
+				*executionStatus = TASK_STATUS_SOCKET_READ;
 				break;
 			}
 			else if (resultStatus == CLIENT_RESULT_UNAVAILABLE)
 			{
@@ -511,6 +565,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
 			if (copyStatus == CLIENT_COPY_MORE)
 			{
 				taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_COPYING;
+				*executionStatus = TASK_STATUS_SOCKET_READ;
 			}
 			else if (copyStatus == CLIENT_COPY_DONE)
 			{
diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c
index 443ca813e..48127ed92 100644
--- a/src/backend/distributed/executor/multi_server_executor.c
+++ b/src/backend/distributed/executor/multi_server_executor.c
@@ -226,7 +226,7 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus)
 	taskExecution->jobId = task->jobId;
 	taskExecution->taskId = task->taskId;
 	taskExecution->nodeCount = nodeCount;
-	taskExecution->connectPollCount = 0;
+	taskExecution->connectStartTime = 0;
 	taskExecution->currentNodeIndex = 0;
 	taskExecution->dataFetchTaskIndex = -1;
 	taskExecution->failureCount = 0;
diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c
index fd0638741..0a8f6681c 100644
--- a/src/backend/distributed/executor/multi_task_tracker_executor.c
+++ b/src/backend/distributed/executor/multi_task_tracker_executor.c
@@ -850,7 +850,9 @@ TrackerConnectPoll(TaskTracker *taskTracker)
 	{
 		taskTracker->trackerStatus = TRACKER_CONNECTED;
 	}
-	else if (pollStatus == CLIENT_CONNECTION_BUSY)
+	else if (pollStatus == CLIENT_CONNECTION_BUSY ||
+			 pollStatus == CLIENT_CONNECTION_BUSY_READ ||
+			 pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
 	{
 		taskTracker->trackerStatus = TRACKER_CONNECT_POLL;
 	}
@@ -864,7 +866,8 @@ TrackerConnectPoll(TaskTracker *taskTracker)
 
 	/* now check if we have been trying to connect for too long */
 	taskTracker->connectPollCount++;
-	if (pollStatus == CLIENT_CONNECTION_BUSY)
+	if (pollStatus == CLIENT_CONNECTION_BUSY_READ ||
+		pollStatus == CLIENT_CONNECTION_BUSY_WRITE)
 	{
 		uint32 maxCount = REMOTE_NODE_CONNECT_TIMEOUT / RemoteTaskCheckInterval;
 		uint32 currentCount = taskTracker->connectPollCount;
diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h
index 1486cd692..ab5b9bc38 100644
--- a/src/include/distributed/multi_client_executor.h
+++ b/src/include/distributed/multi_client_executor.h
@@ -14,7 +14,6 @@
 #ifndef MULTI_CLIENT_EXECUTOR_H
 #define MULTI_CLIENT_EXECUTOR_H
 
-
 #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
 #define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */
 #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
@@ -28,7 +27,9 @@ typedef enum
 	CLIENT_INVALID_CONNECT = 0,
 	CLIENT_CONNECTION_BAD = 1,
 	CLIENT_CONNECTION_BUSY = 2,
-	CLIENT_CONNECTION_READY = 3
+	CLIENT_CONNECTION_BUSY_READ = 3,
+	CLIENT_CONNECTION_BUSY_WRITE = 4,
+	CLIENT_CONNECTION_READY = 5
 } ConnectStatus;
 
 
@@ -72,6 +73,29 @@ typedef enum
 } BatchQueryStatus;
 
 
+/* Enumeration to track whether a task is ready to run and, if not, what it's blocked on */
+typedef enum TaskExecutionStatus
+{
+	TASK_STATUS_INVALID = 0,
+	TASK_STATUS_ERROR,         /* error occurred */
+	TASK_STATUS_READY,         /* task ready to be processed further */
+	TASK_STATUS_SOCKET_READ,   /* waiting for connection to become ready for reads */
+	TASK_STATUS_SOCKET_WRITE   /* waiting for connection to become ready for writes */
+} TaskExecutionStatus;
+
+
+struct pollfd; /* forward declared, to avoid having to include poll.h */
+
+typedef struct WaitInfo
+{
+	int maxWaiters;
+	struct pollfd *pollfds;
+	int registeredWaiters;
+	bool haveReadyWaiter;
+	bool haveFailedWaiter;
+} WaitInfo;
+
+
 /* Function declarations for executing client-side (libpq) logic. */
 extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
 								const char *nodeDatabase, const char *nodeUser);
@@ -91,6 +115,13 @@ extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryR
 											   int *rowCount, int *columnCount);
 extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
 extern void MultiClientClearResult(void *queryResult);
+extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
+
+extern void MultiClientResetWaitInfo(WaitInfo *waitInfo);
+extern void MultiClientFreeWaitInfo(WaitInfo *waitInfo);
+extern void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus waitStatus,
+									int32 connectionId);
+extern void MultiClientWait(WaitInfo *waitInfo);
 
 
 #endif   /* MULTI_CLIENT_EXECUTOR_H */
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index fa5f03c24..ff769d18e 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -14,6 +14,7 @@
 #ifndef MULTI_PHYSICAL_PLANNER_H
 #define MULTI_PHYSICAL_PLANNER_H
 
+#include "datatype/timestamp.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/multi_logical_planner.h"
diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h
index 96647de64..741829857 100644
--- a/src/include/distributed/multi_server_executor.h
+++ b/src/include/distributed/multi_server_executor.h
@@ -121,7 +121,7 @@ struct TaskExecution
 	TransmitExecStatus *transmitStatusArray;
 	int32 *connectionIdArray;
 	int32 *fileDescriptorArray;
-	uint32 connectPollCount;
+	TimestampTz connectStartTime;
 	uint32 nodeCount;
 	uint32 currentNodeIndex;
 	uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
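
For readers skimming the patch, the following is a minimal sketch, not part of the diff, of how the new WaitInfo API is intended to be driven; it mirrors the loop in MultiRealTimeExecute, but taskCount, taskCell, taskList, connectionId, allTasksCompleted and the AdvanceTask helper are hypothetical placeholders here.

/* illustrative sketch only; the names listed above are hypothetical */
WaitInfo *waitInfo = MultiClientCreateWaitInfo(taskCount);

while (!allTasksCompleted)
{
	MultiClientResetWaitInfo(waitInfo);

	foreach(taskCell, taskList)
	{
		/* advance one task's state machine and learn what it is blocked on */
		TaskExecutionStatus executionStatus = AdvanceTask(lfirst(taskCell));

		MultiClientRegisterWait(waitInfo, executionStatus, connectionId);
	}

	/*
	 * Returns immediately if any task reported TASK_STATUS_READY, sleeps
	 * briefly if any task reported TASK_STATUS_ERROR, and otherwise poll()s
	 * the registered sockets until one becomes readable or writable.
	 */
	MultiClientWait(waitInfo);
}

MultiClientFreeWaitInfo(waitInfo);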
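
The CLIENT_CONNECTION_BUSY_READ / CLIENT_CONNECTION_BUSY_WRITE split added above follows libpq's documented PQconnectPoll() contract: PGRES_POLLING_READING means "wait until the socket is readable", PGRES_POLLING_WRITING means "wait until it is writable". Below is a self-contained sketch of that general pattern using only standard libpq and poll() calls, independent of the Citus wrappers; ConnectWithPoll is a hypothetical helper name and error handling (e.g. retrying on EINTR) is omitted.

#include <poll.h>
#include "libpq-fe.h"

/* Connect asynchronously, blocking only in poll() rather than inside libpq. */
static PGconn *
ConnectWithPoll(const char *conninfo)
{
	PGconn *connection = PQconnectStart(conninfo);
	PostgresPollingStatusType pollingStatus = PGRES_POLLING_WRITING;

	if (connection == NULL || PQstatus(connection) == CONNECTION_BAD)
	{
		return connection;
	}

	while (pollingStatus != PGRES_POLLING_OK &&
		   pollingStatus != PGRES_POLLING_FAILED)
	{
		struct pollfd pollFileDescriptor;

		pollFileDescriptor.fd = PQsocket(connection);
		pollFileDescriptor.events =
			(pollingStatus == PGRES_POLLING_READING) ? POLLIN : POLLOUT;

		/* wait until the socket is ready in the direction libpq asked for */
		if (poll(&pollFileDescriptor, 1, -1) < 0)
		{
			break;
		}

		pollingStatus = PQconnectPoll(connection);
	}

	/* the caller should still verify PQstatus(connection) == CONNECTION_OK */
	return connection;
}

In the executor above, MultiClientConnectPoll() surfaces the same distinction as CLIENT_CONNECTION_BUSY_READ and CLIENT_CONNECTION_BUSY_WRITE so that MultiClientRegisterWait() can choose POLLIN or POLLOUT for each task's socket instead of sleeping for a fixed interval.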