diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 49e09c3a2..eadccdeec 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -558,11 +558,10 @@ char * CreateIntermediateResultsDirectory(void) { char *resultDirectory = IntermediateResultsDirectory(); - int makeOK = 0; if (!CreatedResultsDirectory) { - makeOK = mkdir(resultDirectory, S_IRWXU); + int makeOK = mkdir(resultDirectory, S_IRWXU); if (makeOK != 0) { if (errno == EEXIST) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index b2b43622f..459406e45 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -2844,8 +2844,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) else if (timedOut) { ereport(WARNING, (errmsg("could not receive response for cleanup " - "query status for job " UINT64_FORMAT " " - "on node \"%s:%u\" with status %d", + "query status for job " UINT64_FORMAT + " on node \"%s:%u\" with status %d", jobId, nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " @@ -2863,8 +2863,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { /* CLIENT_RESULT_UNAVAILABLE is returned if the connection failed somehow */ ereport(WARNING, (errmsg("could not receive response for cleanup query " - "result for job " UINT64_FORMAT " on node " - "\"%s:%u\" with status %d", + "result for job " UINT64_FORMAT + " on node \"%s:%u\" with status %d", jobId, nodeName, nodePort, (int) resultStatus), errhint("Manually clean job resources on node " diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index cc1abdfb0..5d7bbe4fe 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -1021,6 +1021,12 @@ ManageWorkerTask(WorkerTask *workerTask, HTAB *WorkerTasksHash) workerTask->connectionId = INVALID_CONNECTION_ID; } + if (workerTask->taskId == JOB_CLEANUP_TASK_ID) + { + StringInfo jobDirectoryName = JobDirectoryName(workerTask->jobId); + CitusRemoveDirectory(jobDirectoryName->data); + } + workerTask->taskStatus = TASK_TO_REMOVE; break; } diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index bb4e439ca..bfb12dd5d 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -188,6 +188,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); StringInfo jobSchemaName = JobSchemaName(jobId); + StringInfo jobDirectoryName = JobDirectoryName(jobId); /* * We'll keep this lock for a while, but that's ok because nothing @@ -230,7 +231,6 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) * schema drop call can block if another process is creating the schema or * writing to a table within the schema. */ - StringInfo jobDirectoryName = JobDirectoryName(jobId); CitusRemoveDirectory(jobDirectoryName->data); RemoveJobSchema(jobSchemaName); @@ -450,7 +450,7 @@ CleanupTask(WorkerTask *workerTask) return; } - /* remove the task from the shared hash */ + /* remove task from the shared hash */ WorkerTask *taskRemoved = hash_search(TaskTrackerTaskHash, hashKey, HASH_REMOVE, NULL); if (taskRemoved == NULL) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3687f3d89..9f62924e1 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -326,6 +326,5 @@ test: multi_deparse_function multi_deparse_procedure # --------- # test that no tests leaked intermediate results. This should always be last -# Causes random test failures so commented out for now # --------- -# test: ensure_no_intermediate_data_leak +test: ensure_no_intermediate_data_leak