From bb3eee63e7bec9fb6c47ef1bd1b87cd3de1d6baf Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Mon, 30 May 2016 11:45:43 +0300 Subject: [PATCH] Refactor task tracker cleanup to enable workers to receive cleanup jobs Long sleep is replaced by multiple small sleeps. Maximum timeout is also increased since we do not have to wait that long in most cases. --- .../executor/multi_task_tracker_executor.c | 104 ++++++++++++------ .../multi_large_table_join_planning.out | 24 ++-- .../multi_large_table_join_planning_0.out | 24 ++-- 3 files changed, 95 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 0a8f6681c..872496853 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -32,6 +32,7 @@ #include "storage/fd.h" #include "utils/builtins.h" #include "utils/hsearch.h" +#include "utils/timestamp.h" int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */ @@ -2768,12 +2769,14 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { uint64 jobId = jobCleanupTask->jobId; List *taskTrackerList = NIL; - ListCell *taskTrackerCell = NULL; - long sleepInterval = 0; - const int minimumSleepInterval = 150; - + List *remainingTaskTrackerList = NIL; + const long timeoutDuration = 4000; /* milliseconds */ + const long statusCheckInterval = 10000; /* microseconds */ + bool timedOut = false; + TimestampTz startTime = 0; TaskTracker *taskTracker = NULL; HASH_SEQ_STATUS status; + hash_seq_init(&status, taskTrackerHash); /* walk over task trackers and try to issue job clean up requests */ @@ -2821,45 +2824,80 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) taskTracker = (TaskTracker *) hash_seq_search(&status); } - /* give task trackers time to finish their clean up jobs */ - sleepInterval = 
Max(minimumSleepInterval, RemoteTaskCheckInterval * 2) * 1000L; - pg_usleep(sleepInterval); + /* record the time when we start waiting for cleanup jobs to be sent */ + startTime = GetCurrentTimestamp(); - /* walk over task trackers to which we sent clean up requests */ - taskTrackerCell = NULL; - foreach(taskTrackerCell, taskTrackerList) + /* + * Walk over task trackers to which we sent clean up requests. Perform + * these checks until it times out. + * + * We want to determine timedOut flag after the loop start to make sure + * we iterate one more time after time out occurs. This is necessary to report + * warning messages for timed out cleanup jobs. + */ + remainingTaskTrackerList = taskTrackerList; + while (list_length(remainingTaskTrackerList) > 0 && !timedOut) { - TaskTracker *taskTracker = (TaskTracker *) lfirst(taskTrackerCell); - int32 connectionId = taskTracker->connectionId; - const char *nodeName = taskTracker->workerName; - uint32 nodePort = taskTracker->workerPort; + List *activeTackTrackerList = remainingTaskTrackerList; + ListCell *activeTaskTrackerCell = NULL; + TimestampTz currentTime = 0; - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - if (resultStatus == CLIENT_RESULT_READY) + remainingTaskTrackerList = NIL; + + pg_usleep(statusCheckInterval); + currentTime = GetCurrentTimestamp(); + timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration); + + foreach(activeTaskTrackerCell, activeTackTrackerList) { - QueryStatus queryStatus = MultiClientQueryStatus(connectionId); - if (queryStatus == CLIENT_QUERY_DONE) - { - ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT - " on node \"%s:%u\"", jobId, nodeName, - nodePort))); + TaskTracker *taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell); + int32 connectionId = taskTracker->connectionId; + const char *nodeName = taskTracker->workerName; + uint32 nodePort = taskTracker->workerPort; - /* clear connection for future cleanup queries 
*/ - taskTracker->connectionBusy = false; + ResultStatus resultStatus = MultiClientResultStatus(connectionId); + if (resultStatus == CLIENT_RESULT_READY) + { + QueryStatus queryStatus = MultiClientQueryStatus(connectionId); + if (queryStatus == CLIENT_QUERY_DONE) + { + ereport(DEBUG4, (errmsg("completed cleanup query for job " + UINT64_FORMAT, jobId))); + + /* clear connection for future cleanup queries */ + taskTracker->connectionBusy = false; + } + else if (timedOut) + { + ereport(WARNING, (errmsg("could not receive response for cleanup " + "query status for job " UINT64_FORMAT " " + "on node \"%s:%u\" with status %d", jobId, + nodeName, nodePort, (int) queryStatus), + errhint("Manually clean job resources on node " + "\"%s:%u\" by running \"%s\" ", nodeName, + nodePort, jobCleanupTask->queryString))); + } + else + { + remainingTaskTrackerList = lappend(remainingTaskTrackerList, + taskTracker); + } + } + else if (timedOut) + { + ereport(WARNING, (errmsg("could not receive response for cleanup query " + "result for job " UINT64_FORMAT " on node " + "\"%s:%u\" with status %d", jobId, nodeName, + nodePort, (int) resultStatus), + errhint("Manually clean job resources on node " + "\"%s:%u\" by running \"%s\" ", nodeName, + nodePort, jobCleanupTask->queryString))); } else { - ereport(WARNING, (errmsg("could not receive response for cleanup query " - "for job " UINT64_FORMAT " on node \"%s:%u\"", - jobId, nodeName, nodePort))); + remainingTaskTrackerList = lappend(remainingTaskTrackerList, taskTracker); } } - else - { - ereport(WARNING, (errmsg("could not receive response for cleanup query " - "for job " UINT64_FORMAT " on node \"%s:%u\"", - jobId, nodeName, nodePort))); - } } } diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out index 70dfe85a0..0d74a72c3 100644 --- a/src/test/regress/expected/multi_large_table_join_planning.out +++ 
b/src/test/regress/expected/multi_large_table_join_planning.out @@ -104,12 +104,12 @@ DETAIL: Creating dependency on merge taskId 13 DEBUG: assigned task 9 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 -DEBUG: completed cleanup query for job 1252 on node "localhost:57638" -DEBUG: completed cleanup query for job 1252 on node "localhost:57637" -DEBUG: completed cleanup query for job 1251 on node "localhost:57638" -DEBUG: completed cleanup query for job 1251 on node "localhost:57637" -DEBUG: completed cleanup query for job 1250 on node "localhost:57638" -DEBUG: completed cleanup query for job 1250 on node "localhost:57637" +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1250 +DEBUG: completed cleanup query for job 1250 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+------- @@ -221,12 +221,12 @@ DEBUG: assigned task 3 to node localhost:57638 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 12 to node localhost:57637 -DEBUG: completed cleanup query for job 1255 on node "localhost:57638" -DEBUG: completed cleanup query for job 1255 on node "localhost:57637" -DEBUG: completed cleanup query for job 1253 on node "localhost:57638" -DEBUG: completed cleanup query for job 1253 on node "localhost:57637" -DEBUG: completed cleanup query for job 1254 on node "localhost:57638" -DEBUG: completed cleanup query for job 1254 on node "localhost:57637" +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1254 +DEBUG: completed cleanup query for job 1254 DEBUG: 
CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+------- diff --git a/src/test/regress/expected/multi_large_table_join_planning_0.out b/src/test/regress/expected/multi_large_table_join_planning_0.out index 65c0b8ca0..11fd75a49 100644 --- a/src/test/regress/expected/multi_large_table_join_planning_0.out +++ b/src/test/regress/expected/multi_large_table_join_planning_0.out @@ -104,12 +104,12 @@ DETAIL: Creating dependency on merge taskId 13 DEBUG: assigned task 9 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 -DEBUG: completed cleanup query for job 1252 on node "localhost:57638" -DEBUG: completed cleanup query for job 1252 on node "localhost:57637" -DEBUG: completed cleanup query for job 1251 on node "localhost:57638" -DEBUG: completed cleanup query for job 1251 on node "localhost:57637" -DEBUG: completed cleanup query for job 1250 on node "localhost:57638" -DEBUG: completed cleanup query for job 1250 on node "localhost:57637" +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1250 +DEBUG: completed cleanup query for job 1250 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+------- @@ -221,12 +221,12 @@ DEBUG: assigned task 3 to node localhost:57638 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 12 to node localhost:57637 -DEBUG: completed cleanup query for job 1255 on node "localhost:57638" -DEBUG: completed cleanup query for job 1255 on node "localhost:57637" -DEBUG: completed cleanup query for job 1253 on node "localhost:57638" -DEBUG: completed cleanup query for job 1253 on node "localhost:57637" -DEBUG: completed cleanup query for job 1254 on node "localhost:57638" -DEBUG: completed cleanup 
query for job 1254 on node "localhost:57637" +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1254 +DEBUG: completed cleanup query for job 1254 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+-------