diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 994bc9a5a..1d00452a4 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -39,6 +39,8 @@ int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */ +int MaxTaskStatusBatchSize = 64; /* maximum number of task status checks per round */ + /* TaskMapKey is used as a key in task hash */ typedef struct TaskMapKey @@ -117,8 +119,9 @@ static void ManageTaskTracker(TaskTracker *taskTracker); static bool TrackerConnectionUp(TaskTracker *taskTracker); static void TrackerReconnectPoll(TaskTracker *taskTracker); static List * AssignQueuedTasks(TaskTracker *taskTracker); -static int32 NextRunningTaskIndex(List *assignedTaskList, int32 currentTaskIndex); -static TaskStatus TaskStatusQueryResponse(int32 connectionId); +static List * TaskStatusBatchList(TaskTracker *taskTracker); +static StringInfo TaskStatusBatchQuery(List *taskList); +static void ReceiveTaskStatusBatchQueryResponse(TaskTracker *taskTracker); static void ManageTransmitTracker(TaskTracker *transmitTracker); static TrackerTaskState * NextQueuedFileTransmit(HTAB *taskStateHash); @@ -2179,46 +2182,42 @@ ManageTaskTracker(TaskTracker *taskTracker) } /* - * (2) We find an assigned task. We then send an asynchronous query to check - * task's status. + * (2) We find assigned tasks. We then send an asynchronous query to check + * the tasks' statuses. 
*/ if (!taskTracker->connectionBusy) { - List *assignedTaskList = taskTracker->assignedTaskList; - int32 currentTaskIndex = taskTracker->currentTaskIndex; - int32 nextTaskIndex = -1; + List *taskStatusBatchList = TaskStatusBatchList(taskTracker); - nextTaskIndex = NextRunningTaskIndex(assignedTaskList, currentTaskIndex); - taskTracker->currentTaskIndex = nextTaskIndex; - - /* if we have a running task's index, check task's status */ - if (nextTaskIndex != -1) + /* if we have running tasks, check their status */ + if (taskStatusBatchList) { int32 connectionId = taskTracker->connectionId; - TrackerTaskState *taskState = NULL; - StringInfo taskStatusQuery = NULL; + StringInfo taskStatusBatchQuery = NULL; bool querySent = false; - taskState = (TrackerTaskState *) list_nth(assignedTaskList, nextTaskIndex); - Assert(taskState != NULL); + taskStatusBatchQuery = TaskStatusBatchQuery(taskStatusBatchList); - taskStatusQuery = makeStringInfo(); - appendStringInfo(taskStatusQuery, TASK_STATUS_QUERY, - taskState->jobId, taskState->taskId); - - querySent = MultiClientSendQuery(connectionId, taskStatusQuery->data); + querySent = MultiClientSendQuery(connectionId, taskStatusBatchQuery->data); if (querySent) { taskTracker->connectionBusy = true; - taskTracker->connectionBusyOnTask = taskState; + taskTracker->connectionBusyOnTaskList = taskStatusBatchList; } else { + /* mark only first task in list as failed */ + TrackerTaskState *taskState = (TrackerTaskState *) linitial( + taskStatusBatchList); taskState->status = TASK_CLIENT_SIDE_STATUS_FAILED; + list_free(taskStatusBatchList); + taskTracker->connectionBusy = false; - taskTracker->connectionBusyOnTask = NULL; + taskTracker->connectionBusyOnTaskList = NIL; } + + pfree(taskStatusBatchQuery); } } @@ -2229,25 +2228,28 @@ ManageTaskTracker(TaskTracker *taskTracker) { int32 connectionId = taskTracker->connectionId; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; - TrackerTaskState *taskState = 
taskTracker->connectionBusyOnTask; - Assert(taskState != NULL); /* if connection is available, update task status accordingly */ resultStatus = MultiClientResultStatus(connectionId); if (resultStatus == CLIENT_RESULT_READY) { - taskState->status = TaskStatusQueryResponse(connectionId); + ReceiveTaskStatusBatchQueryResponse(taskTracker); } else if (resultStatus == CLIENT_RESULT_UNAVAILABLE) { + TrackerTaskState *taskState = (TrackerTaskState *) linitial( + taskTracker->connectionBusyOnTaskList); + Assert(taskState != NULL); taskState->status = TASK_CLIENT_SIDE_STATUS_FAILED; } /* if connection is available, give it back to the task tracker */ if (resultStatus != CLIENT_RESULT_BUSY) { + list_free(taskTracker->connectionBusyOnTaskList); + taskTracker->connectionBusy = false; - taskTracker->connectionBusyOnTask = NULL; + taskTracker->connectionBusyOnTaskList = NIL; } } } @@ -2325,7 +2327,7 @@ AssignQueuedTasks(TaskTracker *taskTracker) List *assignedTaskList = NIL; uint32 taskAssignmentCount = 0; List *tasksToAssignList = NIL; - StringInfo multiAssignQuery = makeStringInfo(); + StringInfo assignTaskBatchQuery = makeStringInfo(); int32 connectionId = taskTracker->connectionId; HASH_SEQ_STATUS status; @@ -2339,12 +2341,7 @@ AssignQueuedTasks(TaskTracker *taskTracker) { StringInfo taskAssignmentQuery = taskState->taskAssignmentQuery; - if (taskAssignmentCount > 0) - { - appendStringInfo(multiAssignQuery, ";"); - } - - appendStringInfo(multiAssignQuery, "%s", taskAssignmentQuery->data); + appendStringInfo(assignTaskBatchQuery, "%s", taskAssignmentQuery->data); tasksToAssignList = lappend(tasksToAssignList, taskState); taskAssignmentCount++; @@ -2365,7 +2362,8 @@ AssignQueuedTasks(TaskTracker *taskTracker) int columnCount = 0; ListCell *taskCell = NULL; - bool batchSuccess = MultiClientSendQuery(connectionId, multiAssignQuery->data); + bool batchSuccess = MultiClientSendQuery(connectionId, + assignTaskBatchQuery->data); foreach(taskCell, tasksToAssignList) { @@ -2398,6 
+2396,7 @@ AssignQueuedTasks(TaskTracker *taskTracker) MultiClientBatchResult(connectionId, &queryResult, &rowCount, &columnCount); Assert(queryResult == NULL); + pfree(assignTaskBatchQuery); list_free(tasksToAssignList); } @@ -2406,40 +2405,44 @@ AssignQueuedTasks(TaskTracker *taskTracker) /* - * NextRunningTaskIndex walks over the tasks in the given list, and looks for a - * task that was still running as of the last time we checked. If such a task - * exists (before wrapping around the assigned task list), the function returns - * that task's index in the list. Else, the function returns -1 to signal either - * no such task exists or that the function may be called again to search from - * the beginning of the task list. + * TaskStatusBatchList returns a list containing up to MaxTaskStatusBatchSize + * tasks from the list of assigned tasks. When the number of tasks is greater + * than the maximum, the next call of this function will continue in the + * assigned task list after the last task that was added to the current list. + * + * In some cases the list may be empty even if tasks have been assigned due to + * wrap-around, namely if we first generate a batch of MaxTaskStatusBatchSize, + * but none of the remaining tasks in assignedTaskList are running. 
*/ -static int32 -NextRunningTaskIndex(List *assignedTaskList, int32 currentTaskIndex) +static List * +TaskStatusBatchList(TaskTracker *taskTracker) { - int32 nextTaskIndex = -1; - int32 lastTaskIndex = -1; + int32 assignedTaskCount = 0; int32 assignedTaskIndex = 0; - ListCell *assignedTaskCell = NULL; + List *assignedTaskList = taskTracker->assignedTaskList; + List *taskStatusBatchList = NIL; + ListCell *taskCell = NULL; + int32 currentTaskIndex = 0; + int32 lastTaskIndex = 0; - int32 assignedTaskCount = list_length(assignedTaskList); + assignedTaskCount = list_length(assignedTaskList); if (assignedTaskCount == 0) { - return -1; + return NIL; } lastTaskIndex = (assignedTaskCount - 1); + currentTaskIndex = taskTracker->currentTaskIndex; if (currentTaskIndex >= lastTaskIndex) { currentTaskIndex = -1; } - assignedTaskCell = NULL; - foreach(assignedTaskCell, assignedTaskList) + foreach(taskCell, assignedTaskList) { - TrackerTaskState *assignedTask = (TrackerTaskState *) lfirst(assignedTaskCell); + TrackerTaskState *assignedTask = (TrackerTaskState *) lfirst(taskCell); TaskStatus taskStatus = assignedTask->status; - /* task tracker retries tasks that only failed once (task_failed) */ bool taskRunning = false; if (taskStatus == TASK_ASSIGNED || taskStatus == TASK_SCHEDULED || taskStatus == TASK_RUNNING || taskStatus == TASK_FAILED) @@ -2449,66 +2452,111 @@ NextRunningTaskIndex(List *assignedTaskList, int32 currentTaskIndex) if (taskRunning && (assignedTaskIndex > currentTaskIndex)) { - nextTaskIndex = assignedTaskIndex; - break; + taskStatusBatchList = lappend(taskStatusBatchList, assignedTask); + if (list_length(taskStatusBatchList) >= MaxTaskStatusBatchSize) + { + break; + } } assignedTaskIndex++; } - /* - * We might have wrapped around the assigned task list and found nothing. If - * that's the case, we'll start looking from the beginning next time. 
- */ - return nextTaskIndex; + /* continue where we left off next time this function is called */ + taskTracker->currentTaskIndex = assignedTaskIndex; + + return taskStatusBatchList; } /* - * TaskStatusQueryResponse assumes that a task status query has been previously - * sent on the given connection, and reads the response for this status query. + * TaskStatusBatchQuery builds a command string containing multiple + * task_tracker_task_status queries from a TrackerTaskState list. */ -static TaskStatus -TaskStatusQueryResponse(int32 connectionId) +static StringInfo +TaskStatusBatchQuery(List *taskList) { - TaskStatus taskStatus = TASK_STATUS_INVALID_FIRST; - void *queryResult = NULL; + StringInfo taskStatusBatchQuery = makeStringInfo(); + ListCell *taskCell = NULL; + + foreach(taskCell, taskList) + { + TrackerTaskState *taskState = (TrackerTaskState *) lfirst(taskCell); + + appendStringInfo(taskStatusBatchQuery, TASK_STATUS_QUERY, + taskState->jobId, taskState->taskId); + } + + return taskStatusBatchQuery; +} + + +/* + * ReceiveTaskStatusBatchQueryResponse assumes that a batch of task status + * queries has been previously sent to the given task tracker, and receives + * and processes the responses for these status queries. If a status check fails, + * only one task status is marked as failed and the remainder is considered not + * executed. 
+ */ +static void +ReceiveTaskStatusBatchQueryResponse(TaskTracker *taskTracker) +{ + ListCell *taskCell = NULL; + List *checkedTaskList = taskTracker->connectionBusyOnTaskList; + int32 connectionId = taskTracker->connectionId; int rowCount = 0; int columnCount = 0; + void *queryResult = NULL; - bool resultReceived = MultiClientQueryResult(connectionId, &queryResult, - &rowCount, &columnCount); - if (resultReceived) + foreach(taskCell, checkedTaskList) { - char *valueString = MultiClientGetValue(queryResult, 0, 0); - if (valueString == NULL || (*valueString) == '\0') + TrackerTaskState *checkedTask = (TrackerTaskState *) lfirst(taskCell); + TaskStatus taskStatus = TASK_STATUS_INVALID_FIRST; + + BatchQueryStatus queryStatus = MultiClientBatchResult(connectionId, &queryResult, + &rowCount, &columnCount); + if (queryStatus == CLIENT_BATCH_QUERY_CONTINUE) { - taskStatus = TASK_PERMANENTLY_FAILED; + char *valueString = MultiClientGetValue(queryResult, 0, 0); + if (valueString == NULL || (*valueString) == '\0') + { + taskStatus = TASK_PERMANENTLY_FAILED; + } + else + { + char *valueStringEnd = NULL; + errno = 0; + + taskStatus = strtoul(valueString, &valueStringEnd, 0); + if (errno != 0 || (*valueStringEnd) != '\0') + { + /* we couldn't parse received integer */ + taskStatus = TASK_PERMANENTLY_FAILED; + } + + Assert(taskStatus > TASK_STATUS_INVALID_FIRST); + Assert(taskStatus < TASK_STATUS_LAST); + } } else { - char *valueStringEnd = NULL; - errno = 0; + taskStatus = TASK_CLIENT_SIDE_STATUS_FAILED; + } - taskStatus = strtoul(valueString, &valueStringEnd, 0); - if (errno != 0 || (*valueStringEnd) != '\0') - { - /* we couldn't parse received integer */ - taskStatus = TASK_PERMANENTLY_FAILED; - } + checkedTask->status = taskStatus; - Assert(taskStatus > TASK_STATUS_INVALID_FIRST); - Assert(taskStatus < TASK_STATUS_LAST); + MultiClientClearResult(queryResult); + + if (queryStatus == CLIENT_BATCH_QUERY_FAILED) + { + /* remaining queries were not executed */ + break; } } - 
else - { - taskStatus = TASK_CLIENT_SIDE_STATUS_FAILED; - } - MultiClientClearResult(queryResult); - - return taskStatus; + /* call MultiClientBatchResult one more time to finish reading results */ + MultiClientBatchResult(connectionId, &queryResult, &rowCount, &columnCount); + Assert(queryResult == NULL); } @@ -2713,6 +2761,7 @@ TrackerHashWaitActiveRequest(HTAB *taskTrackerHash) { taskTracker->connectionBusy = false; taskTracker->connectionBusyOnTask = NULL; + taskTracker->connectionBusyOnTaskList = NIL; } } diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index fd4713cbe..5f3b5a8e8 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -31,8 +31,8 @@ /* Task tracker executor related defines */ #define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \ - ("UINT64_FORMAT ", %u, %s)" -#define TASK_STATUS_QUERY "SELECT task_tracker_task_status("UINT64_FORMAT ", %u)" + ("UINT64_FORMAT ", %u, %s);" +#define TASK_STATUS_QUERY "SELECT task_tracker_task_status("UINT64_FORMAT ", %u);" #define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")" #define JOB_CLEANUP_TASK_ID INT_MAX @@ -163,6 +163,7 @@ typedef struct TaskTracker int32 currentTaskIndex; bool connectionBusy; TrackerTaskState *connectionBusyOnTask; + List *connectionBusyOnTaskList; } TaskTracker;