diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 19ed656c2..4301fec6c 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -87,7 +87,7 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags) * We create a directory on the master node to keep task execution results. * We also register this directory for automatic cleanup on portal delete. */ - jobDirectoryName = JobDirectoryName(workerJob->jobId); + jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); CreateDirectory(jobDirectoryName); ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 230da5955..86a069584 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -514,7 +514,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, queryStatus = MultiClientQueryStatus(connectionId); if (queryStatus == CLIENT_QUERY_COPY) { - StringInfo jobDirectoryName = JobDirectoryName(task->jobId); + StringInfo jobDirectoryName = MasterJobDirectoryName(task->jobId); StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId); char *filename = taskFilename->data; diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 48127ed92..703fa4449 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -204,7 +204,7 @@ MaxMasterConnectionCount(void) void RemoveJobDirectory(uint64 jobId) { - StringInfo jobDirectoryName = JobDirectoryName(jobId); + StringInfo jobDirectoryName = MasterJobDirectoryName(jobId); RemoveDirectory(jobDirectoryName); 
ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 35a3b3c3c..e08657e98 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1314,7 +1314,7 @@ ManageTransmitExecution(TaskTracker *transmitTracker, queryStatus = MultiClientQueryStatus(connectionId); if (queryStatus == CLIENT_QUERY_COPY) { - StringInfo jobDirectoryName = JobDirectoryName(task->jobId); + StringInfo jobDirectoryName = MasterJobDirectoryName(task->jobId); StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId); char *filename = taskFilename->data; diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 2558267d4..05a522e6e 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -347,7 +347,7 @@ MasterNodeCopyStatementList(MultiPlan *multiPlan) foreach(workerTaskCell, workerTaskList) { Task *workerTask = (Task *) lfirst(workerTaskCell); - StringInfo jobDirectoryName = JobDirectoryName(workerTask->jobId); + StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); StringInfo taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); RangeVar *relation = makeRangeVar(NULL, tableName, -1); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index e511ae0cd..ced447154 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -441,7 +441,31 @@ ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount) } -/* Constructs a standardized job directory path for the given job id. 
*/ +/* + * MasterJobDirectoryName constructs a standardized job + * directory path for the given job id on the master node. + */ +StringInfo +MasterJobDirectoryName(uint64 jobId) +{ + StringInfo jobDirectoryName = makeStringInfo(); + + /* + * We use the default tablespace in {datadir}/base. Further, we need to + * apply padding on our 64-bit job id, and hence can't use UINT64_FORMAT. + */ + appendStringInfo(jobDirectoryName, "base/%s/%s%0*" INT64_MODIFIER "u", + PG_JOB_CACHE_DIR, MASTER_JOB_DIRECTORY_PREFIX, + MIN_JOB_DIRNAME_WIDTH, jobId); + + return jobDirectoryName; +} + + +/* + * JobDirectoryName constructs a standardized job + * directory path for the given job id on the worker nodes. + */ StringInfo JobDirectoryName(uint64 jobId) { diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index df0dcd72d..6a95e99ca 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -26,6 +26,7 @@ /* Directory, file, table name, and UDF related defines for distributed tasks */ #define PG_JOB_CACHE_DIR "pgsql_job_cache" +#define MASTER_JOB_DIRECTORY_PREFIX "master_job_" #define JOB_DIRECTORY_PREFIX "job_" #define JOB_SCHEMA_PREFIX "pg_merge_job_" #define TASK_FILE_PREFIX "task_" @@ -107,6 +108,7 @@ extern StringInfo JobSchemaName(uint64 jobId); extern StringInfo TaskTableName(uint32 taskId); extern bool JobSchemaExists(StringInfo schemaName); extern StringInfo JobDirectoryName(uint64 jobId); +extern StringInfo MasterJobDirectoryName(uint64 jobId); extern StringInfo TaskDirectoryName(uint64 jobId, uint32 taskId); extern StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId); extern bool CacheDirectoryElement(const char *filename); diff --git a/src/test/regress/expected/multi_query_directory_cleanup.out b/src/test/regress/expected/multi_query_directory_cleanup.out index 65caed2e9..6395248b6 100644 --- a/src/test/regress/expected/multi_query_directory_cleanup.out +++ 
b/src/test/regress/expected/multi_query_directory_cleanup.out @@ -96,28 +96,28 @@ DECLARE c_17 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM DECLARE c_18 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem; DECLARE c_19 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem; SELECT * FROM pg_ls_dir('base/pgsql_job_cache') f ORDER BY f; - f ----------- - job_1256 - job_1257 - job_1258 - job_1259 - job_1260 - job_1261 - job_1262 - job_1263 - job_1264 - job_1265 - job_1266 - job_1267 - job_1268 - job_1269 - job_1270 - job_1271 - job_1272 - job_1273 - job_1274 - job_1275 + f +----------------- + master_job_1256 + master_job_1257 + master_job_1258 + master_job_1259 + master_job_1260 + master_job_1261 + master_job_1262 + master_job_1263 + master_job_1264 + master_job_1265 + master_job_1266 + master_job_1267 + master_job_1268 + master_job_1269 + master_job_1270 + master_job_1271 + master_job_1272 + master_job_1273 + master_job_1274 + master_job_1275 (20 rows) -- close first, 17th (first after re-alloc) and last cursor. @@ -125,25 +125,25 @@ CLOSE c_00; CLOSE c_16; CLOSE c_19; SELECT * FROM pg_ls_dir('base/pgsql_job_cache') f ORDER BY f; - f ----------- - job_1257 - job_1258 - job_1259 - job_1260 - job_1261 - job_1262 - job_1263 - job_1264 - job_1265 - job_1266 - job_1267 - job_1268 - job_1269 - job_1270 - job_1271 - job_1273 - job_1274 + f +----------------- + master_job_1257 + master_job_1258 + master_job_1259 + master_job_1260 + master_job_1261 + master_job_1262 + master_job_1263 + master_job_1264 + master_job_1265 + master_job_1266 + master_job_1267 + master_job_1268 + master_job_1269 + master_job_1270 + master_job_1271 + master_job_1273 + master_job_1274 (17 rows) ROLLBACK;