Merge pull request #563 from citusdata/fix/345_task_cleanup_logic

make task cleanup logic less error prone
pull/1938/head
Murat Tuncer 2016-06-09 17:53:35 +03:00
commit c952a0e006
3 changed files with 95 additions and 57 deletions

View File

@ -32,6 +32,7 @@
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */
@ -2768,12 +2769,14 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
{
uint64 jobId = jobCleanupTask->jobId;
List *taskTrackerList = NIL;
ListCell *taskTrackerCell = NULL;
long sleepInterval = 0;
const int minimumSleepInterval = 150;
List *remainingTaskTrackerList = NIL;
const long timeoutDuration = 4000; /* milliseconds */
const long statusCheckInterval = 10000; /* microseconds */
bool timedOut = false;
TimestampTz startTime = 0;
TaskTracker *taskTracker = NULL;
HASH_SEQ_STATUS status;
hash_seq_init(&status, taskTrackerHash);
/* walk over task trackers and try to issue job clean up requests */
@ -2821,45 +2824,80 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
taskTracker = (TaskTracker *) hash_seq_search(&status);
}
/* give task trackers time to finish their clean up jobs */
sleepInterval = Max(minimumSleepInterval, RemoteTaskCheckInterval * 2) * 1000L;
pg_usleep(sleepInterval);
/* record the time when we start waiting for cleanup jobs to be sent */
startTime = GetCurrentTimestamp();
/* walk over task trackers to which we sent clean up requests */
taskTrackerCell = NULL;
foreach(taskTrackerCell, taskTrackerList)
/*
* Walk over task trackers to which we sent clean up requests. Perform
* these checks until it times out.
*
* We want to determine timedOut flag after the loop start to make sure
* we iterate one more time after time out occurs. This is necessary to report
* warning messages for timed out cleanup jobs.
*/
remainingTaskTrackerList = taskTrackerList;
while (list_length(remainingTaskTrackerList) > 0 && !timedOut)
{
TaskTracker *taskTracker = (TaskTracker *) lfirst(taskTrackerCell);
int32 connectionId = taskTracker->connectionId;
const char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
List *activeTackTrackerList = remainingTaskTrackerList;
ListCell *activeTaskTrackerCell = NULL;
TimestampTz currentTime = 0;
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
remainingTaskTrackerList = NIL;
pg_usleep(statusCheckInterval);
currentTime = GetCurrentTimestamp();
timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration);
foreach(activeTaskTrackerCell, activeTackTrackerList)
{
QueryStatus queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{
ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT
" on node \"%s:%u\"", jobId, nodeName,
nodePort)));
TaskTracker *taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell);
int32 connectionId = taskTracker->connectionId;
const char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
/* clear connection for future cleanup queries */
taskTracker->connectionBusy = false;
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
{
QueryStatus queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{
ereport(DEBUG4, (errmsg("completed cleanup query for job "
UINT64_FORMAT, jobId)));
/* clear connection for future cleanup queries */
taskTracker->connectionBusy = false;
}
else if (timedOut)
{
ereport(WARNING, (errmsg("could not receive response for cleanup "
"query status for job " UINT64_FORMAT " "
"on node \"%s:%u\" with status %d", jobId,
nodeName, nodePort, (int) queryStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, jobCleanupTask->queryString)));
}
else
{
remainingTaskTrackerList = lappend(remainingTaskTrackerList,
taskTracker);
}
}
else if (timedOut)
{
ereport(WARNING, (errmsg("could not receive response for cleanup query "
"result for job " UINT64_FORMAT " on node "
"\"%s:%u\" with status %d", jobId, nodeName,
nodePort, (int) resultStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, jobCleanupTask->queryString)));
}
else
{
ereport(WARNING, (errmsg("could not receive response for cleanup query "
"for job " UINT64_FORMAT " on node \"%s:%u\"",
jobId, nodeName, nodePort)));
remainingTaskTrackerList = lappend(remainingTaskTrackerList, taskTracker);
}
}
else
{
ereport(WARNING, (errmsg("could not receive response for cleanup query "
"for job " UINT64_FORMAT " on node \"%s:%u\"",
jobId, nodeName, nodePort)));
}
}
}

View File

@ -104,12 +104,12 @@ DETAIL: Creating dependency on merge taskId 13
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
DEBUG: completed cleanup query for job 1251 on node "localhost:57638"
DEBUG: completed cleanup query for job 1251 on node "localhost:57637"
DEBUG: completed cleanup query for job 1250 on node "localhost:57638"
DEBUG: completed cleanup query for job 1250 on node "localhost:57637"
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1250
DEBUG: completed cleanup query for job 1250
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------
@ -221,12 +221,12 @@ DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: completed cleanup query for job 1255 on node "localhost:57638"
DEBUG: completed cleanup query for job 1255 on node "localhost:57637"
DEBUG: completed cleanup query for job 1253 on node "localhost:57638"
DEBUG: completed cleanup query for job 1253 on node "localhost:57637"
DEBUG: completed cleanup query for job 1254 on node "localhost:57638"
DEBUG: completed cleanup query for job 1254 on node "localhost:57637"
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1254
DEBUG: completed cleanup query for job 1254
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------

View File

@ -104,12 +104,12 @@ DETAIL: Creating dependency on merge taskId 13
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
DEBUG: completed cleanup query for job 1251 on node "localhost:57638"
DEBUG: completed cleanup query for job 1251 on node "localhost:57637"
DEBUG: completed cleanup query for job 1250 on node "localhost:57638"
DEBUG: completed cleanup query for job 1250 on node "localhost:57637"
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1251
DEBUG: completed cleanup query for job 1250
DEBUG: completed cleanup query for job 1250
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------
@ -221,12 +221,12 @@ DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: completed cleanup query for job 1255 on node "localhost:57638"
DEBUG: completed cleanup query for job 1255 on node "localhost:57637"
DEBUG: completed cleanup query for job 1253 on node "localhost:57638"
DEBUG: completed cleanup query for job 1253 on node "localhost:57637"
DEBUG: completed cleanup query for job 1254 on node "localhost:57638"
DEBUG: completed cleanup query for job 1254 on node "localhost:57637"
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1255
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1253
DEBUG: completed cleanup query for job 1254
DEBUG: completed cleanup query for job 1254
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------