diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index fd0638741..e024c1299 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -32,6 +32,7 @@ #include "storage/fd.h" #include "utils/builtins.h" #include "utils/hsearch.h" +#include "utils/timestamp.h" int MaxAssignTaskBatchSize = 64; /* maximum number of tasks to assign per round */ @@ -2765,12 +2766,14 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { uint64 jobId = jobCleanupTask->jobId; List *taskTrackerList = NIL; - ListCell *taskTrackerCell = NULL; - long sleepInterval = 0; - const int minimumSleepInterval = 150; - + List *remainingTaskTrackerList = NIL; + const long timeoutDuration = 4000; /* milliseconds */ + const long statusCheckInterval = 10000; /* microseconds */ + bool timedOut = false; + TimestampTz startTime = 0; TaskTracker *taskTracker = NULL; HASH_SEQ_STATUS status; + hash_seq_init(&status, taskTrackerHash); /* walk over task trackers and try to issue job clean up requests */ @@ -2818,45 +2821,80 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) taskTracker = (TaskTracker *) hash_seq_search(&status); } - /* give task trackers time to finish their clean up jobs */ - sleepInterval = Max(minimumSleepInterval, RemoteTaskCheckInterval * 2) * 1000L; - pg_usleep(sleepInterval); + /* record the time when we start waiting for cleanup jobs to be sent */ + startTime = GetCurrentTimestamp(); - /* walk over task trackers to which we sent clean up requests */ - taskTrackerCell = NULL; - foreach(taskTrackerCell, taskTrackerList) + /* + * Walk over task trackers to which we sent clean up requests. Perform + * these checks until it times out. + * + * We want to determine timedOut flag after the loop start to make sure + * we iterate one more time after time out occurs. This is necessary to report + * warning messages for timed out cleanup jobs. + */ + remainingTaskTrackerList = taskTrackerList; + while (list_length(remainingTaskTrackerList) > 0 && !timedOut) { - TaskTracker *taskTracker = (TaskTracker *) lfirst(taskTrackerCell); - int32 connectionId = taskTracker->connectionId; - const char *nodeName = taskTracker->workerName; - uint32 nodePort = taskTracker->workerPort; + List *activeTackTrackerList = remainingTaskTrackerList; + ListCell *activeTaskTrackerCell = NULL; + TimestampTz currentTime = 0; - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - if (resultStatus == CLIENT_RESULT_READY) + remainingTaskTrackerList = NIL; + + pg_usleep(statusCheckInterval); + currentTime = GetCurrentTimestamp(); + timedOut = TimestampDifferenceExceeds(startTime, currentTime, timeoutDuration); + + foreach(activeTaskTrackerCell, activeTackTrackerList) { - QueryStatus queryStatus = MultiClientQueryStatus(connectionId); - if (queryStatus == CLIENT_QUERY_DONE) - { - ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT - " on node \"%s:%u\"", jobId, nodeName, - nodePort))); + TaskTracker *taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell); + int32 connectionId = taskTracker->connectionId; + const char *nodeName = taskTracker->workerName; + uint32 nodePort = taskTracker->workerPort; - /* clear connection for future cleanup queries */ - taskTracker->connectionBusy = false; + ResultStatus resultStatus = MultiClientResultStatus(connectionId); + if (resultStatus == CLIENT_RESULT_READY) + { + QueryStatus queryStatus = MultiClientQueryStatus(connectionId); + if (queryStatus == CLIENT_QUERY_DONE) + { + ereport(DEBUG4, (errmsg("completed cleanup query for job " + UINT64_FORMAT, jobId))); + + /* clear connection for future cleanup queries */ + taskTracker->connectionBusy = false; + } + else if (timedOut) + { + ereport(WARNING, (errmsg("could not receive response for cleanup " + "query status for job " UINT64_FORMAT " " + "on node \"%s:%u\" with status %d", jobId, + nodeName, nodePort, (int) queryStatus), + errhint("Manually clean job resources on node " + "\"%s:%u\" by running \"%s\" ", nodeName, + nodePort, jobCleanupTask->queryString))); + } + else + { + remainingTaskTrackerList = lappend(remainingTaskTrackerList, + taskTracker); + } + } + else if (timedOut) + { + ereport(WARNING, (errmsg("could not receive response for cleanup query " + "result for job " UINT64_FORMAT " on node " + "\"%s:%u\" with status %d", jobId, nodeName, + nodePort, (int) resultStatus), + errhint("Manually clean job resources on node " + "\"%s:%u\" by running \"%s\" ", nodeName, + nodePort, jobCleanupTask->queryString))); } else { - ereport(WARNING, (errmsg("could not receive response for cleanup query " - "for job " UINT64_FORMAT " on node \"%s:%u\"", - jobId, nodeName, nodePort))); + remainingTaskTrackerList = lappend(remainingTaskTrackerList, taskTracker); } } - else - { - ereport(WARNING, (errmsg("could not receive response for cleanup query " - "for job " UINT64_FORMAT " on node \"%s:%u\"", - jobId, nodeName, nodePort))); - } } } diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out index 413ac9ca4..2fec591d8 100644 --- a/src/test/regress/expected/multi_large_table_join_planning.out +++ b/src/test/regress/expected/multi_large_table_join_planning.out @@ -102,12 +102,12 @@ DETAIL: Creating dependency on merge taskId 13 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 3 to node localhost:57638 DEBUG: assigned task 9 to node localhost:57637 -DEBUG: completed cleanup query for job 1252 on node "localhost:57638" -DEBUG: completed cleanup query for job 1252 on node "localhost:57637" -DEBUG: completed cleanup query for job 1251 on node "localhost:57638" -DEBUG: completed cleanup query for job 1251 on node "localhost:57637" -DEBUG: completed cleanup query for job 1250 on node "localhost:57638" -DEBUG: completed cleanup query for job 1250 on node "localhost:57637" +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1250 +DEBUG: completed cleanup query for job 1250 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+------- @@ -219,12 +219,12 @@ DEBUG: assigned task 3 to node localhost:57638 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 12 to node localhost:57637 -DEBUG: completed cleanup query for job 1255 on node "localhost:57638" -DEBUG: completed cleanup query for job 1255 on node "localhost:57637" -DEBUG: completed cleanup query for job 1253 on node "localhost:57638" -DEBUG: completed cleanup query for job 1253 on node "localhost:57637" -DEBUG: completed cleanup query for job 1254 on node "localhost:57638" -DEBUG: completed cleanup query for job 1254 on node "localhost:57637" +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1254 +DEBUG: completed cleanup query for job 1254 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+------- diff --git a/src/test/regress/expected/multi_large_table_join_planning_0.out b/src/test/regress/expected/multi_large_table_join_planning_0.out index f254c40ca..f553be792 100644 --- a/src/test/regress/expected/multi_large_table_join_planning_0.out +++ b/src/test/regress/expected/multi_large_table_join_planning_0.out @@ -102,12 +102,12 @@ DETAIL: Creating dependency on merge taskId 13 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 3 to node localhost:57638 DEBUG: assigned task 9 to node localhost:57637 -DEBUG: completed cleanup query for job 1252 on node "localhost:57638" -DEBUG: completed cleanup query for job 1252 on node "localhost:57637" -DEBUG: completed cleanup query for job 1251 on node "localhost:57638" -DEBUG: completed cleanup query for job 1251 on node "localhost:57637" -DEBUG: completed cleanup query for job 1250 on node "localhost:57638" -DEBUG: completed cleanup query for job 1250 on node "localhost:57637" +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1252 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1251 +DEBUG: completed cleanup query for job 1250 +DEBUG: completed cleanup query for job 1250 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+------- @@ -219,12 +219,12 @@ DEBUG: assigned task 3 to node localhost:57638 DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 12 to node localhost:57637 -DEBUG: completed cleanup query for job 1255 on node "localhost:57638" -DEBUG: completed cleanup query for job 1255 on node "localhost:57637" -DEBUG: completed cleanup query for job 1253 on node "localhost:57638" -DEBUG: completed cleanup query for job 1253 on node "localhost:57637" -DEBUG: completed cleanup query for job 1254 on node "localhost:57638" -DEBUG: completed cleanup query for job 1254 on node "localhost:57637" +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1255 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1253 +DEBUG: completed cleanup query for job 1254 +DEBUG: completed cleanup query for job 1254 DEBUG: CommitTransactionCommand l_partkey | o_orderkey | count -----------+------------+-------