mirror of https://github.com/citusdata/citus.git
Merge pull request #2489 from citusdata/task_tracker_suffix
Add user ID suffix to intermediate files in task-tracker (pull/2492/head)
commit 711eef611f
@@ -2609,6 +2609,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
 	Query *query = NULL;
 	Node *queryNode = copyStatement->query;
 	List *queryTreeList = NIL;
+	StringInfo userFilePath = makeStringInfo();

 #if (PG_VERSION_NUM >= 100000)
 	RawStmt *rawStmt = makeNode(RawStmt);
@@ -2625,6 +2626,14 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
 	}

 	query = (Query *) linitial(queryTreeList);

+	/*
+	 * Add a user ID suffix to prevent other users from reading/writing
+	 * the same file. We do this consistently in all functions that interact
+	 * with task files.
+	 */
+	appendStringInfo(userFilePath, "%s.%u", filename, GetUserId());
+
 	tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat);

 	snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
@@ -359,6 +359,32 @@ IsTransmitStmt(Node *parsetree)
 }


+/*
+ * TransmitStatementUser extracts the user attribute from a
+ * COPY ... (format 'transmit', user '...') statement.
+ */
+char *
+TransmitStatementUser(CopyStmt *copyStatement)
+{
+	ListCell *optionCell = NULL;
+	char *userName = NULL;
+
+	AssertArg(IsTransmitStmt((Node *) copyStatement));
+
+	foreach(optionCell, copyStatement->options)
+	{
+		DefElem *defel = (DefElem *) lfirst(optionCell);
+
+		if (strncmp(defel->defname, "user", NAMEDATALEN) == 0)
+		{
+			userName = defGetString(defel);
+		}
+	}
+
+	return userName;
+}
+
+
 /*
  * VerifyTransmitStmt checks that the passed in command is a valid transmit
  * statement. Raise ERROR if not.
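For reference, a sketch of the overloaded COPY form this helper parses; the file path and role name below are illustrative only, not taken from the diff:

    -- TransmitStatementUser() would return 'report_user' for this statement,
    -- and NULL when the user option is absent.
    COPY "base/pgsql_job_cache/job_1234/task_5678/p_00000"
        TO STDOUT WITH (format 'transmit', user 'report_user');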
@@ -222,17 +222,28 @@ multi_ProcessUtility(PlannedStmt *pstmt,
 	if (IsTransmitStmt(parsetree))
 	{
 		CopyStmt *copyStatement = (CopyStmt *) parsetree;
+		char *userName = TransmitStatementUser(copyStatement);
+		bool missingOK = false;
+		StringInfo transmitPath = makeStringInfo();

 		VerifyTransmitStmt(copyStatement);

 		/* ->relation->relname is the target file in our overloaded COPY */
+		appendStringInfoString(transmitPath, copyStatement->relation->relname);
+
+		if (userName != NULL)
+		{
+			Oid userId = get_role_oid(userName, missingOK);
+			appendStringInfo(transmitPath, ".%d", userId);
+		}
+
 		if (copyStatement->is_from)
 		{
-			RedirectCopyDataToRegularFile(copyStatement->relation->relname);
+			RedirectCopyDataToRegularFile(transmitPath->data);
 		}
 		else
 		{
-			SendRegularFile(copyStatement->relation->relname);
+			SendRegularFile(transmitPath->data);
 		}

 		/* Don't execute the faux copy statement */
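A hypothetical walk-through of the path resolution above (the role name and OID are made up): get_role_oid() turns the role named in the COPY option into its OID, and that OID becomes the suffix of the file the worker actually serves.

    -- catalog equivalent of get_role_oid('report_user', false)
    SELECT oid AS transmit_suffix FROM pg_roles WHERE rolname = 'report_user';
    -- e.g. 16384, so a request for "p_00000" is answered from "p_00000.16384"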
@@ -2590,9 +2590,11 @@ ManageTransmitTracker(TaskTracker *transmitTracker)
 	int32 connectionId = transmitTracker->connectionId;
 	StringInfo jobDirectoryName = JobDirectoryName(transmitState->jobId);
 	StringInfo taskFilename = TaskFilename(jobDirectoryName, transmitState->taskId);
+	char *userName = CurrentUserName();

 	StringInfo fileTransmitQuery = makeStringInfo();
-	appendStringInfo(fileTransmitQuery, TRANSMIT_REGULAR_COMMAND, taskFilename->data);
+	appendStringInfo(fileTransmitQuery, TRANSMIT_WITH_USER_COMMAND,
+					 taskFilename->data, quote_literal_cstr(userName));

 	fileTransmitStarted = MultiClientSendQuery(connectionId, fileTransmitQuery->data);
 	if (fileTransmitStarted)
@@ -108,7 +108,7 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)

 	/* local filename is <jobId>/<upstreamTaskId>/<partitionTaskId> */
 	StringInfo taskDirectoryName = TaskDirectoryName(jobId, upstreamTaskId);
-	StringInfo taskFilename = TaskFilename(taskDirectoryName, partitionTaskId);
+	StringInfo taskFilename = UserTaskFilename(taskDirectoryName, partitionTaskId);

 	/*
 	 * If we are the first function to fetch a file for the upstream task, the
@@ -154,7 +154,7 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)

 	/* local filename is <jobId>/<upstreamTaskId>/<queryTaskId> */
 	StringInfo taskDirectoryName = TaskDirectoryName(jobId, upstreamTaskId);
-	StringInfo taskFilename = TaskFilename(taskDirectoryName, queryTaskId);
+	StringInfo taskFilename = UserTaskFilename(taskDirectoryName, queryTaskId);

 	/*
 	 * If we are the first function to fetch a file for the upstream task, the
@@ -191,6 +191,21 @@ TaskFilename(StringInfo directoryName, uint32 taskId)
 }


+/*
+ * UserTaskFilename returns a full file path for a task file including the
+ * current user ID as a suffix.
+ */
+StringInfo
+UserTaskFilename(StringInfo directoryName, uint32 taskId)
+{
+	StringInfo taskFilename = TaskFilename(directoryName, taskId);
+
+	appendStringInfo(taskFilename, ".%u", GetUserId());
+
+	return taskFilename;
+}
+
+
 /*
  * FetchRegularFileAsSuperUser copies a file from a remote node in an idempotent
  * manner. It connects to the remote node as superuser to give file access.
@@ -203,6 +218,7 @@ FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
 	char *nodeUser = NULL;
 	StringInfo attemptFilename = NULL;
 	StringInfo transmitCommand = NULL;
+	char *userName = CurrentUserName();
 	uint32 randomId = (uint32) random();
 	bool received = false;
 	int renamed = 0;
@@ -217,7 +233,8 @@ FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
 					 MIN_TASK_FILENAME_WIDTH, randomId, ATTEMPT_FILE_SUFFIX);

 	transmitCommand = makeStringInfo();
-	appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilename->data);
+	appendStringInfo(transmitCommand, TRANSMIT_WITH_USER_COMMAND, remoteFilename->data,
+					 quote_literal_cstr(userName));

 	/* connect as superuser to give file access */
 	nodeUser = CitusExtensionOwnerName();
@@ -23,6 +23,7 @@
 #include "catalog/pg_namespace.h"
 #include "commands/copy.h"
 #include "commands/tablecmds.h"
+#include "common/string.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/version_compat.h"
@@ -42,7 +43,7 @@ static List * ArrayObjectToCStringList(ArrayType *arrayObject);
 static void CreateTaskTable(StringInfo schemaName, StringInfo relationName,
 							List *columnNameList, List *columnTypeList);
 static void CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
-									   StringInfo sourceDirectoryName);
+									   StringInfo sourceDirectoryName, Oid userId);


 /* exports for SQL callable functions */
@@ -78,6 +79,7 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
 	List *columnTypeList = NIL;
 	Oid savedUserId = InvalidOid;
 	int savedSecurityContext = 0;
+	Oid userId = GetUserId();

 	/* we should have the same number of column names and types */
 	int32 columnNameCount = ArrayObjectCount(columnNameObject);
@@ -112,7 +114,8 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
 	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
 	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);

-	CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName);
+	CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName,
+							   userId);

 	SetUserIdAndSecContext(savedUserId, savedSecurityContext);

@@ -155,6 +158,7 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
 	int createMergeTableResult = 0;
 	int createIntermediateTableResult = 0;
 	int finished = 0;
+	Oid userId = GetUserId();

 	CheckCitusVersion(ERROR);

@@ -196,7 +200,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)

 	appendStringInfo(mergeTableName, "%s%s", intermediateTableName->data,
 					 MERGE_TABLE_SUFFIX);
-	CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName);
+	CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName,
+							   userId);

 	createIntermediateTableResult = SPI_exec(createIntermediateTableQuery, 0);
 	if (createIntermediateTableResult < 0)
@@ -482,14 +487,20 @@ CreateStatement(RangeVar *relation, List *columnDefinitionList)
  * CopyTaskFilesFromDirectory finds all files in the given directory, except for
  * those having an attempt suffix. The function then copies these files into the
  * database table identified by the given schema and table name.
+ *
+ * The function makes sure all files were generated by the current user by checking
+ * whether the filename ends with the user ID, since this is added to local file
+ * names by functions such as worker_fetch_partition_file. Files that were generated
+ * by other users calling worker_fetch_partition_file directly are skipped.
  */
 static void
 CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
-						   StringInfo sourceDirectoryName)
+						   StringInfo sourceDirectoryName, Oid userId)
 {
 	const char *directoryName = sourceDirectoryName->data;
 	struct dirent *directoryEntry = NULL;
 	uint64 copiedRowTotal = 0;
+	StringInfo expectedFileSuffix = makeStringInfo();

 	DIR *directory = AllocateDir(directoryName);
 	if (directory == NULL)
@@ -498,6 +509,8 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
 						errmsg("could not open directory \"%s\": %m", directoryName)));
 	}

+	appendStringInfo(expectedFileSuffix, ".%u", userId);
+
 	directoryEntry = ReadDir(directory, directoryName);
 	for (; directoryEntry != NULL; directoryEntry = ReadDir(directory, directoryName))
 	{
@@ -516,6 +529,18 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
 			continue;
 		}

+		if (!pg_str_endswith(baseFilename, expectedFileSuffix->data))
+		{
+			/*
+			 * Someone is trying to tamper with our results. We don't throw an error
+			 * here because we don't want to allow users to prevent each other from
+			 * running queries.
+			 */
+			ereport(WARNING, (errmsg("Task file \"%s\" does not have expected suffix "
+									 "\"%s\"", baseFilename, expectedFileSuffix->data)));
+			continue;
+		}
+
 		fullFilename = makeStringInfo();
 		appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename);
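A rough SQL analogue of the filter above, assuming pg_ls_dir() is run as a superuser and using a hypothetical task directory: only entries that carry the caller's user-ID suffix would survive the check in CopyTaskFilesFromDirectory.

    SELECT entry
    FROM pg_ls_dir('base/pgsql_job_cache/job_1234/task_5678') AS entry
    WHERE entry LIKE '%.' || (SELECT usesysid::text FROM pg_user WHERE usename = current_user);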
@@ -16,6 +16,7 @@

 #include "postgres.h"
 #include "funcapi.h"
+#include "miscadmin.h"
 #include "pgstat.h"

 #include <arpa/inet.h>
@@ -80,6 +81,7 @@ static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fil
 static uint32 RangePartitionId(Datum partitionValue, const void *context);
 static uint32 HashPartitionId(Datum partitionValue, const void *context);
 static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context);
+static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
 static bool FileIsLink(char *filename, struct stat filestat);
@@ -509,7 +511,7 @@ OpenPartitionFiles(StringInfo directoryName, uint32 fileCount)

 	for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
 	{
-		StringInfo filePath = PartitionFilename(directoryName, fileIndex);
+		StringInfo filePath = UserPartitionFilename(directoryName, fileIndex);

 		fileDescriptor = PathNameOpenFilePerm(filePath->data, fileFlags, fileMode);
 		if (fileDescriptor < 0)
@@ -606,7 +608,14 @@ TaskDirectoryName(uint64 jobId, uint32 taskId)
 }


-/* Constructs a standardized partition file path for given directory and id. */
+/*
+ * PartitionFilename returns a partition file path for given directory and id
+ * which is suitable for use in worker_fetch_partition_file and transmit.
+ *
+ * It excludes the user ID part at the end of the filename, since that is
+ * added by worker_fetch_partition_file itself based on the current user.
+ * For the full path use UserPartitionFilename.
+ */
 StringInfo
 PartitionFilename(StringInfo directoryName, uint32 partitionId)
 {
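A psql sketch of the two name forms described in this comment, using made-up job/task IDs and the same variable idiom as the regression tests: the master-side name carries no suffix, and the worker appends the current user ID.

    SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
    \set Partition_File base/pgsql_job_cache/job_1234/task_5678/p_00003
    \set User_Partition_File :Partition_File.:userid
    \echo master-side name: :Partition_File
    \echo worker-side name: :User_Partition_File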
@@ -619,6 +628,21 @@ PartitionFilename(StringInfo directoryName, uint32 partitionId)
 }


+/*
+ * UserPartitionFilename returns the path of a partition file for the given
+ * partition ID and the current user.
+ */
+static StringInfo
+UserPartitionFilename(StringInfo directoryName, uint32 partitionId)
+{
+	StringInfo partitionFilename = PartitionFilename(directoryName, partitionId);
+
+	appendStringInfo(partitionFilename, ".%u", GetUserId());
+
+	return partitionFilename;
+}
+
+
 /*
  * JobDirectoryElement takes in a filename, and checks if this name lives in the
  * directory path that is used for task output files. Note that this function's
@@ -82,7 +82,7 @@ worker_execute_sql_task(PG_FUNCTION_ARGS)

 	/* job directory is created prior to scheduling the task */
 	StringInfo jobDirectoryName = JobDirectoryName(jobId);
-	StringInfo taskFilename = TaskFilename(jobDirectoryName, taskId);
+	StringInfo taskFilename = UserTaskFilename(jobDirectoryName, taskId);

 	query = ParseQueryString(queryString);
 	tuplesSent = WorkerExecuteSqlTask(query, taskFilename->data, binaryCopyFormat);
@@ -28,6 +28,7 @@ extern void FreeStringInfo(StringInfo stringInfo);

 /* Local functions forward declarations for Transmit statement */
 extern bool IsTransmitStmt(Node *parsetree);
+extern char * TransmitStatementUser(CopyStmt *copyStatement);
 extern void VerifyTransmitStmt(CopyStmt *copyStatement);
@@ -44,7 +44,8 @@

 /* Defines used for fetching files and tables */
 /* the tablename in the overloaded COPY statement is the to-be-transferred file */
-#define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" TO STDOUT WITH (format 'transmit')"
+#define TRANSMIT_WITH_USER_COMMAND \
+	"COPY \"%s\" TO STDOUT WITH (format 'transmit', user %s)"
 #define COPY_OUT_COMMAND "COPY %s TO STDOUT"
 #define COPY_SELECT_ALL_OUT_COMMAND "COPY (SELECT * FROM %s) TO STDOUT"
 #define COPY_IN_COMMAND "COPY %s FROM '%s'"
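A hedged illustration of how the new template expands; format() and quote_literal() stand in here for the C-side appendStringInfo() and quote_literal_cstr(), and the path and role are hypothetical.

    SELECT format($$COPY "%s" TO STDOUT WITH (format 'transmit', user %s)$$,
                  'base/pgsql_job_cache/job_1234/task_5678/p_00000',
                  quote_literal('report_user')) AS transmit_command;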
@@ -128,6 +129,7 @@ extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename,

 /* Function declarations shared with the master planner */
 extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
+extern StringInfo UserTaskFilename(StringInfo directoryName, uint32 taskId);
 extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
 								 StringInfo queryString);
 extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList);
@@ -8,6 +8,11 @@
 \set TablePart00 lineitem_partition_task_part_00
 \set TablePart01 lineitem_partition_task_part_01
 \set TablePart02 lineitem_partition_task_part_02
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00002.:userid
 -- We assign a partition task and wait for it to complete. Note that we hardcode
 -- the partition function call string, including the job and task identifiers,
 -- into the argument in the task assignment function. This hardcoding is
@@ -34,9 +39,9 @@ SELECT task_tracker_task_status(:JobId, :PartitionTaskId);
  6
 (1 row)

-COPY :TablePart00 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00000';
-COPY :TablePart01 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00001';
-COPY :TablePart02 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00002';
+COPY :TablePart00 FROM :'Table_File_00';
+COPY :TablePart01 FROM :'Table_File_01';
+COPY :TablePart02 FROM :'Table_File_02';
 SELECT COUNT(*) FROM :TablePart00;
  count
 -------
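A standalone psql sketch of the idiom the updated tests rely on: \gset stores the query's output column as a psql variable, and :'var' interpolates a variable as a quoted literal, so the COPY source path follows the current user's ID suffix. The COPY line is commented out because the task file only exists after the partition task has run.

    SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
    \set Example_File base/pgsql_job_cache/job_401010/task_801106/p_00000.:userid
    \echo :Example_File
    -- COPY lineitem_partition_task_part_00 FROM :'Example_File';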
@@ -12,6 +12,11 @@
 \set Table_Part_00 binary_data_table_part_00
 \set Table_Part_01 binary_data_table_part_01
 \set Table_Part_02 binary_data_table_part_02
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
 -- Create table with special characters
 CREATE TABLE :Table_Name(textcolumn text, binarycolumn bytea);
 COPY :Table_Name FROM stdin;
@@ -47,9 +52,9 @@ SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
 CREATE TABLE :Table_Part_00 ( LIKE :Table_Name );
 CREATE TABLE :Table_Part_01 ( LIKE :Table_Name );
 CREATE TABLE :Table_Part_02 ( LIKE :Table_Name );
-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00002';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
 -- The union of the three partitions should have as many rows as original table
 SELECT COUNT(*) AS total_row_count FROM (
   SELECT * FROM :Table_Part_00 UNION ALL
@@ -16,6 +16,12 @@
 \set Table_Part_01 lineitem_hash_part_01
 \set Table_Part_02 lineitem_hash_part_02
 \set Table_Part_03 lineitem_hash_part_03
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
 -- Run select query, and apply hash partitioning on query results
 SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
                                    :Partition_Column_Text, :Partition_Column_Type::regtype,
@@ -25,10 +31,10 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,

 (1 row)

-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002';
-COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
+COPY :Table_Part_03 FROM :'Table_File_03';
 SELECT COUNT(*) FROM :Table_Part_00;
  count
 -------
@@ -15,6 +15,12 @@
 \set Table_Part_01 lineitem_hash_complex_part_01
 \set Table_Part_02 lineitem_hash_complex_part_02
 \set Table_Part_03 lineitem_hash_complex_part_03
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
 -- Run hardcoded complex select query, and apply hash partitioning on query
 -- results
 SELECT worker_hash_partition_table(:JobId, :TaskId,
@@ -30,10 +36,10 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
 (1 row)

 -- Copy partitioned data files into tables for testing purposes
-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00002';
-COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
+COPY :Table_Part_03 FROM :'Table_File_03';
 SELECT COUNT(*) FROM :Table_Part_00;
  count
 -------
@@ -11,6 +11,11 @@
 \set Range_Table_Part_00 supplier_range_part_00
 \set Range_Table_Part_01 supplier_range_part_01
 \set Range_Table_Part_02 supplier_range_part_02
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+\set File_Basedir base/pgsql_job_cache
+\set Range_Table_File_00 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00000.:userid
+\set Range_Table_File_01 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00001.:userid
+\set Range_Table_File_02 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00002.:userid
 -- Run select query, and apply range partitioning on query results. Note that
 -- one of the split point values is 0, We are checking here that the partition
 -- function doesn't treat 0 as null, and that range repartitioning correctly
@@ -24,9 +29,9 @@ SELECT worker_range_partition_table(:JobId, :Range_TaskId, :Select_Query_Text,
 (1 row)

 -- Copy partitioned data files into tables for testing purposes
-COPY :Range_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00000';
-COPY :Range_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00001';
-COPY :Range_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00002';
+COPY :Range_Table_Part_00 FROM :'Range_Table_File_00';
+COPY :Range_Table_Part_01 FROM :'Range_Table_File_01';
+COPY :Range_Table_Part_02 FROM :'Range_Table_File_02';
 SELECT COUNT(*) FROM :Range_Table_Part_00;
  count
 -------
@@ -103,6 +108,10 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
 \set Hash_Table_Part_00 supplier_hash_part_00
 \set Hash_Table_Part_01 supplier_hash_part_01
 \set Hash_Table_Part_02 supplier_hash_part_02
+\set File_Basedir base/pgsql_job_cache
+\set Hash_Table_File_00 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00000.:userid
+\set Hash_Table_File_01 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00001.:userid
+\set Hash_Table_File_02 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00002.:userid
 -- Run select query, and apply hash partitioning on query results
 SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
                                    :Partition_Column_Text, :Partition_Column_Type,
@@ -112,9 +121,9 @@ SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,

 (1 row)

-COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000';
-COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001';
-COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00002';
+COPY :Hash_Table_Part_00 FROM :'Hash_Table_File_00';
+COPY :Hash_Table_Part_01 FROM :'Hash_Table_File_01';
+COPY :Hash_Table_Part_02 FROM :'Hash_Table_File_02';
 SELECT COUNT(*) FROM :Hash_Table_Part_00;
  count
 -------
@@ -12,11 +12,12 @@
 \set Table_Part_01 lineitem_range_part_01
 \set Table_Part_02 lineitem_range_part_02
 \set Table_Part_03 lineitem_range_part_03
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
 \set File_Basedir base/pgsql_job_cache
-\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000
-\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001
-\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002
-\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
 -- Run select query, and apply range partitioning on query results
 SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
                                     :Partition_Column_Text, :Partition_Column_Type,
@@ -12,6 +12,12 @@
 \set Table_Part_01 lineitem_range_complex_part_01
 \set Table_Part_02 lineitem_range_complex_part_02
 \set Table_Part_03 lineitem_range_complex_part_03
+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
 -- Run hardcoded complex select query, and apply range partitioning on query
 -- results
 SELECT worker_range_partition_table(:JobId, :TaskId,
@@ -27,10 +33,10 @@ SELECT worker_range_partition_table(:JobId, :TaskId,
 (1 row)

 -- Copy partitioned data files into tables for testing purposes
-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00002';
-COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00003';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
+COPY :Table_Part_03 FROM :'Table_File_03';
 SELECT COUNT(*) FROM :Table_Part_00;
  count
 -------
@@ -14,6 +14,13 @@
 \set TablePart01 lineitem_partition_task_part_01
 \set TablePart02 lineitem_partition_task_part_02

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00002.:userid
+
 -- We assign a partition task and wait for it to complete. Note that we hardcode
 -- the partition function call string, including the job and task identifiers,
 -- into the argument in the task assignment function. This hardcoding is
@@ -29,9 +36,9 @@ SELECT pg_sleep(4.0);

 SELECT task_tracker_task_status(:JobId, :PartitionTaskId);

-COPY :TablePart00 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00000';
-COPY :TablePart01 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00001';
-COPY :TablePart02 FROM 'base/pgsql_job_cache/job_401010/task_801106/p_00002';
+COPY :TablePart00 FROM :'Table_File_00';
+COPY :TablePart01 FROM :'Table_File_01';
+COPY :TablePart02 FROM :'Table_File_02';

 SELECT COUNT(*) FROM :TablePart00;
 SELECT COUNT(*) FROM :TablePart02;
@@ -18,6 +18,13 @@
 \set Table_Part_01 binary_data_table_part_01
 \set Table_Part_02 binary_data_table_part_02

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+
 -- Create table with special characters

 CREATE TABLE :Table_Name(textcolumn text, binarycolumn bytea);
@@ -52,9 +59,9 @@ CREATE TABLE :Table_Part_00 ( LIKE :Table_Name );
 CREATE TABLE :Table_Part_01 ( LIKE :Table_Name );
 CREATE TABLE :Table_Part_02 ( LIKE :Table_Name );

-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101105/p_00002';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';

 -- The union of the three partitions should have as many rows as original table
@@ -22,16 +22,24 @@
 \set Table_Part_02 lineitem_hash_part_02
 \set Table_Part_03 lineitem_hash_part_03

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
+
 -- Run select query, and apply hash partitioning on query results

 SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
                                    :Partition_Column_Text, :Partition_Column_Type::regtype,
                                    ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);

-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002';
-COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
+COPY :Table_Part_03 FROM :'Table_File_03';

 SELECT COUNT(*) FROM :Table_Part_00;
 SELECT COUNT(*) FROM :Table_Part_01;
@@ -22,6 +22,14 @@
 \set Table_Part_02 lineitem_hash_complex_part_02
 \set Table_Part_03 lineitem_hash_complex_part_03

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
+
 -- Run hardcoded complex select query, and apply hash partitioning on query
 -- results

@@ -35,10 +43,10 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,

 -- Copy partitioned data files into tables for testing purposes

-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00002';
-COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
+COPY :Table_Part_03 FROM :'Table_File_03';

 SELECT COUNT(*) FROM :Table_Part_00;
 SELECT COUNT(*) FROM :Table_Part_03;
@@ -17,6 +17,13 @@
 \set Range_Table_Part_01 supplier_range_part_01
 \set Range_Table_Part_02 supplier_range_part_02

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
+\set File_Basedir base/pgsql_job_cache
+\set Range_Table_File_00 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00000.:userid
+\set Range_Table_File_01 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00001.:userid
+\set Range_Table_File_02 :File_Basedir/job_:JobId/task_:Range_TaskId/p_00002.:userid
+
 -- Run select query, and apply range partitioning on query results. Note that
 -- one of the split point values is 0, We are checking here that the partition
 -- function doesn't treat 0 as null, and that range repartitioning correctly
@@ -28,9 +35,9 @@ SELECT worker_range_partition_table(:JobId, :Range_TaskId, :Select_Query_Text,

 -- Copy partitioned data files into tables for testing purposes

-COPY :Range_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00000';
-COPY :Range_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00001';
-COPY :Range_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101106/p_00002';
+COPY :Range_Table_Part_00 FROM :'Range_Table_File_00';
+COPY :Range_Table_Part_01 FROM :'Range_Table_File_01';
+COPY :Range_Table_Part_02 FROM :'Range_Table_File_02';

 SELECT COUNT(*) FROM :Range_Table_Part_00;
 SELECT COUNT(*) FROM :Range_Table_Part_02;
@@ -76,15 +83,20 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
 \set Hash_Table_Part_01 supplier_hash_part_01
 \set Hash_Table_Part_02 supplier_hash_part_02

+\set File_Basedir base/pgsql_job_cache
+\set Hash_Table_File_00 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00000.:userid
+\set Hash_Table_File_01 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00001.:userid
+\set Hash_Table_File_02 :File_Basedir/job_:JobId/task_:Hash_TaskId/p_00002.:userid
+
 -- Run select query, and apply hash partitioning on query results

 SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
                                    :Partition_Column_Text, :Partition_Column_Type,
                                    ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);

-COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000';
-COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001';
-COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00002';
+COPY :Hash_Table_Part_00 FROM :'Hash_Table_File_00';
+COPY :Hash_Table_Part_01 FROM :'Hash_Table_File_01';
+COPY :Hash_Table_Part_02 FROM :'Hash_Table_File_02';

 SELECT COUNT(*) FROM :Hash_Table_Part_00;
 SELECT COUNT(*) FROM :Hash_Table_Part_02;
@@ -18,11 +18,13 @@
 \set Table_Part_02 lineitem_range_part_02
 \set Table_Part_03 lineitem_range_part_03

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
 \set File_Basedir base/pgsql_job_cache
-\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000
-\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001
-\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002
-\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid

 -- Run select query, and apply range partitioning on query results
@@ -18,6 +18,14 @@
 \set Table_Part_02 lineitem_range_complex_part_02
 \set Table_Part_03 lineitem_range_complex_part_03

+SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
+
+\set File_Basedir base/pgsql_job_cache
+\set Table_File_00 :File_Basedir/job_:JobId/task_:TaskId/p_00000.:userid
+\set Table_File_01 :File_Basedir/job_:JobId/task_:TaskId/p_00001.:userid
+\set Table_File_02 :File_Basedir/job_:JobId/task_:TaskId/p_00002.:userid
+\set Table_File_03 :File_Basedir/job_:JobId/task_:TaskId/p_00003.:userid
+
 -- Run hardcoded complex select query, and apply range partitioning on query
 -- results

@@ -31,10 +39,10 @@ SELECT worker_range_partition_table(:JobId, :TaskId,

 -- Copy partitioned data files into tables for testing purposes

-COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00000';
-COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00001';
-COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00002';
-COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101102/p_00003';
+COPY :Table_Part_00 FROM :'Table_File_00';
+COPY :Table_Part_01 FROM :'Table_File_01';
+COPY :Table_Part_02 FROM :'Table_File_02';
+COPY :Table_Part_03 FROM :'Table_File_03';

 SELECT COUNT(*) FROM :Table_Part_00;
 SELECT COUNT(*) FROM :Table_Part_03;