mirror of https://github.com/citusdata/citus.git
Refactor task tracker cleanup to enable workers receive cleanup jobs
Long sleep is replaced by multiple small sleeps. Maximum timeout is also increased since we do not have to wait for that long most of the cases.pull/563/head
parent
c42c393cc5
commit
bb3eee63e7
|
@ -32,6 +32,7 @@
|
|||
#include "storage/fd.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
|
||||
int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */
|
||||
|
@ -2768,12 +2769,14 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
{
|
||||
uint64 jobId = jobCleanupTask->jobId;
|
||||
List *taskTrackerList = NIL;
|
||||
ListCell *taskTrackerCell = NULL;
|
||||
long sleepInterval = 0;
|
||||
const int minimumSleepInterval = 150;
|
||||
|
||||
List *remainingTaskTrackerList = NIL;
|
||||
const long timeoutDuration = 4000; /* milliseconds */
|
||||
const long statusCheckInterval = 10000; /* microseconds */
|
||||
bool timedOut = false;
|
||||
TimestampTz startTime = 0;
|
||||
TaskTracker *taskTracker = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, taskTrackerHash);
|
||||
|
||||
/* walk over task trackers and try to issue job clean up requests */
|
||||
|
@ -2821,15 +2824,33 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
taskTracker = (TaskTracker *) hash_seq_search(&status);
|
||||
}
|
||||
|
||||
/* give task trackers time to finish their clean up jobs */
|
||||
sleepInterval = Max(minimumSleepInterval, RemoteTaskCheckInterval * 2) * 1000L;
|
||||
pg_usleep(sleepInterval);
|
||||
/* record the time when we start waiting for cleanup jobs to be sent */
|
||||
startTime = GetCurrentTimestamp();
|
||||
|
||||
/* walk over task trackers to which we sent clean up requests */
|
||||
taskTrackerCell = NULL;
|
||||
foreach(taskTrackerCell, taskTrackerList)
|
||||
/*
|
||||
* Walk over task trackers to which we sent clean up requests. Perform
|
||||
* these checks until it times out.
|
||||
*
|
||||
* We want to determine timedOut flag after the loop start to make sure
|
||||
* we iterate one more time after time out occurs. This is necessary to report
|
||||
* warning messages for timed out cleanup jobs.
|
||||
*/
|
||||
remainingTaskTrackerList = taskTrackerList;
|
||||
while (list_length(remainingTaskTrackerList) > 0 && !timedOut)
|
||||
{
|
||||
TaskTracker *taskTracker = (TaskTracker *) lfirst(taskTrackerCell);
|
||||
List *activeTackTrackerList = remainingTaskTrackerList;
|
||||
ListCell *activeTaskTrackerCell = NULL;
|
||||
TimestampTz currentTime = 0;
|
||||
|
||||
remainingTaskTrackerList = NIL;
|
||||
|
||||
pg_usleep(statusCheckInterval);
|
||||
currentTime = GetCurrentTimestamp();
|
||||
timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration);
|
||||
|
||||
foreach(activeTaskTrackerCell, activeTackTrackerList)
|
||||
{
|
||||
TaskTracker *taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell);
|
||||
int32 connectionId = taskTracker->connectionId;
|
||||
const char *nodeName = taskTracker->workerName;
|
||||
uint32 nodePort = taskTracker->workerPort;
|
||||
|
@ -2840,25 +2861,42 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
QueryStatus queryStatus = MultiClientQueryStatus(connectionId);
|
||||
if (queryStatus == CLIENT_QUERY_DONE)
|
||||
{
|
||||
ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT
|
||||
" on node \"%s:%u\"", jobId, nodeName,
|
||||
nodePort)));
|
||||
ereport(DEBUG4, (errmsg("completed cleanup query for job "
|
||||
UINT64_FORMAT, jobId)));
|
||||
|
||||
/* clear connection for future cleanup queries */
|
||||
taskTracker->connectionBusy = false;
|
||||
}
|
||||
else
|
||||
else if (timedOut)
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not receive response for cleanup query "
|
||||
"for job " UINT64_FORMAT " on node \"%s:%u\"",
|
||||
jobId, nodeName, nodePort)));
|
||||
}
|
||||
ereport(WARNING, (errmsg("could not receive response for cleanup "
|
||||
"query status for job " UINT64_FORMAT " "
|
||||
"on node \"%s:%u\" with status %d", jobId,
|
||||
nodeName, nodePort, (int) queryStatus),
|
||||
errhint("Manually clean job resources on node "
|
||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||
nodePort, jobCleanupTask->queryString)));
|
||||
}
|
||||
else
|
||||
{
|
||||
remainingTaskTrackerList = lappend(remainingTaskTrackerList,
|
||||
taskTracker);
|
||||
}
|
||||
}
|
||||
else if (timedOut)
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not receive response for cleanup query "
|
||||
"for job " UINT64_FORMAT " on node \"%s:%u\"",
|
||||
jobId, nodeName, nodePort)));
|
||||
"result for job " UINT64_FORMAT " on node "
|
||||
"\"%s:%u\" with status %d", jobId, nodeName,
|
||||
nodePort, (int) resultStatus),
|
||||
errhint("Manually clean job resources on node "
|
||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||
nodePort, jobCleanupTask->queryString)));
|
||||
}
|
||||
else
|
||||
{
|
||||
remainingTaskTrackerList = lappend(remainingTaskTrackerList, taskTracker);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,12 +104,12 @@ DETAIL: Creating dependency on merge taskId 13
|
|||
DEBUG: assigned task 9 to node localhost:57637
|
||||
DEBUG: assigned task 6 to node localhost:57638
|
||||
DEBUG: assigned task 3 to node localhost:57637
|
||||
DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1251 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1251 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1250 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1250 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1252
|
||||
DEBUG: completed cleanup query for job 1252
|
||||
DEBUG: completed cleanup query for job 1251
|
||||
DEBUG: completed cleanup query for job 1251
|
||||
DEBUG: completed cleanup query for job 1250
|
||||
DEBUG: completed cleanup query for job 1250
|
||||
DEBUG: CommitTransactionCommand
|
||||
l_partkey | o_orderkey | count
|
||||
-----------+------------+-------
|
||||
|
@ -221,12 +221,12 @@ DEBUG: assigned task 3 to node localhost:57638
|
|||
DEBUG: assigned task 6 to node localhost:57637
|
||||
DEBUG: assigned task 9 to node localhost:57638
|
||||
DEBUG: assigned task 12 to node localhost:57637
|
||||
DEBUG: completed cleanup query for job 1255 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1255 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1253 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1253 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1254 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1254 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1255
|
||||
DEBUG: completed cleanup query for job 1255
|
||||
DEBUG: completed cleanup query for job 1253
|
||||
DEBUG: completed cleanup query for job 1253
|
||||
DEBUG: completed cleanup query for job 1254
|
||||
DEBUG: completed cleanup query for job 1254
|
||||
DEBUG: CommitTransactionCommand
|
||||
l_partkey | o_orderkey | count
|
||||
-----------+------------+-------
|
||||
|
|
|
@ -104,12 +104,12 @@ DETAIL: Creating dependency on merge taskId 13
|
|||
DEBUG: assigned task 9 to node localhost:57637
|
||||
DEBUG: assigned task 6 to node localhost:57638
|
||||
DEBUG: assigned task 3 to node localhost:57637
|
||||
DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1251 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1251 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1250 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1250 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1252
|
||||
DEBUG: completed cleanup query for job 1252
|
||||
DEBUG: completed cleanup query for job 1251
|
||||
DEBUG: completed cleanup query for job 1251
|
||||
DEBUG: completed cleanup query for job 1250
|
||||
DEBUG: completed cleanup query for job 1250
|
||||
DEBUG: CommitTransactionCommand
|
||||
l_partkey | o_orderkey | count
|
||||
-----------+------------+-------
|
||||
|
@ -221,12 +221,12 @@ DEBUG: assigned task 3 to node localhost:57638
|
|||
DEBUG: assigned task 6 to node localhost:57637
|
||||
DEBUG: assigned task 9 to node localhost:57638
|
||||
DEBUG: assigned task 12 to node localhost:57637
|
||||
DEBUG: completed cleanup query for job 1255 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1255 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1253 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1253 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1254 on node "localhost:57638"
|
||||
DEBUG: completed cleanup query for job 1254 on node "localhost:57637"
|
||||
DEBUG: completed cleanup query for job 1255
|
||||
DEBUG: completed cleanup query for job 1255
|
||||
DEBUG: completed cleanup query for job 1253
|
||||
DEBUG: completed cleanup query for job 1253
|
||||
DEBUG: completed cleanup query for job 1254
|
||||
DEBUG: completed cleanup query for job 1254
|
||||
DEBUG: CommitTransactionCommand
|
||||
l_partkey | o_orderkey | count
|
||||
-----------+------------+-------
|
||||
|
|
Loading…
Reference in New Issue