Merge pull request #742 from citusdata/feature/task_tracker_folders

Differentiate worker and master job temporary folders - MX Backport
pull/825/head
Eren Başak 2016-10-03 14:29:54 +03:00 committed by GitHub
commit 7e7b0f3491
8 changed files with 73 additions and 47 deletions

View File

@ -87,7 +87,7 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
* We create a directory on the master node to keep task execution results. * We create a directory on the master node to keep task execution results.
* We also register this directory for automatic cleanup on portal delete. * We also register this directory for automatic cleanup on portal delete.
*/ */
jobDirectoryName = JobDirectoryName(workerJob->jobId); jobDirectoryName = MasterJobDirectoryName(workerJob->jobId);
CreateDirectory(jobDirectoryName); CreateDirectory(jobDirectoryName);
ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner);

View File

@ -514,7 +514,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
queryStatus = MultiClientQueryStatus(connectionId); queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_COPY) if (queryStatus == CLIENT_QUERY_COPY)
{ {
StringInfo jobDirectoryName = JobDirectoryName(task->jobId); StringInfo jobDirectoryName = MasterJobDirectoryName(task->jobId);
StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId); StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId);
char *filename = taskFilename->data; char *filename = taskFilename->data;

View File

@ -204,7 +204,7 @@ MaxMasterConnectionCount(void)
void void
RemoveJobDirectory(uint64 jobId) RemoveJobDirectory(uint64 jobId)
{ {
StringInfo jobDirectoryName = JobDirectoryName(jobId); StringInfo jobDirectoryName = MasterJobDirectoryName(jobId);
RemoveDirectory(jobDirectoryName); RemoveDirectory(jobDirectoryName);
ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId); ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId);

View File

@ -1314,7 +1314,7 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
queryStatus = MultiClientQueryStatus(connectionId); queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_COPY) if (queryStatus == CLIENT_QUERY_COPY)
{ {
StringInfo jobDirectoryName = JobDirectoryName(task->jobId); StringInfo jobDirectoryName = MasterJobDirectoryName(task->jobId);
StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId); StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId);
char *filename = taskFilename->data; char *filename = taskFilename->data;

View File

@ -347,7 +347,7 @@ MasterNodeCopyStatementList(MultiPlan *multiPlan)
foreach(workerTaskCell, workerTaskList) foreach(workerTaskCell, workerTaskList)
{ {
Task *workerTask = (Task *) lfirst(workerTaskCell); Task *workerTask = (Task *) lfirst(workerTaskCell);
StringInfo jobDirectoryName = JobDirectoryName(workerTask->jobId); StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
StringInfo taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); StringInfo taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
RangeVar *relation = makeRangeVar(NULL, tableName, -1); RangeVar *relation = makeRangeVar(NULL, tableName, -1);

View File

@ -441,7 +441,31 @@ ClosePartitionFiles(FileOutputStream *partitionFileArray, uint32 fileCount)
} }
/* Constructs a standardized job directory path for the given job id. */ /*
* MasterJobDirectoryName constructs a standardized job
* directory path for the given job id on the master node.
*/
StringInfo
MasterJobDirectoryName(uint64 jobId)
{
	StringInfo masterJobDirectory = makeStringInfo();

	/*
	 * Master job directories are kept under the default tablespace, i.e.
	 * {datadir}/base. The 64-bit job id has to be zero-padded to a minimum
	 * width, which UINT64_FORMAT cannot express; we therefore assemble the
	 * conversion specifier from INT64_MODIFIER ourselves.
	 */
	appendStringInfo(masterJobDirectory,
					 "base/%s/%s%0*" INT64_MODIFIER "u",
					 PG_JOB_CACHE_DIR,
					 MASTER_JOB_DIRECTORY_PREFIX,
					 MIN_JOB_DIRNAME_WIDTH,
					 jobId);

	return masterJobDirectory;
}
/*
* JobDirectoryName constructs a standardized job
* directory path for the given job id on the worker nodes.
*/
StringInfo StringInfo
JobDirectoryName(uint64 jobId) JobDirectoryName(uint64 jobId)
{ {

View File

@ -26,6 +26,7 @@
/* Directory, file, table name, and UDF related defines for distributed tasks */ /* Directory, file, table name, and UDF related defines for distributed tasks */
#define PG_JOB_CACHE_DIR "pgsql_job_cache" #define PG_JOB_CACHE_DIR "pgsql_job_cache"
#define MASTER_JOB_DIRECTORY_PREFIX "master_job_"
#define JOB_DIRECTORY_PREFIX "job_" #define JOB_DIRECTORY_PREFIX "job_"
#define JOB_SCHEMA_PREFIX "pg_merge_job_" #define JOB_SCHEMA_PREFIX "pg_merge_job_"
#define TASK_FILE_PREFIX "task_" #define TASK_FILE_PREFIX "task_"
@ -107,6 +108,7 @@ extern StringInfo JobSchemaName(uint64 jobId);
extern StringInfo TaskTableName(uint32 taskId); extern StringInfo TaskTableName(uint32 taskId);
extern bool JobSchemaExists(StringInfo schemaName); extern bool JobSchemaExists(StringInfo schemaName);
extern StringInfo JobDirectoryName(uint64 jobId); extern StringInfo JobDirectoryName(uint64 jobId);
extern StringInfo MasterJobDirectoryName(uint64 jobId);
extern StringInfo TaskDirectoryName(uint64 jobId, uint32 taskId); extern StringInfo TaskDirectoryName(uint64 jobId, uint32 taskId);
extern StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId); extern StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId);
extern bool CacheDirectoryElement(const char *filename); extern bool CacheDirectoryElement(const char *filename);

