Refactor task tracker cleanup to enable workers receive cleanup jobs

Long sleep is replaced by multiple small sleeps. Maximum timeout
is also increased since we do not have to wait for that long most
of the cases.
pull/689/head
Murat Tuncer 2016-05-30 11:45:43 +03:00 committed by Jason Petersen
parent 0f9232dee4
commit c613c991bf
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
3 changed files with 95 additions and 57 deletions

View File

@ -32,6 +32,7 @@
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */
@ -2765,12 +2766,14 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
{
uint64 jobId = jobCleanupTask->jobId;
List *taskTrackerList = NIL;
ListCell *taskTrackerCell = NULL;
long sleepInterval = 0;
const int minimumSleepInterval = 150;
List *remainingTaskTrackerList = NIL;
const long timeoutDuration = 4000; /* milliseconds */
const long statusCheckInterval = 10000; /* microseconds */
bool timedOut = false;
TimestampTz startTime = 0;
TaskTracker *taskTracker = NULL;
HASH_SEQ_STATUS status;
hash_seq_init(&status, taskTrackerHash);
/* walk over task trackers and try to issue job clean up requests */
@ -2818,45 +2821,80 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
taskTracker = (TaskTracker *) hash_seq_search(&status);
}
/* give task trackers time to finish their clean up jobs */
sleepInterval = Max(minimumSleepInterval, RemoteTaskCheckInterval * 2) * 1000L;
pg_usleep(sleepInterval);
/* record the time when we start waiting for cleanup jobs to be sent */
startTime = GetCurrentTimestamp();
/* walk over task trackers to which we sent clean up requests */
taskTrackerCell = NULL;
foreach(taskTrackerCell, taskTrackerList)
/*
* Walk over task trackers to which we sent clean up requests. Perform
* these checks until it times out.
*
* We want to determine timedOut flag after the loop start to make sure
* we iterate one more time after time out occurs. This is necessary to report
* warning messages for timed out cleanup jobs.
*/
remainingTaskTrackerList = taskTrackerList;
while (list_length(remainingTaskTrackerList) > 0 && !timedOut)
{
TaskTracker *taskTracker = (TaskTracker *) lfirst(taskTrackerCell);
int32 connectionId = taskTracker->connectionId;
const char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
List *activeTackTrackerList = remainingTaskTrackerList;
ListCell *activeTaskTrackerCell = NULL;
TimestampTz currentTime = 0;
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
remainingTaskTrackerList = NIL;
pg_usleep(statusCheckInterval);
currentTime = GetCurrentTimestamp();
timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration);
foreach(activeTaskTrackerCell, activeTackTrackerList)
{
QueryStatus queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{
ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT
" on node \"%s:%u\"", jobId, nodeName,
nodePort)));
TaskTracker *taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell);
int32 connectionId = taskTracker->connectionId;
const char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
/* clear connection for future cleanup queries */
taskTracker->connectionBusy = false;
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
{
QueryStatus queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{
ereport(DEBUG4, (errmsg("completed cleanup query for job "
UINT64_FORMAT, jobId)));
/* clear connection for future cleanup queries */
taskTracker->connectionBusy = false;
}
else if (timedOut)
{
ereport(WARNING, (errmsg("could not receive response for cleanup "
"query status for job " UINT64_FORMAT " "
"on node \"%s:%u\" with status %d", jobId,
nodeName, nodePort, (int) queryStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, jobCleanupTask->queryString)));
}
else
{
remainingTaskTrackerList = lappend(remainingTaskTrackerList,
taskTracker);
}
}
else if (timedOut)
{
ereport(WARNING, (errmsg("could not receive response for cleanup query "
"result for job " UINT64_FORMAT " on node "
"\"%s:%u\" with status %d", jobId, nodeName,
nodePort, (int) resultStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, jobCleanupTask->queryString)));
}
else
{
ereport(WARNING, (errmsg("could not receive response for cleanup query "
"for job " UINT64_FORMAT " on node \"%s:%u\"",
jobId, nodeName, nodePort)));
remainingTaskTrackerList = lappend(remainingTaskTrackerList, taskTracker);
}
}
else
{
ereport(WARNING, (errmsg("could not receive response for cleanup query "
"for job " UINT64_FORMAT " on node \"%s:%u\"",
jobId, nodeName, nodePort)));
}
}
}

View File

@ -102,12 +102,12 @@ DETAIL: Creating dependency on merge taskId 13
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
DEBUG: completed cleanup query for job 1251 on node "localhost:57638"
DEBUG: completed cleanup query for job 1251 on node "localhost:57637"
DEBUG: completed cleanup query for job 1250 on node "localhost:57638"
DEBUG: completed cleanup query for job 1250 on node "localhost:57637"
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1250
DEBUG: completed cleanup query for job 1250
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------
@ -219,12 +219,12 @@ DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: completed cleanup query for job 1255 on node "localhost:57638"
DEBUG: completed cleanup query for job 1255 on node "localhost:57637"
DEBUG: completed cleanup query for job 1253 on node "localhost:57638"
DEBUG: completed cleanup query for job 1253 on node "localhost:57637"
DEBUG: completed cleanup query for job 1254 on node "localhost:57638"
DEBUG: completed cleanup query for job 1254 on node "localhost:57637"
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1254
DEBUG: completed cleanup query for job 1254
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------

View File

@ -102,12 +102,12 @@ DETAIL: Creating dependency on merge taskId 13
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
DEBUG: completed cleanup query for job 1251 on node "localhost:57638"
DEBUG: completed cleanup query for job 1251 on node "localhost:57637"
DEBUG: completed cleanup query for job 1250 on node "localhost:57638"
DEBUG: completed cleanup query for job 1250 on node "localhost:57637"
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1250
DEBUG: completed cleanup query for job 1250
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------
@ -219,12 +219,12 @@ DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: completed cleanup query for job 1255 on node "localhost:57638"
DEBUG: completed cleanup query for job 1255 on node "localhost:57637"
DEBUG: completed cleanup query for job 1253 on node "localhost:57638"
DEBUG: completed cleanup query for job 1253 on node "localhost:57637"
DEBUG: completed cleanup query for job 1254 on node "localhost:57638"
DEBUG: completed cleanup query for job 1254 on node "localhost:57637"
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1254
DEBUG: completed cleanup query for job 1254
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------