mirror of https://github.com/citusdata/citus.git
Merge pull request #3453 from citusdata/fix-stray-files
Fix multi_task_string_size sometimes leaking intermediate filespull/3539/head
commit
0c4f9e230d
|
@ -558,11 +558,10 @@ char *
|
||||||
CreateIntermediateResultsDirectory(void)
|
CreateIntermediateResultsDirectory(void)
|
||||||
{
|
{
|
||||||
char *resultDirectory = IntermediateResultsDirectory();
|
char *resultDirectory = IntermediateResultsDirectory();
|
||||||
int makeOK = 0;
|
|
||||||
|
|
||||||
if (!CreatedResultsDirectory)
|
if (!CreatedResultsDirectory)
|
||||||
{
|
{
|
||||||
makeOK = mkdir(resultDirectory, S_IRWXU);
|
int makeOK = mkdir(resultDirectory, S_IRWXU);
|
||||||
if (makeOK != 0)
|
if (makeOK != 0)
|
||||||
{
|
{
|
||||||
if (errno == EEXIST)
|
if (errno == EEXIST)
|
||||||
|
|
|
@ -2844,8 +2844,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
else if (timedOut)
|
else if (timedOut)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("could not receive response for cleanup "
|
ereport(WARNING, (errmsg("could not receive response for cleanup "
|
||||||
"query status for job " UINT64_FORMAT " "
|
"query status for job " UINT64_FORMAT
|
||||||
"on node \"%s:%u\" with status %d",
|
" on node \"%s:%u\" with status %d",
|
||||||
jobId,
|
jobId,
|
||||||
nodeName, nodePort, (int) queryStatus),
|
nodeName, nodePort, (int) queryStatus),
|
||||||
errhint("Manually clean job resources on node "
|
errhint("Manually clean job resources on node "
|
||||||
|
@ -2863,8 +2863,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
{
|
{
|
||||||
/* CLIENT_RESULT_UNAVAILABLE is returned if the connection failed somehow */
|
/* CLIENT_RESULT_UNAVAILABLE is returned if the connection failed somehow */
|
||||||
ereport(WARNING, (errmsg("could not receive response for cleanup query "
|
ereport(WARNING, (errmsg("could not receive response for cleanup query "
|
||||||
"result for job " UINT64_FORMAT " on node "
|
"result for job " UINT64_FORMAT
|
||||||
"\"%s:%u\" with status %d",
|
" on node \"%s:%u\" with status %d",
|
||||||
jobId, nodeName,
|
jobId, nodeName,
|
||||||
nodePort, (int) resultStatus),
|
nodePort, (int) resultStatus),
|
||||||
errhint("Manually clean job resources on node "
|
errhint("Manually clean job resources on node "
|
||||||
|
|
|
@ -1021,6 +1021,12 @@ ManageWorkerTask(WorkerTask *workerTask, HTAB *WorkerTasksHash)
|
||||||
workerTask->connectionId = INVALID_CONNECTION_ID;
|
workerTask->connectionId = INVALID_CONNECTION_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (workerTask->taskId == JOB_CLEANUP_TASK_ID)
|
||||||
|
{
|
||||||
|
StringInfo jobDirectoryName = JobDirectoryName(workerTask->jobId);
|
||||||
|
CitusRemoveDirectory(jobDirectoryName->data);
|
||||||
|
}
|
||||||
|
|
||||||
workerTask->taskStatus = TASK_TO_REMOVE;
|
workerTask->taskStatus = TASK_TO_REMOVE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,6 +188,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
StringInfo jobSchemaName = JobSchemaName(jobId);
|
StringInfo jobSchemaName = JobSchemaName(jobId);
|
||||||
|
StringInfo jobDirectoryName = JobDirectoryName(jobId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We'll keep this lock for a while, but that's ok because nothing
|
* We'll keep this lock for a while, but that's ok because nothing
|
||||||
|
@ -230,7 +231,6 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
* schema drop call can block if another process is creating the schema or
|
* schema drop call can block if another process is creating the schema or
|
||||||
* writing to a table within the schema.
|
* writing to a table within the schema.
|
||||||
*/
|
*/
|
||||||
StringInfo jobDirectoryName = JobDirectoryName(jobId);
|
|
||||||
CitusRemoveDirectory(jobDirectoryName->data);
|
CitusRemoveDirectory(jobDirectoryName->data);
|
||||||
|
|
||||||
RemoveJobSchema(jobSchemaName);
|
RemoveJobSchema(jobSchemaName);
|
||||||
|
@ -450,7 +450,7 @@ CleanupTask(WorkerTask *workerTask)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* remove the task from the shared hash */
|
/* remove task from the shared hash */
|
||||||
WorkerTask *taskRemoved = hash_search(TaskTrackerTaskHash, hashKey, HASH_REMOVE,
|
WorkerTask *taskRemoved = hash_search(TaskTrackerTaskHash, hashKey, HASH_REMOVE,
|
||||||
NULL);
|
NULL);
|
||||||
if (taskRemoved == NULL)
|
if (taskRemoved == NULL)
|
||||||
|
|
|
@ -326,6 +326,5 @@ test: multi_deparse_function multi_deparse_procedure
|
||||||
|
|
||||||
# ---------
|
# ---------
|
||||||
# test that no tests leaked intermediate results. This should always be last
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
# Causes random test failures so commented out for now
|
|
||||||
# ---------
|
# ---------
|
||||||
# test: ensure_no_intermediate_data_leak
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
Loading…
Reference in New Issue