diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8887513cc..ee8cd5700 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2609,6 +2609,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS Query *query = NULL; Node *queryNode = copyStatement->query; List *queryTreeList = NIL; + StringInfo userFilePath = makeStringInfo(); #if (PG_VERSION_NUM >= 100000) RawStmt *rawStmt = makeNode(RawStmt); @@ -2625,6 +2626,14 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS } query = (Query *) linitial(queryTreeList); + + /* + * Add a user ID suffix to prevent other users from reading/writing + * the same file. We do this consistently in all functions that interact + * with task files. + */ + appendStringInfo(userFilePath, "%s.%u", filename, GetUserId()); + - tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat); + tuplesSent = WorkerExecuteSqlTask(query, userFilePath->data, binaryCopyFormat); snprintf(completionTag, COMPLETION_TAG_BUFSIZE, diff --git a/src/backend/distributed/commands/transmit.c b/src/backend/distributed/commands/transmit.c index 95c90bba9..fa976661c 100644 --- a/src/backend/distributed/commands/transmit.c +++ b/src/backend/distributed/commands/transmit.c @@ -359,6 +359,32 @@ IsTransmitStmt(Node *parsetree) } +/* + * TransmitStatementUser extracts the user attribute from a + * COPY ... (format 'transmit', user '...') statement. + */ +char * +TransmitStatementUser(CopyStmt *copyStatement) +{ + ListCell *optionCell = NULL; + char *userName = NULL; + + AssertArg(IsTransmitStmt((Node *) copyStatement)); + + foreach(optionCell, copyStatement->options) + { + DefElem *defel = (DefElem *) lfirst(optionCell); + + if (strncmp(defel->defname, "user", NAMEDATALEN) == 0) + { + userName = defGetString(defel); + } + } + + return userName; +} + + /* * VerifyTransmitStmt checks that the passed in command is a valid transmit * statement. Raise ERROR if not. 
diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index b247e4952..f7699a94e 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -222,17 +222,28 @@ multi_ProcessUtility(PlannedStmt *pstmt, if (IsTransmitStmt(parsetree)) { CopyStmt *copyStatement = (CopyStmt *) parsetree; + char *userName = TransmitStatementUser(copyStatement); + bool missingOK = false; + StringInfo transmitPath = makeStringInfo(); VerifyTransmitStmt(copyStatement); /* ->relation->relname is the target file in our overloaded COPY */ + appendStringInfoString(transmitPath, copyStatement->relation->relname); + + if (userName != NULL) + { + Oid userId = get_role_oid(userName, missingOK); + appendStringInfo(transmitPath, ".%u", userId); + } + if (copyStatement->is_from) { - RedirectCopyDataToRegularFile(copyStatement->relation->relname); + RedirectCopyDataToRegularFile(transmitPath->data); } else { - SendRegularFile(copyStatement->relation->relname); + SendRegularFile(transmitPath->data); } /* Don't execute the faux copy statement */ diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 724359b57..ae4dcd682 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -2590,9 +2590,11 @@ ManageTransmitTracker(TaskTracker *transmitTracker) int32 connectionId = transmitTracker->connectionId; StringInfo jobDirectoryName = JobDirectoryName(transmitState->jobId); StringInfo taskFilename = TaskFilename(jobDirectoryName, transmitState->taskId); + char *userName = CurrentUserName(); StringInfo fileTransmitQuery = makeStringInfo(); - appendStringInfo(fileTransmitQuery, TRANSMIT_REGULAR_COMMAND, taskFilename->data); + appendStringInfo(fileTransmitQuery, TRANSMIT_WITH_USER_COMMAND, + taskFilename->data, 
quote_literal_cstr(userName)); fileTransmitStarted = MultiClientSendQuery(connectionId, fileTransmitQuery->data); if (fileTransmitStarted) diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index b90e19f9c..d0533158a 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -108,7 +108,7 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS) /* local filename is // */ StringInfo taskDirectoryName = TaskDirectoryName(jobId, upstreamTaskId); - StringInfo taskFilename = TaskFilename(taskDirectoryName, partitionTaskId); + StringInfo taskFilename = UserTaskFilename(taskDirectoryName, partitionTaskId); /* * If we are the first function to fetch a file for the upstream task, the @@ -154,7 +154,7 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS) /* local filename is // */ StringInfo taskDirectoryName = TaskDirectoryName(jobId, upstreamTaskId); - StringInfo taskFilename = TaskFilename(taskDirectoryName, queryTaskId); + StringInfo taskFilename = UserTaskFilename(taskDirectoryName, queryTaskId); /* * If we are the first function to fetch a file for the upstream task, the @@ -191,6 +191,21 @@ TaskFilename(StringInfo directoryName, uint32 taskId) } +/* + * UserTaskFilename returns a full file path for a task file including the + * current user ID as a suffix. + */ +StringInfo +UserTaskFilename(StringInfo directoryName, uint32 taskId) +{ + StringInfo taskFilename = TaskFilename(directoryName, taskId); + + appendStringInfo(taskFilename, ".%u", GetUserId()); + + return taskFilename; +} + + /* * FetchRegularFileAsSuperUser copies a file from a remote node in an idempotent * manner. It connects to the remote node as superuser to give file access. 
@@ -203,6 +218,7 @@ FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort, char *nodeUser = NULL; StringInfo attemptFilename = NULL; StringInfo transmitCommand = NULL; + char *userName = CurrentUserName(); uint32 randomId = (uint32) random(); bool received = false; int renamed = 0; @@ -217,7 +233,8 @@ FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort, MIN_TASK_FILENAME_WIDTH, randomId, ATTEMPT_FILE_SUFFIX); transmitCommand = makeStringInfo(); - appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilename->data); + appendStringInfo(transmitCommand, TRANSMIT_WITH_USER_COMMAND, remoteFilename->data, + quote_literal_cstr(userName)); /* connect as superuser to give file access */ nodeUser = CitusExtensionOwnerName(); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index f95d19d16..74331f984 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -23,6 +23,7 @@ #include "catalog/pg_namespace.h" #include "commands/copy.h" #include "commands/tablecmds.h" +#include "common/string.h" #include "distributed/metadata_cache.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" @@ -42,7 +43,7 @@ static List * ArrayObjectToCStringList(ArrayType *arrayObject); static void CreateTaskTable(StringInfo schemaName, StringInfo relationName, List *columnNameList, List *columnTypeList); static void CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, - StringInfo sourceDirectoryName); + StringInfo sourceDirectoryName, Oid userId); /* exports for SQL callable functions */ @@ -78,6 +79,7 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) List *columnTypeList = NIL; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; + Oid userId = GetUserId(); /* we should have the same number of column names and types */ int32 columnNameCount = 
ArrayObjectCount(columnNameObject); @@ -112,7 +114,8 @@ GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName); + CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName, + userId); SetUserIdAndSecContext(savedUserId, savedSecurityContext); @@ -155,6 +158,7 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) int createMergeTableResult = 0; int createIntermediateTableResult = 0; int finished = 0; + Oid userId = GetUserId(); CheckCitusVersion(ERROR); @@ -196,7 +200,8 @@ appendStringInfo(mergeTableName, "%s%s", intermediateTableName->data, MERGE_TABLE_SUFFIX); - CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName); + CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName, + userId); createIntermediateTableResult = SPI_exec(createIntermediateTableQuery, 0); if (createIntermediateTableResult < 0) @@ -482,14 +487,20 @@ CreateStatement(RangeVar *relation, List *columnDefinitionList) * CopyTaskFilesFromDirectory finds all files in the given directory, except for * those having an attempt suffix. The function then copies these files into the * database table identified by the given schema and table name. + * + * The function makes sure all files were generated by the current user by checking + * whether the filename ends with the user ID, since this is added to local file + * names by functions such as worker_fetch_partition_file. Files that were generated + * by other users calling worker_fetch_partition_file directly are skipped. 
*/ static void CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, - StringInfo sourceDirectoryName) + StringInfo sourceDirectoryName, Oid userId) { const char *directoryName = sourceDirectoryName->data; struct dirent *directoryEntry = NULL; uint64 copiedRowTotal = 0; + StringInfo expectedFileSuffix = makeStringInfo(); DIR *directory = AllocateDir(directoryName); if (directory == NULL) @@ -498,6 +509,8 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, errmsg("could not open directory \"%s\": %m", directoryName))); } + appendStringInfo(expectedFileSuffix, ".%u", userId); + directoryEntry = ReadDir(directory, directoryName); for (; directoryEntry != NULL; directoryEntry = ReadDir(directory, directoryName)) { @@ -516,6 +529,18 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, continue; } + if (!pg_str_endswith(baseFilename, expectedFileSuffix->data)) + { + /* + * Someone is trying to tamper with our results. We don't throw an error + * here because we don't want to allow users to prevent each other from + * running queries. 
+ */ + ereport(WARNING, (errmsg("Task file \"%s\" does not have expected suffix " + "\"%s\"", baseFilename, expectedFileSuffix->data))); + continue; + } + fullFilename = makeStringInfo(); appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 386e7fd28..d5ca9b44f 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -16,6 +16,7 @@ #include "postgres.h" #include "funcapi.h" +#include "miscadmin.h" #include "pgstat.h" #include @@ -80,6 +81,7 @@ static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fil static uint32 RangePartitionId(Datum partitionValue, const void *context); static uint32 HashPartitionId(Datum partitionValue, const void *context); static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context); +static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId); static bool FileIsLink(char *filename, struct stat filestat); @@ -509,7 +511,7 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount) for (fileIndex = 0; fileIndex < fileCount; fileIndex++) { - StringInfo filePath = PartitionFilename(directoryName, fileIndex); + StringInfo filePath = UserPartitionFilename(directoryName, fileIndex); fileDescriptor = PathNameOpenFilePerm(filePath->data, fileFlags, fileMode); if (fileDescriptor < 0) @@ -606,7 +608,14 @@ TaskDirectoryName(uint64 jobId, uint32 taskId) } -/* Constructs a standardized partition file path for given directory and id. */ +/* + * PartitionFilename returns a partition file path for given directory and id + * which is suitable for use in worker_fetch_partition_file and transmit. 
+ * + * It excludes the user ID part at the end of the filename, since that is + * added by worker_fetch_partition_file itself based on the current user. + * For the full path use UserPartitionFilename. + */ StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId) { @@ -619,6 +628,21 @@ PartitionFilename(StringInfo directoryName, uint32 partitionId) } +/* + * UserPartitionFilename returns the path of a partition file for the given + * partition ID and the current user. + */ +static StringInfo +UserPartitionFilename(StringInfo directoryName, uint32 partitionId) +{ + StringInfo partitionFilename = PartitionFilename(directoryName, partitionId); + + appendStringInfo(partitionFilename, ".%u", GetUserId()); + + return partitionFilename; +} + + /* * JobDirectoryElement takes in a filename, and checks if this name lives in the * directory path that is used for task output files. Note that this function's diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index 1648a0c2c..c9246038c 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -82,7 +82,7 @@ worker_execute_sql_task(PG_FUNCTION_ARGS) /* job directory is created prior to scheduling the task */ StringInfo jobDirectoryName = JobDirectoryName(jobId); - StringInfo taskFilename = TaskFilename(jobDirectoryName, taskId); + StringInfo taskFilename = UserTaskFilename(jobDirectoryName, taskId); query = ParseQueryString(queryString); tuplesSent = WorkerExecuteSqlTask(query, taskFilename->data, binaryCopyFormat); diff --git a/src/include/distributed/transmit.h b/src/include/distributed/transmit.h index 28ed0dad3..5aadafec6 100644 --- a/src/include/distributed/transmit.h +++ b/src/include/distributed/transmit.h @@ -28,6 +28,7 @@ extern void FreeStringInfo(StringInfo stringInfo); /* Local functions forward declarations for Transmit statement */ extern 
bool IsTransmitStmt(Node *parsetree); +extern char * TransmitStatementUser(CopyStmt *copyStatement); extern void VerifyTransmitStmt(CopyStmt *copyStatement); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index b92119b7f..5dca935cb 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -44,7 +44,8 @@ /* Defines used for fetching files and tables */ /* the tablename in the overloaded COPY statement is the to-be-transferred file */ -#define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" TO STDOUT WITH (format 'transmit')" +#define TRANSMIT_WITH_USER_COMMAND \ + "COPY \"%s\" TO STDOUT WITH (format 'transmit', user %s)" #define COPY_OUT_COMMAND "COPY %s TO STDOUT" #define COPY_SELECT_ALL_OUT_COMMAND "COPY (SELECT * FROM %s) TO STDOUT" #define COPY_IN_COMMAND "COPY %s FROM '%s'" @@ -128,6 +129,7 @@ extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename, /* Function declarations shared with the master planner */ extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); +extern StringInfo UserTaskFilename(StringInfo directoryName, uint32 taskId); extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, StringInfo queryString); extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList); diff --git a/src/test/regress/expected/task_tracker_partition_task.out b/src/test/regress/expected/task_tracker_partition_task.out index 2d36f93b0..6d30e3d48 100644 --- a/src/test/regress/expected/task_tracker_partition_task.out +++ b/src/test/regress/expected/task_tracker_partition_task.out @@ -8,6 +8,11 @@ \set TablePart00 lineitem_partition_task_part_00 \set TablePart01 lineitem_partition_task_part_01 \set TablePart02 lineitem_partition_task_part_02 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 
:File_Basedir/job_:JobId/task_:PartitionTaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00002.:userid -- We assign a partition task and wait for it to complete. Note that we hardcode -- the partition function call string, including the job and task identifiers, -- into the argument in the task assignment function. This hardcoding is @@ -34,9 +39,9 @@ SELECT task_tracker_task_status(:JobId, :PartitionTaskId); 6 (1 row) -COPY :TablePart00 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00000'; -COPY :TablePart01 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00001'; -COPY :TablePart02 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00002'; +COPY :TablePart00 FROM :'Table_File_00'; +COPY :TablePart01 FROM :'Table_File_01'; +COPY :TablePart02 FROM :'Table_File_02'; SELECT COUNT(*) FROM :TablePart00; count ------- diff --git a/src/test/regress/expected/worker_binary_data_partition.out b/src/test/regress/expected/worker_binary_data_partition.out index afab11021..b07d7d66e 100644 --- a/src/test/regress/expected/worker_binary_data_partition.out +++ b/src/test/regress/expected/worker_binary_data_partition.out @@ -12,6 +12,11 @@ \set Table_Part_00 binary_data_table_part_00 \set Table_Part_01 binary_data_table_part_01 \set Table_Part_02 binary_data_table_part_02 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid -- Create table with special characters CREATE TABLE :Table_Name(textcolumn text, binarycolumn bytea); COPY :Table_Name FROM stdin; @@ -47,9 +52,9 @@ SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text, CREATE TABLE :Table_Part_00 
( LIKE :Table_Name ); CREATE TABLE :Table_Part_01 ( LIKE :Table_Name ); CREATE TABLE :Table_Part_02 ( LIKE :Table_Name ); -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00002'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; -- The union of the three partitions should have as many rows as original table SELECT COUNT(*) AS total_row_count FROM ( SELECT * FROM :Table_Part_00 UNION ALL diff --git a/src/test/regress/expected/worker_hash_partition.out b/src/test/regress/expected/worker_hash_partition.out index 63ed5f27d..c21c943d4 100644 --- a/src/test/regress/expected/worker_hash_partition.out +++ b/src/test/regress/expected/worker_hash_partition.out @@ -16,6 +16,12 @@ \set Table_Part_01 lineitem_hash_part_01 \set Table_Part_02 lineitem_hash_part_02 \set Table_Part_03 lineitem_hash_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid -- Run select query, and apply hash partitioning on query results SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type::regtype, @@ -25,10 +31,10 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, (1 row) -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001'; -COPY :Table_Part_02 FROM 
'base/pgsql_job_cache/job_201010/task_101103/p_00002'; -COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; +COPY :Table_Part_03 FROM :'Table_File_03'; SELECT COUNT(*) FROM :Table_Part_00; count ------- diff --git a/src/test/regress/expected/worker_hash_partition_complex.out b/src/test/regress/expected/worker_hash_partition_complex.out index a90f4d505..4931d0032 100644 --- a/src/test/regress/expected/worker_hash_partition_complex.out +++ b/src/test/regress/expected/worker_hash_partition_complex.out @@ -15,6 +15,12 @@ \set Table_Part_01 lineitem_hash_complex_part_01 \set Table_Part_02 lineitem_hash_complex_part_02 \set Table_Part_03 lineitem_hash_complex_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid -- Run hardcoded complex select query, and apply hash partitioning on query -- results SELECT worker_hash_partition_table(:JobId, :TaskId, @@ -30,10 +36,10 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, (1 row) -- Copy partitioned data files into tables for testing purposes -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00002'; -COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; +COPY 
:Table_Part_03 FROM :'Table_File_03'; SELECT COUNT(*) FROM :Table_Part_00; count ------- diff --git a/src/test/regress/expected/worker_null_data_partition.out b/src/test/regress/expected/worker_null_data_partition.out index 344658142..08b3e0976 100644 --- a/src/test/regress/expected/worker_null_data_partition.out +++ b/src/test/regress/expected/worker_null_data_partition.out @@ -11,6 +11,11 @@ \set Range_Table_Part_00 supplier_range_part_00 \set Range_Table_Part_01 supplier_range_part_01 \set Range_Table_Part_02 supplier_range_part_02 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset +\set File_Basedir base/pgsql_job_cache +\set Range_Table_File_00 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00000.:userid +\set Range_Table_File_01 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00001.:userid +\set Range_Table_File_02 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00002.:userid -- Run select query, and apply range partitioning on query results. Note that -- one of the split point values is 0, We are checking here that the partition -- function doesn't treat 0 as null, and that range repartitioning correctly @@ -24,9 +29,9 @@ SELECT worker_range_partition_table(:JobId, :Range_TaskId, :Select_Query_Text, (1 row) -- Copy partitioned data files into tables for testing purposes -COPY :Range_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00000'; -COPY :Range_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00001'; -COPY :Range_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00002'; +COPY :Range_Table_Part_00 FROM :'Range_Table_File_00'; +COPY :Range_Table_Part_01 FROM :'Range_Table_File_01'; +COPY :Range_Table_Part_02 FROM :'Range_Table_File_02'; SELECT COUNT(*) FROM :Range_Table_Part_00; count ------- @@ -103,6 +108,10 @@ SELECT COUNT(*) AS diff_rhs_02 FROM ( \set Hash_Table_Part_00 supplier_hash_part_00 \set Hash_Table_Part_01 supplier_hash_part_01 \set Hash_Table_Part_02 supplier_hash_part_02 
+\set File_Basedir base/pgsql_job_cache +\set Hash_Table_File_00 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00000.:userid +\set Hash_Table_File_01 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00001.:userid +\set Hash_Table_File_02 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00002.:userid -- Run select query, and apply hash partitioning on query results SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type, @@ -112,9 +121,9 @@ SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text, (1 row) -COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000'; -COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001'; -COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00002'; +COPY :Hash_Table_Part_00 FROM :'Hash_Table_File_00'; +COPY :Hash_Table_Part_01 FROM :'Hash_Table_File_01'; +COPY :Hash_Table_Part_02 FROM :'Hash_Table_File_02'; SELECT COUNT(*) FROM :Hash_Table_Part_00; count ------- diff --git a/src/test/regress/expected/worker_range_partition.out b/src/test/regress/expected/worker_range_partition.out index 14e6203ed..bb4041ac3 100644 --- a/src/test/regress/expected/worker_range_partition.out +++ b/src/test/regress/expected/worker_range_partition.out @@ -12,11 +12,12 @@ \set Table_Part_01 lineitem_range_part_01 \set Table_Part_02 lineitem_range_part_02 \set Table_Part_03 lineitem_range_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset \set File_Basedir base/pgsql_job_cache -\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000 -\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001 -\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002 -\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003 +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 
:File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid -- Run select query, and apply range partitioning on query results SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type, diff --git a/src/test/regress/expected/worker_range_partition_complex.out b/src/test/regress/expected/worker_range_partition_complex.out index 57b2c7795..ff739a3c1 100644 --- a/src/test/regress/expected/worker_range_partition_complex.out +++ b/src/test/regress/expected/worker_range_partition_complex.out @@ -12,6 +12,12 @@ \set Table_Part_01 lineitem_range_complex_part_01 \set Table_Part_02 lineitem_range_complex_part_02 \set Table_Part_03 lineitem_range_complex_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid -- Run hardcoded complex select query, and apply range partitioning on query -- results SELECT worker_range_partition_table(:JobId, :TaskId, @@ -27,10 +33,10 @@ SELECT worker_range_partition_table(:JobId, :TaskId, (1 row) -- Copy partitioned data files into tables for testing purposes -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00002'; -COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00003'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM 
:'Table_File_02'; +COPY :Table_Part_03 FROM :'Table_File_03'; SELECT COUNT(*) FROM :Table_Part_00; count ------- diff --git a/src/test/regress/sql/task_tracker_partition_task.sql b/src/test/regress/sql/task_tracker_partition_task.sql index 3d8f40117..89fc0bbe2 100644 --- a/src/test/regress/sql/task_tracker_partition_task.sql +++ b/src/test/regress/sql/task_tracker_partition_task.sql @@ -14,6 +14,13 @@ \set TablePart01 lineitem_partition_task_part_01 \set TablePart02 lineitem_partition_task_part_02 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00002.:userid + -- We assign a partition task and wait for it to complete. Note that we hardcode -- the partition function call string, including the job and task identifiers, -- into the argument in the task assignment function. 
This hardcoding is @@ -29,9 +36,9 @@ SELECT pg_sleep(4.0); SELECT task_tracker_task_status(:JobId, :PartitionTaskId); -COPY :TablePart00 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00000'; -COPY :TablePart01 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00001'; -COPY :TablePart02 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00002'; +COPY :TablePart00 FROM :'Table_File_00'; +COPY :TablePart01 FROM :'Table_File_01'; +COPY :TablePart02 FROM :'Table_File_02'; SELECT COUNT(*) FROM :TablePart00; SELECT COUNT(*) FROM :TablePart02; diff --git a/src/test/regress/sql/worker_binary_data_partition.sql b/src/test/regress/sql/worker_binary_data_partition.sql index 168302634..d6469b968 100644 --- a/src/test/regress/sql/worker_binary_data_partition.sql +++ b/src/test/regress/sql/worker_binary_data_partition.sql @@ -18,6 +18,13 @@ \set Table_Part_01 binary_data_table_part_01 \set Table_Part_02 binary_data_table_part_02 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid + -- Create table with special characters CREATE TABLE :Table_Name(textcolumn text, binarycolumn bytea); @@ -52,9 +59,9 @@ CREATE TABLE :Table_Part_00 ( LIKE :Table_Name ); CREATE TABLE :Table_Part_01 ( LIKE :Table_Name ); CREATE TABLE :Table_Part_02 ( LIKE :Table_Name ); -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00002'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; -- The union of the three partitions should have as many rows as original 
table diff --git a/src/test/regress/sql/worker_hash_partition.sql b/src/test/regress/sql/worker_hash_partition.sql index 6137638fe..7097de6dc 100644 --- a/src/test/regress/sql/worker_hash_partition.sql +++ b/src/test/regress/sql/worker_hash_partition.sql @@ -22,16 +22,24 @@ \set Table_Part_02 lineitem_hash_part_02 \set Table_Part_03 lineitem_hash_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid + -- Run select query, and apply hash partitioning on query results SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type::regtype, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002'; -COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; +COPY :Table_Part_03 FROM :'Table_File_03'; SELECT COUNT(*) FROM :Table_Part_00; SELECT COUNT(*) FROM :Table_Part_01; diff --git a/src/test/regress/sql/worker_hash_partition_complex.sql b/src/test/regress/sql/worker_hash_partition_complex.sql index 722109332..9701e3bc2 100644 --- a/src/test/regress/sql/worker_hash_partition_complex.sql +++ b/src/test/regress/sql/worker_hash_partition_complex.sql @@ -22,6 +22,14 @@ \set Table_Part_02 lineitem_hash_complex_part_02 \set Table_Part_03 lineitem_hash_complex_part_03 
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid + -- Run hardcoded complex select query, and apply hash partitioning on query -- results @@ -35,10 +43,10 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, -- Copy partitioned data files into tables for testing purposes -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00002'; -COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; +COPY :Table_Part_03 FROM :'Table_File_03'; SELECT COUNT(*) FROM :Table_Part_00; SELECT COUNT(*) FROM :Table_Part_03; diff --git a/src/test/regress/sql/worker_null_data_partition.sql b/src/test/regress/sql/worker_null_data_partition.sql index b6fa47d3d..d243feaf8 100644 --- a/src/test/regress/sql/worker_null_data_partition.sql +++ b/src/test/regress/sql/worker_null_data_partition.sql @@ -17,6 +17,13 @@ \set Range_Table_Part_01 supplier_range_part_01 \set Range_Table_Part_02 supplier_range_part_02 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + +\set File_Basedir base/pgsql_job_cache +\set Range_Table_File_00 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00000.:userid +\set Range_Table_File_01 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00001.:userid +\set Range_Table_File_02 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00002.:userid + -- Run select query, and 
apply range partitioning on query results. Note that -- one of the split point values is 0, We are checking here that the partition -- function doesn't treat 0 as null, and that range repartitioning correctly @@ -28,9 +35,9 @@ SELECT worker_range_partition_table(:JobId, :Range_TaskId, :Select_Query_Text, -- Copy partitioned data files into tables for testing purposes -COPY :Range_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00000'; -COPY :Range_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00001'; -COPY :Range_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00002'; +COPY :Range_Table_Part_00 FROM :'Range_Table_File_00'; +COPY :Range_Table_Part_01 FROM :'Range_Table_File_01'; +COPY :Range_Table_Part_02 FROM :'Range_Table_File_02'; SELECT COUNT(*) FROM :Range_Table_Part_00; SELECT COUNT(*) FROM :Range_Table_Part_02; @@ -76,15 +83,20 @@ SELECT COUNT(*) AS diff_rhs_02 FROM ( \set Hash_Table_Part_01 supplier_hash_part_01 \set Hash_Table_Part_02 supplier_hash_part_02 +\set File_Basedir base/pgsql_job_cache +\set Hash_Table_File_00 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00000.:userid +\set Hash_Table_File_01 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00001.:userid +\set Hash_Table_File_02 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00002.:userid + -- Run select query, and apply hash partitioning on query results SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); -COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000'; -COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001'; -COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00002'; +COPY :Hash_Table_Part_00 FROM :'Hash_Table_File_00'; +COPY :Hash_Table_Part_01 FROM :'Hash_Table_File_01'; +COPY :Hash_Table_Part_02 FROM :'Hash_Table_File_02'; 
SELECT COUNT(*) FROM :Hash_Table_Part_00; SELECT COUNT(*) FROM :Hash_Table_Part_02; diff --git a/src/test/regress/sql/worker_range_partition.sql b/src/test/regress/sql/worker_range_partition.sql index 32d55bbda..cb225f2d5 100644 --- a/src/test/regress/sql/worker_range_partition.sql +++ b/src/test/regress/sql/worker_range_partition.sql @@ -18,11 +18,13 @@ \set Table_Part_02 lineitem_range_part_02 \set Table_Part_03 lineitem_range_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + \set File_Basedir base/pgsql_job_cache -\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000 -\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001 -\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002 -\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003 +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid -- Run select query, and apply range partitioning on query results diff --git a/src/test/regress/sql/worker_range_partition_complex.sql b/src/test/regress/sql/worker_range_partition_complex.sql index a9ff2fccd..fd23f5989 100644 --- a/src/test/regress/sql/worker_range_partition_complex.sql +++ b/src/test/regress/sql/worker_range_partition_complex.sql @@ -18,6 +18,14 @@ \set Table_Part_02 lineitem_range_complex_part_02 \set Table_Part_03 lineitem_range_complex_part_03 +SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset + +\set File_Basedir base/pgsql_job_cache +\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid +\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid +\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid +\set Table_File_03 
:File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid + -- Run hardcoded complex select query, and apply range partitioning on query -- results @@ -31,10 +39,10 @@ SELECT worker_range_partition_table(:JobId, :TaskId, -- Copy partitioned data files into tables for testing purposes -COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00000'; -COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00001'; -COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00002'; -COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00003'; +COPY :Table_Part_00 FROM :'Table_File_00'; +COPY :Table_Part_01 FROM :'Table_File_01'; +COPY :Table_Part_02 FROM :'Table_File_02'; +COPY :Table_Part_03 FROM :'Table_File_03'; SELECT COUNT(*) FROM :Table_Part_00; SELECT COUNT(*) FROM :Table_Part_03;