View File

@ -96,28 +96,28 @@ DECLARE c_17 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM
DECLARE c_18 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem; DECLARE c_18 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem;
DECLARE c_19 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem; DECLARE c_19 CURSOR FOR SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem;
SELECT * FROM pg_ls_dir('base/pgsql_job_cache') f ORDER BY f; SELECT * FROM pg_ls_dir('base/pgsql_job_cache') f ORDER BY f;
f f
---------- -----------------
job_1256 master_job_1256
job_1257 master_job_1257
job_1258 master_job_1258
job_1259 master_job_1259
job_1260 master_job_1260
job_1261 master_job_1261
job_1262 master_job_1262
job_1263 master_job_1263
job_1264 master_job_1264
job_1265 master_job_1265
job_1266 master_job_1266
job_1267 master_job_1267
job_1268 master_job_1268
job_1269 master_job_1269
job_1270 master_job_1270
job_1271 master_job_1271
job_1272 master_job_1272
job_1273 master_job_1273
job_1274 master_job_1274
job_1275 master_job_1275
(20 rows) (20 rows)
-- close first, 17th (first after re-alloc) and last cursor. -- close first, 17th (first after re-alloc) and last cursor.
@ -125,25 +125,25 @@ CLOSE c_00;
CLOSE c_16; CLOSE c_16;
CLOSE c_19; CLOSE c_19;
SELECT * FROM pg_ls_dir('base/pgsql_job_cache') f ORDER BY f; SELECT * FROM pg_ls_dir('base/pgsql_job_cache') f ORDER BY f;
f f
---------- -----------------
job_1257 master_job_1257
job_1258 master_job_1258
job_1259 master_job_1259
job_1260 master_job_1260
job_1261 master_job_1261
job_1262 master_job_1262
job_1263 master_job_1263
job_1264 master_job_1264
job_1265 master_job_1265
job_1266 master_job_1266
job_1267 master_job_1267
job_1268 master_job_1268
job_1269 master_job_1269
job_1270 master_job_1270
job_1271 master_job_1271
job_1273 master_job_1273
job_1274 master_job_1274
(17 rows) (17 rows)
ROLLBACK; ROLLBACK;