mirror of https://github.com/citusdata/citus.git
Merge pull request #3423 from citusdata/remove-directory-even-if-new-files-added
CitusRemoveDirectory: loop when directory is not emptypull/3447/head
commit
d7204c9696
|
@ -667,10 +667,7 @@ RemoveIntermediateResultsDirectory(void)
|
||||||
{
|
{
|
||||||
if (CreatedResultsDirectory)
|
if (CreatedResultsDirectory)
|
||||||
{
|
{
|
||||||
StringInfo resultsDirectory = makeStringInfo();
|
CitusRemoveDirectory(IntermediateResultsDirectory());
|
||||||
appendStringInfoString(resultsDirectory, IntermediateResultsDirectory());
|
|
||||||
|
|
||||||
CitusRemoveDirectory(resultsDirectory);
|
|
||||||
|
|
||||||
CreatedResultsDirectory = false;
|
CreatedResultsDirectory = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -187,7 +187,7 @@ void
|
||||||
RemoveJobDirectory(uint64 jobId)
|
RemoveJobDirectory(uint64 jobId)
|
||||||
{
|
{
|
||||||
StringInfo jobDirectoryName = MasterJobDirectoryName(jobId);
|
StringInfo jobDirectoryName = MasterJobDirectoryName(jobId);
|
||||||
CitusRemoveDirectory(jobDirectoryName);
|
CitusRemoveDirectory(jobDirectoryName->data);
|
||||||
|
|
||||||
ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId);
|
ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ citus_rm_job_directory(PG_FUNCTION_ARGS)
|
||||||
appendStringInfo(jobCacheDirectory, "base/%s/%s%0*" INT64_MODIFIER "u",
|
appendStringInfo(jobCacheDirectory, "base/%s/%s%0*" INT64_MODIFIER "u",
|
||||||
PG_JOB_CACHE_DIR, JOB_DIRECTORY_PREFIX,
|
PG_JOB_CACHE_DIR, JOB_DIRECTORY_PREFIX,
|
||||||
MIN_JOB_DIRNAME_WIDTH, jobId);
|
MIN_JOB_DIRNAME_WIDTH, jobId);
|
||||||
CitusRemoveDirectory(jobCacheDirectory);
|
CitusRemoveDirectory(jobCacheDirectory->data);
|
||||||
FreeStringInfo(jobCacheDirectory);
|
FreeStringInfo(jobCacheDirectory);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
|
|
@ -344,7 +344,7 @@ TrackerCleanupJobDirectories(void)
|
||||||
StringInfo jobCacheDirectory = makeStringInfo();
|
StringInfo jobCacheDirectory = makeStringInfo();
|
||||||
appendStringInfo(jobCacheDirectory, "base/%s", PG_JOB_CACHE_DIR);
|
appendStringInfo(jobCacheDirectory, "base/%s", PG_JOB_CACHE_DIR);
|
||||||
|
|
||||||
CitusRemoveDirectory(jobCacheDirectory);
|
CitusRemoveDirectory(jobCacheDirectory->data);
|
||||||
CitusCreateDirectory(jobCacheDirectory);
|
CitusCreateDirectory(jobCacheDirectory);
|
||||||
|
|
||||||
FreeStringInfo(jobCacheDirectory);
|
FreeStringInfo(jobCacheDirectory);
|
||||||
|
|
|
@ -231,7 +231,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
* writing to a table within the schema.
|
* writing to a table within the schema.
|
||||||
*/
|
*/
|
||||||
StringInfo jobDirectoryName = JobDirectoryName(jobId);
|
StringInfo jobDirectoryName = JobDirectoryName(jobId);
|
||||||
CitusRemoveDirectory(jobDirectoryName);
|
CitusRemoveDirectory(jobDirectoryName->data);
|
||||||
|
|
||||||
RemoveJobSchema(jobSchemaName);
|
RemoveJobSchema(jobSchemaName);
|
||||||
UnlockJobResource(jobId, AccessExclusiveLock);
|
UnlockJobResource(jobId, AccessExclusiveLock);
|
||||||
|
|
|
@ -97,7 +97,7 @@ worker_repartition_cleanup(PG_FUNCTION_ARGS)
|
||||||
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
||||||
|
|
||||||
EnsureSchemaOwner(schemaId);
|
EnsureSchemaOwner(schemaId);
|
||||||
CitusRemoveDirectory(jobDirectoryName);
|
CitusRemoveDirectory(jobDirectoryName->data);
|
||||||
RemoveJobSchema(jobSchemaName);
|
RemoveJobSchema(jobSchemaName);
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,7 @@ static uint32 RangePartitionId(Datum partitionValue, Oid partitionCollation,
|
||||||
static uint32 HashPartitionId(Datum partitionValue, Oid partitionCollation,
|
static uint32 HashPartitionId(Datum partitionValue, Oid partitionCollation,
|
||||||
const void *context);
|
const void *context);
|
||||||
static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
|
static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
|
||||||
static bool FileIsLink(char *filename, struct stat filestat);
|
static bool FileIsLink(const char *filename, struct stat filestat);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -158,7 +158,7 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
/* close partition files and atomically rename (commit) them */
|
/* close partition files and atomically rename (commit) them */
|
||||||
ClosePartitionFiles(partitionFileArray, fileCount);
|
ClosePartitionFiles(partitionFileArray, fileCount);
|
||||||
CitusRemoveDirectory(taskDirectory);
|
CitusRemoveDirectory(taskDirectory->data);
|
||||||
RenameDirectory(taskAttemptDirectory, taskDirectory);
|
RenameDirectory(taskAttemptDirectory, taskDirectory);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
@ -232,7 +232,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
/* close partition files and atomically rename (commit) them */
|
/* close partition files and atomically rename (commit) them */
|
||||||
ClosePartitionFiles(partitionFileArray, fileCount);
|
ClosePartitionFiles(partitionFileArray, fileCount);
|
||||||
CitusRemoveDirectory(taskDirectory);
|
CitusRemoveDirectory(taskDirectory->data);
|
||||||
RenameDirectory(taskAttemptDirectory, taskDirectory);
|
RenameDirectory(taskAttemptDirectory, taskDirectory);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
@ -698,7 +698,7 @@ FileIsLink(char *filename, struct stat filestat)
|
||||||
|
|
||||||
#else
|
#else
|
||||||
static bool
|
static bool
|
||||||
FileIsLink(char *filename, struct stat filestat)
|
FileIsLink(const char *filename, struct stat filestat)
|
||||||
{
|
{
|
||||||
return S_ISLNK(filestat.st_mode);
|
return S_ISLNK(filestat.st_mode);
|
||||||
}
|
}
|
||||||
|
@ -714,79 +714,91 @@ FileIsLink(char *filename, struct stat filestat)
|
||||||
* system library's remove_all() method.
|
* system library's remove_all() method.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CitusRemoveDirectory(StringInfo filename)
|
CitusRemoveDirectory(const char *filename)
|
||||||
{
|
{
|
||||||
struct stat fileStat;
|
/* files may be added during execution, loop when that occurs */
|
||||||
int removed = 0;
|
while (true)
|
||||||
|
|
||||||
int fileStated = stat(filename->data, &fileStat);
|
|
||||||
if (fileStated < 0)
|
|
||||||
{
|
{
|
||||||
if (errno == ENOENT)
|
struct stat fileStat;
|
||||||
{
|
int removed = 0;
|
||||||
return; /* if file does not exist, return */
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not stat file \"%s\": %m", filename->data)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
int fileStated = stat(filename, &fileStat);
|
||||||
* If this is a directory, iterate over all its contents and for each
|
if (fileStated < 0)
|
||||||
* content, recurse into this function. Also, make sure that we do not
|
|
||||||
* recurse into symbolic links.
|
|
||||||
*/
|
|
||||||
if (S_ISDIR(fileStat.st_mode) && !FileIsLink(filename->data, fileStat))
|
|
||||||
{
|
|
||||||
const char *directoryName = filename->data;
|
|
||||||
|
|
||||||
DIR *directory = AllocateDir(directoryName);
|
|
||||||
if (directory == NULL)
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
if (errno == ENOENT)
|
||||||
errmsg("could not open directory \"%s\": %m",
|
|
||||||
directoryName)));
|
|
||||||
}
|
|
||||||
|
|
||||||
struct dirent *directoryEntry = ReadDir(directory, directoryName);
|
|
||||||
for (; directoryEntry != NULL; directoryEntry = ReadDir(directory, directoryName))
|
|
||||||
{
|
|
||||||
const char *baseFilename = directoryEntry->d_name;
|
|
||||||
|
|
||||||
/* if system file, skip it */
|
|
||||||
if (strncmp(baseFilename, ".", MAXPGPATH) == 0 ||
|
|
||||||
strncmp(baseFilename, "..", MAXPGPATH) == 0)
|
|
||||||
{
|
{
|
||||||
continue;
|
return; /* if file does not exist, return */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not stat file \"%s\": %m", filename)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If this is a directory, iterate over all its contents and for each
|
||||||
|
* content, recurse into this function. Also, make sure that we do not
|
||||||
|
* recurse into symbolic links.
|
||||||
|
*/
|
||||||
|
if (S_ISDIR(fileStat.st_mode) && !FileIsLink(filename, fileStat))
|
||||||
|
{
|
||||||
|
const char *directoryName = filename;
|
||||||
|
|
||||||
|
DIR *directory = AllocateDir(directoryName);
|
||||||
|
if (directory == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not open directory \"%s\": %m",
|
||||||
|
directoryName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
StringInfo fullFilename = makeStringInfo();
|
StringInfo fullFilename = makeStringInfo();
|
||||||
appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename);
|
struct dirent *directoryEntry = ReadDir(directory, directoryName);
|
||||||
|
for (; directoryEntry != NULL; directoryEntry = ReadDir(directory,
|
||||||
|
directoryName))
|
||||||
|
{
|
||||||
|
const char *baseFilename = directoryEntry->d_name;
|
||||||
|
|
||||||
CitusRemoveDirectory(fullFilename);
|
/* if system file, skip it */
|
||||||
|
if (strncmp(baseFilename, ".", MAXPGPATH) == 0 ||
|
||||||
|
strncmp(baseFilename, "..", MAXPGPATH) == 0)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
resetStringInfo(fullFilename);
|
||||||
|
appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename);
|
||||||
|
|
||||||
|
CitusRemoveDirectory(fullFilename->data);
|
||||||
|
}
|
||||||
|
|
||||||
FreeStringInfo(fullFilename);
|
FreeStringInfo(fullFilename);
|
||||||
|
FreeDir(directory);
|
||||||
}
|
}
|
||||||
|
|
||||||
FreeDir(directory);
|
/* we now have an empty directory or a regular file, remove it */
|
||||||
}
|
if (S_ISDIR(fileStat.st_mode))
|
||||||
|
{
|
||||||
|
removed = rmdir(filename);
|
||||||
|
|
||||||
/* we now have an empty directory or a regular file, remove it */
|
if (errno == ENOTEMPTY || errno == EEXIST)
|
||||||
if (S_ISDIR(fileStat.st_mode))
|
{
|
||||||
{
|
continue;
|
||||||
removed = rmdir(filename->data);
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
removed = unlink(filename->data);
|
removed = unlink(filename);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (removed != 0 && errno != ENOENT)
|
if (removed != 0 && errno != ENOENT)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
errmsg("could not remove file \"%s\": %m", filename->data)));
|
errmsg("could not remove file \"%s\": %m", filename)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ extern bool CacheDirectoryElement(const char *filename);
|
||||||
extern bool JobDirectoryElement(const char *filename);
|
extern bool JobDirectoryElement(const char *filename);
|
||||||
extern bool DirectoryExists(StringInfo directoryName);
|
extern bool DirectoryExists(StringInfo directoryName);
|
||||||
extern void CitusCreateDirectory(StringInfo directoryName);
|
extern void CitusCreateDirectory(StringInfo directoryName);
|
||||||
extern void CitusRemoveDirectory(StringInfo filename);
|
extern void CitusRemoveDirectory(const char *filename);
|
||||||
extern StringInfo InitTaskDirectory(uint64 jobId, uint32 taskId);
|
extern StringInfo InitTaskDirectory(uint64 jobId, uint32 taskId);
|
||||||
extern void RemoveJobSchema(StringInfo schemaName);
|
extern void RemoveJobSchema(StringInfo schemaName);
|
||||||
extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
|
extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
|
||||||
|
|
Loading…
Reference in New Issue