Merge pull request #851 from citusdata/task_tracker_batching

Batch task_tracker_status calls to reduce task-tracker query times
pull/1314/head
Marco Slot 2017-04-03 12:17:13 +02:00 committed by GitHub
commit de034d2cab
2 changed files with 140 additions and 90 deletions

View File

@ -39,6 +39,8 @@
int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */ int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */
int MaxTaskStatusBatchSize = 64; /* maximum number of task status checks per round */
/* TaskMapKey is used as a key in task hash */ /* TaskMapKey is used as a key in task hash */
typedef struct TaskMapKey typedef struct TaskMapKey
@ -117,8 +119,9 @@ static void ManageTaskTracker(TaskTracker *taskTracker);
static bool TrackerConnectionUp(TaskTracker *taskTracker); static bool TrackerConnectionUp(TaskTracker *taskTracker);
static void TrackerReconnectPoll(TaskTracker *taskTracker); static void TrackerReconnectPoll(TaskTracker *taskTracker);
static List * AssignQueuedTasks(TaskTracker *taskTracker); static List * AssignQueuedTasks(TaskTracker *taskTracker);
static int32 NextRunningTaskIndex(List *assignedTaskList, int32 currentTaskIndex); static List * TaskStatusBatchList(TaskTracker *taskTracker);
static TaskStatus TaskStatusQueryResponse(int32 connectionId); static StringInfo TaskStatusBatchQuery(List *taskList);
static void ReceiveTaskStatusBatchQueryResponse(TaskTracker *taskTracker);
static void ManageTransmitTracker(TaskTracker *transmitTracker); static void ManageTransmitTracker(TaskTracker *transmitTracker);
static TrackerTaskState * NextQueuedFileTransmit(HTAB *taskStateHash); static TrackerTaskState * NextQueuedFileTransmit(HTAB *taskStateHash);
@ -2179,46 +2182,42 @@ ManageTaskTracker(TaskTracker *taskTracker)
} }
/* /*
* (2) We find an assigned task. We then send an asynchronous query to check * (2) We find assigned tasks. We then send an asynchronous query to check
* task's status. * the tasks' statuses.
*/ */
if (!taskTracker->connectionBusy) if (!taskTracker->connectionBusy)
{ {
List *assignedTaskList = taskTracker->assignedTaskList; List *taskStatusBatchList = TaskStatusBatchList(taskTracker);
int32 currentTaskIndex = taskTracker->currentTaskIndex;
int32 nextTaskIndex = -1;
nextTaskIndex = NextRunningTaskIndex(assignedTaskList, currentTaskIndex); /* if we have running tasks, check their status */
taskTracker->currentTaskIndex = nextTaskIndex; if (taskStatusBatchList)
/* if we have a running task's index, check task's status */
if (nextTaskIndex != -1)
{ {
int32 connectionId = taskTracker->connectionId; int32 connectionId = taskTracker->connectionId;
TrackerTaskState *taskState = NULL; StringInfo taskStatusBatchQuery = NULL;
StringInfo taskStatusQuery = NULL;
bool querySent = false; bool querySent = false;
taskState = (TrackerTaskState *) list_nth(assignedTaskList, nextTaskIndex); taskStatusBatchQuery = TaskStatusBatchQuery(taskStatusBatchList);
Assert(taskState != NULL);
taskStatusQuery = makeStringInfo(); querySent = MultiClientSendQuery(connectionId, taskStatusBatchQuery->data);
appendStringInfo(taskStatusQuery, TASK_STATUS_QUERY,
taskState->jobId, taskState->taskId);
querySent = MultiClientSendQuery(connectionId, taskStatusQuery->data);
if (querySent) if (querySent)
{ {
taskTracker->connectionBusy = true; taskTracker->connectionBusy = true;
taskTracker->connectionBusyOnTask = taskState; taskTracker->connectionBusyOnTaskList = taskStatusBatchList;
} }
else else
{ {
/* mark only first task in list as failed */
TrackerTaskState *taskState = (TrackerTaskState *) linitial(
taskStatusBatchList);
taskState->status = TASK_CLIENT_SIDE_STATUS_FAILED; taskState->status = TASK_CLIENT_SIDE_STATUS_FAILED;
list_free(taskStatusBatchList);
taskTracker->connectionBusy = false; taskTracker->connectionBusy = false;
taskTracker->connectionBusyOnTask = NULL; taskTracker->connectionBusyOnTaskList = NIL;
} }
pfree(taskStatusBatchQuery);
} }
} }
@ -2229,25 +2228,28 @@ ManageTaskTracker(TaskTracker *taskTracker)
{ {
int32 connectionId = taskTracker->connectionId; int32 connectionId = taskTracker->connectionId;
ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS;
TrackerTaskState *taskState = taskTracker->connectionBusyOnTask;
Assert(taskState != NULL);
/* if connection is available, update task status accordingly */ /* if connection is available, update task status accordingly */
resultStatus = MultiClientResultStatus(connectionId); resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY) if (resultStatus == CLIENT_RESULT_READY)
{ {
taskState->status = TaskStatusQueryResponse(connectionId); ReceiveTaskStatusBatchQueryResponse(taskTracker);
} }
else if (resultStatus == CLIENT_RESULT_UNAVAILABLE) else if (resultStatus == CLIENT_RESULT_UNAVAILABLE)
{ {
TrackerTaskState *taskState = (TrackerTaskState *) linitial(
taskTracker->connectionBusyOnTaskList);
Assert(taskState != NULL);
taskState->status = TASK_CLIENT_SIDE_STATUS_FAILED; taskState->status = TASK_CLIENT_SIDE_STATUS_FAILED;
} }
/* if connection is available, give it back to the task tracker */ /* if connection is available, give it back to the task tracker */
if (resultStatus != CLIENT_RESULT_BUSY) if (resultStatus != CLIENT_RESULT_BUSY)
{ {
list_free(taskTracker->connectionBusyOnTaskList);
taskTracker->connectionBusy = false; taskTracker->connectionBusy = false;
taskTracker->connectionBusyOnTask = NULL; taskTracker->connectionBusyOnTaskList = NIL;
} }
} }
} }
@ -2325,7 +2327,7 @@ AssignQueuedTasks(TaskTracker *taskTracker)
List *assignedTaskList = NIL; List *assignedTaskList = NIL;
uint32 taskAssignmentCount = 0; uint32 taskAssignmentCount = 0;
List *tasksToAssignList = NIL; List *tasksToAssignList = NIL;
StringInfo multiAssignQuery = makeStringInfo(); StringInfo assignTaskBatchQuery = makeStringInfo();
int32 connectionId = taskTracker->connectionId; int32 connectionId = taskTracker->connectionId;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
@ -2339,12 +2341,7 @@ AssignQueuedTasks(TaskTracker *taskTracker)
{ {
StringInfo taskAssignmentQuery = taskState->taskAssignmentQuery; StringInfo taskAssignmentQuery = taskState->taskAssignmentQuery;
if (taskAssignmentCount > 0) appendStringInfo(assignTaskBatchQuery, "%s", taskAssignmentQuery->data);
{
appendStringInfo(multiAssignQuery, ";");
}
appendStringInfo(multiAssignQuery, "%s", taskAssignmentQuery->data);
tasksToAssignList = lappend(tasksToAssignList, taskState); tasksToAssignList = lappend(tasksToAssignList, taskState);
taskAssignmentCount++; taskAssignmentCount++;
@ -2365,7 +2362,8 @@ AssignQueuedTasks(TaskTracker *taskTracker)
int columnCount = 0; int columnCount = 0;
ListCell *taskCell = NULL; ListCell *taskCell = NULL;
bool batchSuccess = MultiClientSendQuery(connectionId, multiAssignQuery->data); bool batchSuccess = MultiClientSendQuery(connectionId,
assignTaskBatchQuery->data);
foreach(taskCell, tasksToAssignList) foreach(taskCell, tasksToAssignList)
{ {
@ -2398,6 +2396,7 @@ AssignQueuedTasks(TaskTracker *taskTracker)
MultiClientBatchResult(connectionId, &queryResult, &rowCount, &columnCount); MultiClientBatchResult(connectionId, &queryResult, &rowCount, &columnCount);
Assert(queryResult == NULL); Assert(queryResult == NULL);
pfree(assignTaskBatchQuery);
list_free(tasksToAssignList); list_free(tasksToAssignList);
} }
@ -2406,40 +2405,44 @@ AssignQueuedTasks(TaskTracker *taskTracker)
/* /*
* NextRunningTaskIndex walks over the tasks in the given list, and looks for a * TaskStatusBatchList returns a list containing up to MaxTaskStatusBatchSize
* task that was still running as of the last time we checked. If such a task * tasks from the list of assigned tasks. When the number of tasks is greater
* exists (before wrapping around the assigned task list), the function returns * than the maximum, the next call of this function will continue in the
* that task's index in the list. Else, the function returns -1 to signal either * assigned task list after the last task that was added to the current list.
* no such task exists or that the function may be called again to search from *
* the beginning of the task list. * In some cases the list may be empty even if tasks have been assigned due to
* wrap-around, namely if we first generate a batch of MaxTaskStatusBatchSize,
* but none of the remaining tasks in assignedTaskList are running.
*/ */
static int32 static List *
NextRunningTaskIndex(List *assignedTaskList, int32 currentTaskIndex) TaskStatusBatchList(TaskTracker *taskTracker)
{ {
int32 nextTaskIndex = -1; int32 assignedTaskCount = 0;
int32 lastTaskIndex = -1;
int32 assignedTaskIndex = 0; int32 assignedTaskIndex = 0;
ListCell *assignedTaskCell = NULL; List *assignedTaskList = taskTracker->assignedTaskList;
List *taskStatusBatchList = NIL;
ListCell *taskCell = NULL;
int32 currentTaskIndex = 0;
int32 lastTaskIndex = 0;
int32 assignedTaskCount = list_length(assignedTaskList); assignedTaskCount = list_length(assignedTaskList);
if (assignedTaskCount == 0) if (assignedTaskCount == 0)
{ {
return -1; return NIL;
} }
lastTaskIndex = (assignedTaskCount - 1); lastTaskIndex = (assignedTaskCount - 1);
currentTaskIndex = taskTracker->currentTaskIndex;
if (currentTaskIndex >= lastTaskIndex) if (currentTaskIndex >= lastTaskIndex)
{ {
currentTaskIndex = -1; currentTaskIndex = -1;
} }
assignedTaskCell = NULL; foreach(taskCell, assignedTaskList)
foreach(assignedTaskCell, assignedTaskList)
{ {
TrackerTaskState *assignedTask = (TrackerTaskState *) lfirst(assignedTaskCell); TrackerTaskState *assignedTask = (TrackerTaskState *) lfirst(taskCell);
TaskStatus taskStatus = assignedTask->status; TaskStatus taskStatus = assignedTask->status;
/* task tracker retries tasks that only failed once (task_failed) */
bool taskRunning = false; bool taskRunning = false;
if (taskStatus == TASK_ASSIGNED || taskStatus == TASK_SCHEDULED || if (taskStatus == TASK_ASSIGNED || taskStatus == TASK_SCHEDULED ||
taskStatus == TASK_RUNNING || taskStatus == TASK_FAILED) taskStatus == TASK_RUNNING || taskStatus == TASK_FAILED)
@ -2449,36 +2452,70 @@ NextRunningTaskIndex(List *assignedTaskList, int32 currentTaskIndex)
if (taskRunning && (assignedTaskIndex > currentTaskIndex)) if (taskRunning && (assignedTaskIndex > currentTaskIndex))
{ {
nextTaskIndex = assignedTaskIndex; taskStatusBatchList = lappend(taskStatusBatchList, assignedTask);
if (list_length(taskStatusBatchList) >= MaxTaskStatusBatchSize)
{
break; break;
} }
}
assignedTaskIndex++; assignedTaskIndex++;
} }
/* /* continue where we left off next time this function is called */
* We might have wrapped around the assigned task list and found nothing. If taskTracker->currentTaskIndex = assignedTaskIndex;
* that's the case, we'll start looking from the beginning next time.
*/ return taskStatusBatchList;
return nextTaskIndex;
} }
/* /*
* TaskStatusQueryResponse assumes that a task status query has been previously * TaskStatusBatchQuery builds a command string containing multiple
* sent on the given connection, and reads the response for this status query. * task_tracker_task_status queries from a TrackerTaskState list.
*/ */
static TaskStatus static StringInfo
TaskStatusQueryResponse(int32 connectionId) TaskStatusBatchQuery(List *taskList)
{ {
TaskStatus taskStatus = TASK_STATUS_INVALID_FIRST; StringInfo taskStatusBatchQuery = makeStringInfo();
void *queryResult = NULL; ListCell *taskCell = NULL;
foreach(taskCell, taskList)
{
TrackerTaskState *taskState = (TrackerTaskState *) lfirst(taskCell);
appendStringInfo(taskStatusBatchQuery, TASK_STATUS_QUERY,
taskState->jobId, taskState->taskId);
}
return taskStatusBatchQuery;
}
/*
* ReceiveTaskStatusBatchQueryResponse assumes that a batch of task status
* queries has been previously sent to the given task tracker, and receives
* and processes the responses for these status queries. If a status check
* fails, only one task status is marked as failed and the remaining queries
* are considered not executed.
*/
static void
ReceiveTaskStatusBatchQueryResponse(TaskTracker *taskTracker)
{
ListCell *taskCell = NULL;
List *checkedTaskList = taskTracker->connectionBusyOnTaskList;
int32 connectionId = taskTracker->connectionId;
int rowCount = 0; int rowCount = 0;
int columnCount = 0; int columnCount = 0;
void *queryResult = NULL;
bool resultReceived = MultiClientQueryResult(connectionId, &queryResult, foreach(taskCell, checkedTaskList)
{
TrackerTaskState *checkedTask = (TrackerTaskState *) lfirst(taskCell);
TaskStatus taskStatus = TASK_STATUS_INVALID_FIRST;
BatchQueryStatus queryStatus = MultiClientBatchResult(connectionId, &queryResult,
&rowCount, &columnCount); &rowCount, &columnCount);
if (resultReceived) if (queryStatus == CLIENT_BATCH_QUERY_CONTINUE)
{ {
char *valueString = MultiClientGetValue(queryResult, 0, 0); char *valueString = MultiClientGetValue(queryResult, 0, 0);
if (valueString == NULL || (*valueString) == '\0') if (valueString == NULL || (*valueString) == '\0')
@ -2506,9 +2543,20 @@ TaskStatusQueryResponse(int32 connectionId)
taskStatus = TASK_CLIENT_SIDE_STATUS_FAILED; taskStatus = TASK_CLIENT_SIDE_STATUS_FAILED;
} }
checkedTask->status = taskStatus;
MultiClientClearResult(queryResult); MultiClientClearResult(queryResult);
return taskStatus; if (queryStatus == CLIENT_BATCH_QUERY_FAILED)
{
/* remaining queries were not executed */
break;
}
}
/* call MultiClientBatchResult one more time to finish reading results */
MultiClientBatchResult(connectionId, &queryResult, &rowCount, &columnCount);
Assert(queryResult == NULL);
} }
@ -2713,6 +2761,7 @@ TrackerHashWaitActiveRequest(HTAB *taskTrackerHash)
{ {
taskTracker->connectionBusy = false; taskTracker->connectionBusy = false;
taskTracker->connectionBusyOnTask = NULL; taskTracker->connectionBusyOnTask = NULL;
taskTracker->connectionBusyOnTaskList = NIL;
} }
} }

View File

@ -31,8 +31,8 @@
/* Task tracker executor related defines */ /* Task tracker executor related defines */
#define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \ #define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \
("UINT64_FORMAT ", %u, %s)" ("UINT64_FORMAT ", %u, %s);"
#define TASK_STATUS_QUERY "SELECT task_tracker_task_status("UINT64_FORMAT ", %u)" #define TASK_STATUS_QUERY "SELECT task_tracker_task_status("UINT64_FORMAT ", %u);"
#define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")" #define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")"
#define JOB_CLEANUP_TASK_ID INT_MAX #define JOB_CLEANUP_TASK_ID INT_MAX
@ -163,6 +163,7 @@ typedef struct TaskTracker
int32 currentTaskIndex; int32 currentTaskIndex;
bool connectionBusy; bool connectionBusy;
TrackerTaskState *connectionBusyOnTask; TrackerTaskState *connectionBusyOnTask;
List *connectionBusyOnTaskList;
} TaskTracker; } TaskTracker;