From 84a500ffc69899e09541109813db38d3371ce5fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 24 Jan 2020 00:53:57 +0000 Subject: [PATCH] CitusRemoveDirectory: loop when directory is not empty Sometimes during errors workers will create files while we're deleting intermediate directories example: DEBUG: could not remove file "base/pgsql_job_cache/10_0_431": Directory not empty DETAIL: WARNING from localhost:57637 --- .../executor/intermediate_results.c | 5 +- .../executor/multi_server_executor.c | 2 +- src/backend/distributed/test/file_utils.c | 2 +- src/backend/distributed/worker/task_tracker.c | 2 +- .../worker/task_tracker_protocol.c | 2 +- .../worker/worker_merge_protocol.c | 2 +- .../worker/worker_partition_protocol.c | 136 ++++++++++-------- src/include/distributed/worker_protocol.h | 2 +- 8 files changed, 81 insertions(+), 72 deletions(-) diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index c1b06e30b..49e09c3a2 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -667,10 +667,7 @@ RemoveIntermediateResultsDirectory(void) { if (CreatedResultsDirectory) { - StringInfo resultsDirectory = makeStringInfo(); - appendStringInfoString(resultsDirectory, IntermediateResultsDirectory()); - - CitusRemoveDirectory(resultsDirectory); + CitusRemoveDirectory(IntermediateResultsDirectory()); CreatedResultsDirectory = false; } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 188d1668b..3758ff476 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -187,7 +187,7 @@ void RemoveJobDirectory(uint64 jobId) { StringInfo jobDirectoryName = MasterJobDirectoryName(jobId); - CitusRemoveDirectory(jobDirectoryName); + 
CitusRemoveDirectory(jobDirectoryName->data); ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId); } diff --git a/src/backend/distributed/test/file_utils.c b/src/backend/distributed/test/file_utils.c index 02ecaa0bd..a6576ac55 100644 --- a/src/backend/distributed/test/file_utils.c +++ b/src/backend/distributed/test/file_utils.c @@ -23,7 +23,7 @@ citus_rm_job_directory(PG_FUNCTION_ARGS) appendStringInfo(jobCacheDirectory, "base/%s/%s%0*" INT64_MODIFIER "u", PG_JOB_CACHE_DIR, JOB_DIRECTORY_PREFIX, MIN_JOB_DIRNAME_WIDTH, jobId); - CitusRemoveDirectory(jobCacheDirectory); + CitusRemoveDirectory(jobCacheDirectory->data); FreeStringInfo(jobCacheDirectory); PG_RETURN_VOID(); diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 1105e3187..cc1abdfb0 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -344,7 +344,7 @@ TrackerCleanupJobDirectories(void) StringInfo jobCacheDirectory = makeStringInfo(); appendStringInfo(jobCacheDirectory, "base/%s", PG_JOB_CACHE_DIR); - CitusRemoveDirectory(jobCacheDirectory); + CitusRemoveDirectory(jobCacheDirectory->data); CitusCreateDirectory(jobCacheDirectory); FreeStringInfo(jobCacheDirectory); diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index d13fede50..44ce69ae4 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -231,7 +231,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) * writing to a table within the schema. 
*/ StringInfo jobDirectoryName = JobDirectoryName(jobId); - CitusRemoveDirectory(jobDirectoryName); + CitusRemoveDirectory(jobDirectoryName->data); RemoveJobSchema(jobSchemaName); UnlockJobResource(jobId, AccessExclusiveLock); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 43a4dc042..98b1a3f86 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -97,7 +97,7 @@ worker_repartition_cleanup(PG_FUNCTION_ARGS) Oid schemaId = get_namespace_oid(jobSchemaName->data, false); EnsureSchemaOwner(schemaId); - CitusRemoveDirectory(jobDirectoryName); + CitusRemoveDirectory(jobDirectoryName->data); RemoveJobSchema(jobSchemaName); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index e46cbbd73..f930006c1 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -85,7 +85,7 @@ static uint32 RangePartitionId(Datum partitionValue, Oid partitionCollation, static uint32 HashPartitionId(Datum partitionValue, Oid partitionCollation, const void *context); static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId); -static bool FileIsLink(char *filename, struct stat filestat); +static bool FileIsLink(const char *filename, struct stat filestat); /* exports for SQL callable functions */ @@ -158,7 +158,7 @@ worker_range_partition_table(PG_FUNCTION_ARGS) /* close partition files and atomically rename (commit) them */ ClosePartitionFiles(partitionFileArray, fileCount); - CitusRemoveDirectory(taskDirectory); + CitusRemoveDirectory(taskDirectory->data); RenameDirectory(taskAttemptDirectory, taskDirectory); PG_RETURN_VOID(); @@ -232,7 +232,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) /* close partition files and atomically 
rename (commit) them */ ClosePartitionFiles(partitionFileArray, fileCount); - CitusRemoveDirectory(taskDirectory); + CitusRemoveDirectory(taskDirectory->data); RenameDirectory(taskAttemptDirectory, taskDirectory); PG_RETURN_VOID(); @@ -698,7 +698,7 @@ FileIsLink(char *filename, struct stat filestat) #else static bool -FileIsLink(char *filename, struct stat filestat) +FileIsLink(const char *filename, struct stat filestat) { return S_ISLNK(filestat.st_mode); } @@ -714,79 +714,91 @@ FileIsLink(char *filename, struct stat filestat) * system library's remove_all() method. */ void -CitusRemoveDirectory(StringInfo filename) +CitusRemoveDirectory(const char *filename) { - struct stat fileStat; - int removed = 0; - - int fileStated = stat(filename->data, &fileStat); - if (fileStated < 0) + /* files may be added during execution, loop when that occurs */ + while (true) { - if (errno == ENOENT) - { - return; /* if file does not exist, return */ - } - else - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", filename->data))); - } - } + struct stat fileStat; + int removed = 0; - /* - * If this is a directory, iterate over all its contents and for each - * content, recurse into this function. Also, make sure that we do not - * recurse into symbolic links. 
- */ - if (S_ISDIR(fileStat.st_mode) && !FileIsLink(filename->data, fileStat)) - { - const char *directoryName = filename->data; - - DIR *directory = AllocateDir(directoryName); - if (directory == NULL) + int fileStated = stat(filename, &fileStat); + if (fileStated < 0) { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open directory \"%s\": %m", - directoryName))); - } - - struct dirent *directoryEntry = ReadDir(directory, directoryName); - for (; directoryEntry != NULL; directoryEntry = ReadDir(directory, directoryName)) - { - const char *baseFilename = directoryEntry->d_name; - - /* if system file, skip it */ - if (strncmp(baseFilename, ".", MAXPGPATH) == 0 || - strncmp(baseFilename, "..", MAXPGPATH) == 0) + if (errno == ENOENT) { - continue; + return; /* if file does not exist, return */ + } + else + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", filename))); + } + } + + /* + * If this is a directory, iterate over all its contents and for each + * content, recurse into this function. Also, make sure that we do not + * recurse into symbolic links. 
+ */ + if (S_ISDIR(fileStat.st_mode) && !FileIsLink(filename, fileStat)) + { + const char *directoryName = filename; + + DIR *directory = AllocateDir(directoryName); + if (directory == NULL) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", + directoryName))); } StringInfo fullFilename = makeStringInfo(); - appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename); + struct dirent *directoryEntry = ReadDir(directory, directoryName); + for (; directoryEntry != NULL; directoryEntry = ReadDir(directory, + directoryName)) + { + const char *baseFilename = directoryEntry->d_name; - CitusRemoveDirectory(fullFilename); + /* if system file, skip it */ + if (strncmp(baseFilename, ".", MAXPGPATH) == 0 || + strncmp(baseFilename, "..", MAXPGPATH) == 0) + { + continue; + } + + resetStringInfo(fullFilename); + appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename); + + CitusRemoveDirectory(fullFilename->data); + } FreeStringInfo(fullFilename); + FreeDir(directory); } - FreeDir(directory); - } + /* we now have an empty directory or a regular file, remove it */ + if (S_ISDIR(fileStat.st_mode)) + { + removed = rmdir(filename); - /* we now have an empty directory or a regular file, remove it */ - if (S_ISDIR(fileStat.st_mode)) - { - removed = rmdir(filename->data); - } - else - { - removed = unlink(filename->data); - } + if (removed != 0 && (errno == ENOTEMPTY || errno == EEXIST)) + { + continue; + } + } + else + { + removed = unlink(filename); + } - if (removed != 0 && errno != ENOENT) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", filename->data))); + if (removed != 0 && errno != ENOENT) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", filename))); + } + + return; } } diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index da29535b7..3e1f7c426 100644 --- 
a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -117,7 +117,7 @@ extern bool CacheDirectoryElement(const char *filename); extern bool JobDirectoryElement(const char *filename); extern bool DirectoryExists(StringInfo directoryName); extern void CitusCreateDirectory(StringInfo directoryName); -extern void CitusRemoveDirectory(StringInfo filename); +extern void CitusRemoveDirectory(const char *filename); extern StringInfo InitTaskDirectory(uint64 jobId, uint32 taskId); extern void RemoveJobSchema(StringInfo schemaName); extern Datum * DeconstructArrayObject(ArrayType *arrayObject);