From caf402d506318ac9c28fee527d10055c205a4aac Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:26:31 +0100 Subject: [PATCH 1/4] COPY to a task file no longer switches to superuser --- Makefile | 1 + src/backend/distributed/commands/multi_copy.c | 95 +++--- .../distributed/commands/utility_hook.c | 18 +- .../distributed/executor/multi_executor.c | 14 +- .../worker/worker_data_fetch_protocol.c | 12 + .../worker/worker_sql_task_protocol.c | 280 ++++++++++++++++++ src/include/distributed/commands/multi_copy.h | 3 +- src/include/distributed/multi_executor.h | 1 + src/include/distributed/worker_protocol.h | 2 + 9 files changed, 361 insertions(+), 65 deletions(-) create mode 100644 src/backend/distributed/worker/worker_sql_task_protocol.c diff --git a/Makefile b/Makefile index b3bcb59f9..e0a2b108d 100644 --- a/Makefile +++ b/Makefile @@ -128,6 +128,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/worker/worker_file_access_protocol.o \ src/backend/distributed/worker/worker_merge_protocol.o \ src/backend/distributed/worker/worker_partition_protocol.o \ + src/backend/distributed/worker/worker_sql_task_protocol.o \ src/backend/distributed/worker/worker_truncate_trigger_protocol.o \ $(WIN32RES) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index e50441f7d..8887513cc 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -140,9 +140,9 @@ static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, bool binaryFormat); static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath); static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); -static void CheckCopyPermissions(CopyStmt *copyStatement); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static bool IsCopyResultStmt(CopyStmt *copyStatement); +static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); static bool IsCopyFromWorker(CopyStmt *copyStatement); static NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); @@ -2445,9 +2445,20 @@ CitusCopyDestReceiverDestroy(DestReceiver *destReceiver) */ static bool IsCopyResultStmt(CopyStmt *copyStatement) +{ + return CopyStatementHasFormat(copyStatement, "result"); +} + + +/* + * CopyStatementHasFormat checks whether the COPY statement has the given + * format. + */ +static bool +CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) { ListCell *optionCell = NULL; - bool hasFormatReceive = false; + bool hasFormat = false; /* extract WITH (...) options from the COPY statement */ foreach(optionCell, copyStatement->options) @@ -2455,14 +2466,14 @@ IsCopyResultStmt(CopyStmt *copyStatement) DefElem *defel = (DefElem *) lfirst(optionCell); if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 && - strncmp(defGetString(defel), "result", NAMEDATALEN) == 0) + strncmp(defGetString(defel), formatName, NAMEDATALEN) == 0) { - hasFormatReceive = true; + hasFormat = true; break; } } - return hasFormatReceive; + return hasFormat; } @@ -2471,18 +2482,10 @@ IsCopyResultStmt(CopyStmt *copyStatement) * COPYing from distributed tables and preventing unsupported actions. The * function returns a modified COPY statement to be executed, or NULL if no * further processing is needed. - * - * commandMustRunAsOwner is an output parameter used to communicate to the caller whether - * the copy statement should be executed using elevated privileges. If - * ProcessCopyStmt that is required, a call to CheckCopyPermissions will take - * care of verifying the current user's permissions before ProcessCopyStmt - * returns. */ Node * -ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner) +ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString) { - *commandMustRunAsOwner = false; /* make sure variable is initialized */ - /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands * for sending intermediate results to workers. @@ -2591,43 +2594,43 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR if (copyStatement->filename != NULL && !copyStatement->is_program) { - const char *filename = copyStatement->filename; + char *filename = copyStatement->filename; - if (CacheDirectoryElement(filename)) + /* + * We execute COPY commands issued by the task-tracker executor here + * because we're not normally allowed to write to a file as a regular + * user and we don't want to execute the query as superuser. + */ + if (CacheDirectoryElement(filename) && copyStatement->query != NULL && + !copyStatement->is_from && !is_absolute_path(filename)) { - /* - * Only superusers are allowed to copy from a file, so we have to - * become superuser to execute copies to/from files used by citus' - * query execution. - * - * XXX: This is a decidedly suboptimal solution, as that means - * that triggers, input functions, etc. run with elevated - * privileges. But this is better than not being able to run - * queries as normal user. - */ - *commandMustRunAsOwner = true; + bool binaryCopyFormat = CopyStatementHasFormat(copyStatement, "binary"); + int64 tuplesSent = 0; + Query *query = NULL; + Node *queryNode = copyStatement->query; + List *queryTreeList = NIL; - /* - * Have to manually check permissions here as the COPY is will be - * run as a superuser. - */ - if (copyStatement->relation != NULL) +#if (PG_VERSION_NUM >= 100000) + RawStmt *rawStmt = makeNode(RawStmt); + rawStmt->stmt = queryNode; + + queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL); +#else + queryTreeList = pg_analyze_and_rewrite(queryNode, queryString, NULL, 0); +#endif + + if (list_length(queryTreeList) != 1) { - CheckCopyPermissions(copyStatement); + ereport(ERROR, (errmsg("can only execute a single query"))); } - /* - * Check if we have a "COPY (query) TO filename". If we do, copy - * doesn't accept relative file paths. However, SQL tasks that get - * assigned to worker nodes have relative paths. We therefore - * convert relative paths to absolute ones here. - */ - if (copyStatement->relation == NULL && - !copyStatement->is_from && - !is_absolute_path(filename)) - { - copyStatement->filename = make_absolute_path(filename); - } + query = (Query *) linitial(queryTreeList); + tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat); + + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, tuplesSent); + + return NULL; } } @@ -2724,7 +2727,7 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) * * Copied from postgres, where it's part of DoCopy(). */ -static void +void CheckCopyPermissions(CopyStmt *copyStatement) { /* *INDENT-OFF* */ diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index b0d52383a..b247e4952 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -128,9 +128,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, char *completionTag) { Node *parsetree = pstmt->utilityStmt; - bool commandMustRunAsOwner = false; - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; List *ddlJobs = NIL; bool checkExtensionVersion = false; @@ -248,8 +245,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, MemoryContext previousContext; parsetree = copyObject(parsetree); - parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, - &commandMustRunAsOwner); + parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, queryString); previousContext = MemoryContextSwitchTo(planContext); parsetree = copyObject(parsetree); @@ -450,13 +446,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, } } - /* set user if needed and go ahead and run local utility using standard hook */ - if (commandMustRunAsOwner) - { - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - } - #if (PG_VERSION_NUM >= 100000) pstmt->utilityStmt = parsetree; standard_ProcessUtility(pstmt, queryString, context, @@ -495,11 +484,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, PostProcessUtility(parsetree); } - if (commandMustRunAsOwner) - { - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - } - /* * Re-forming the foreign key graph relies on the command being executed * on the local table first. However, in order to decide whether the diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 73d2d88e3..172cf4598 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -337,6 +337,18 @@ StubRelation(TupleDesc tupleDescriptor) void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest) +{ + Query *query = ParseQueryString(queryString); + + ExecuteQueryIntoDestReceiver(query, params, dest); +} + + +/* + * ParseQuery parses query string and returns a Query struct. + */ +Query * +ParseQueryString(const char *queryString) { Query *query = NULL; @@ -355,7 +367,7 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params query = (Query *) linitial(queryTreeList); - ExecuteQueryIntoDestReceiver(query, params, dest); + return query; } diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 2ebc56c3e..b90e19f9c 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -32,6 +32,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/commands/multi_copy.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" @@ -766,6 +767,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) StringInfo queryString = NULL; Oid sourceShardRelationId = InvalidOid; Oid sourceSchemaId = InvalidOid; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; CheckCitusVersion(ERROR); @@ -829,9 +832,18 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName, localFilePath->data); + /* make sure we are allowed to execute the COPY command */ + CheckCopyPermissions(localCopyCommand); + + /* need superuser to copy from files */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + CitusProcessUtility((Node *) localCopyCommand, queryString->data, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + /* finally delete the temporary file we created */ CitusDeleteFile(localFilePath->data); diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c new file mode 100644 index 000000000..dfbeca81f --- /dev/null +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -0,0 +1,280 @@ +/*------------------------------------------------------------------------- + * + * worker_sql_task_protocol.c + * + * Routines for executing SQL tasks during task-tracker execution. + * + * Copyright (c) 2012-2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "funcapi.h" +#include "pgstat.h" + +#include "distributed/commands/multi_copy.h" +#include "distributed/multi_executor.h" +#include "distributed/transmit.h" +#include "distributed/worker_protocol.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + + +/* TaskFileDestReceiver can be used to stream results into a file */ +typedef struct TaskFileDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* EState for per-tuple memory allocation */ + EState *executorState; + + /* MemoryContext for DestReceiver session */ + MemoryContext memoryContext; + + /* output file */ + char *filePath; + File fileDesc; + bool binaryCopyFormat; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + uint64 tuplesSent; +} TaskFileDestReceiver; + + +static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState, + bool binaryCopyFormat); +static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor); +static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); +static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); +static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); + + +/* + * WorkerExecuteSqlTask executes an already-parsed query and writes the result + * to the given task file. + */ +int64 +WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat) +{ + EState *estate = NULL; + TaskFileDestReceiver *taskFileDest = NULL; + ParamListInfo paramListInfo = NULL; + int64 tuplesSent = 0L; + + estate = CreateExecutorState(); + taskFileDest = + (TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate, + binaryCopyFormat); + + ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest); + + tuplesSent = taskFileDest->tuplesSent; + + taskFileDest->pub.rDestroy((DestReceiver *) taskFileDest); + FreeExecutorState(estate); + + return tuplesSent; +} + + +/* + * CreateTaskFileDestReceiver creates a DestReceiver for writing query results + * to a task file. + */ +static DestReceiver * +CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat) +{ + TaskFileDestReceiver *taskFileDest = NULL; + + taskFileDest = (TaskFileDestReceiver *) palloc0(sizeof(TaskFileDestReceiver)); + + /* set up the DestReceiver function pointers */ + taskFileDest->pub.receiveSlot = TaskFileDestReceiverReceive; + taskFileDest->pub.rStartup = TaskFileDestReceiverStartup; + taskFileDest->pub.rShutdown = TaskFileDestReceiverShutdown; + taskFileDest->pub.rDestroy = TaskFileDestReceiverDestroy; + taskFileDest->pub.mydest = DestCopyOut; + + /* set up output parameters */ + taskFileDest->executorState = executorState; + taskFileDest->memoryContext = CurrentMemoryContext; + taskFileDest->filePath = pstrdup(filePath); + taskFileDest->binaryCopyFormat = binaryCopyFormat; + + return (DestReceiver *) taskFileDest; +} + + +/* + * TaskFileDestReceiverStartup implements the rStartup interface of + * TaskFileDestReceiver. It opens the destination file and sets up + * the CopyOutState. + */ +static void +TaskFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; + + CopyOutState copyOutState = NULL; + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileMode = (S_IRUSR | S_IWUSR); + + /* use the memory context that was in place when the DestReceiver was created */ + MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext); + + taskFileDest->tupleDescriptor = inputTupleDescriptor; + + /* define how tuples will be serialised */ + copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = taskFileDest->binaryCopyFormat; + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState); + taskFileDest->copyOutState = copyOutState; + + taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, + copyOutState->binary); + + taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags, + fileMode); + + if (copyOutState->binary) + { + /* write headers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryHeaders(copyOutState); + + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + } + + MemoryContextSwitchTo(oldContext); +} + + +/* + * TaskFileDestReceiverReceive implements the receiveSlot function of + * TaskFileDestReceiver. It takes a TupleTableSlot and writes the contents + * to a local file. + */ +static bool +TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; + + TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor; + + CopyOutState copyOutState = taskFileDest->copyOutState; + FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions; + + Datum *columnValues = NULL; + bool *columnNulls = NULL; + StringInfo copyData = copyOutState->fe_msgbuf; + + EState *executorState = taskFileDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + slot_getallattrs(slot); + + columnValues = slot->tts_values; + columnNulls = slot->tts_isnull; + + resetStringInfo(copyData); + + /* construct row in COPY format */ + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions, NULL); + + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + + MemoryContextSwitchTo(oldContext); + + taskFileDest->tuplesSent++; + + ResetPerTupleExprContext(executorState); + + return true; +} + + +/* + * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. + */ +static void +WriteToLocalFile(StringInfo copyData, File fileDesc) +{ +#if (PG_VERSION_NUM >= 100000) + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO); +#else + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len); +#endif + if (bytesWritten < 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not append to file: %m"))); + } +} + + +/* + * TaskFileDestReceiverShutdown implements the rShutdown interface of + * TaskFileDestReceiver. It writes the footer and closes the file. + * the relation. + */ +static void +TaskFileDestReceiverShutdown(DestReceiver *destReceiver) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; + CopyOutState copyOutState = taskFileDest->copyOutState; + + if (copyOutState->binary) + { + /* write footers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryFooters(copyOutState); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + } + + FileClose(taskFileDest->fileDesc); +} + + +/* + * TaskFileDestReceiverDestroy frees memory allocated as part of the + * TaskFileDestReceiver and closes file descriptors. + */ +static void +TaskFileDestReceiverDestroy(DestReceiver *destReceiver) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; + + if (taskFileDest->copyOutState) + { + pfree(taskFileDest->copyOutState); + } + + if (taskFileDest->columnOutputFunctions) + { + pfree(taskFileDest->columnOutputFunctions); + } + + pfree(taskFileDest->filePath); + pfree(taskFileDest); +} diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index c4d7a8f4b..6ecc63500 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -129,7 +129,8 @@ extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, - bool *commandMustRunAsOwner); + const char *queryString); +extern void CheckCopyPermissions(CopyStmt *copyStatement); #endif /* MULTI_COPY_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6c798eb26..ad1aca23e 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -36,6 +36,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore); +extern Query * ParseQueryString(const char *queryString); extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 30bef0f68..b92119b7f 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -122,6 +122,8 @@ extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedur extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk); extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName); +extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename, + bool binaryCopyFormat); /* Function declarations shared with the master planner */ From e3521ce32004eb3e867ef23939ab3968b1436907 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:11:11 +0100 Subject: [PATCH 2/4] Test current user in task-tracker queries --- src/test/regress/expected/multi_multiuser.out | 18 +++++++++--------- .../regress/expected/multi_multiuser_0.out | 18 +++++++++--------- src/test/regress/sql/multi_multiuser.sql | 6 +++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index 5d04ddfa1..a3a381966 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -108,10 +108,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | full_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -155,10 +155,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | read_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -197,7 +197,7 @@ ERROR: permission denied for table test SELECT count(*) FROM test WHERE id = 1; ERROR: permission denied for table test SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; ERROR: permission denied for table test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/expected/multi_multiuser_0.out b/src/test/regress/expected/multi_multiuser_0.out index 3088baa74..adbac9b1d 100644 --- a/src/test/regress/expected/multi_multiuser_0.out +++ b/src/test/regress/expected/multi_multiuser_0.out @@ -108,10 +108,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | full_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -155,10 +155,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | read_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -197,7 +197,7 @@ ERROR: permission denied for relation test SELECT count(*) FROM test WHERE id = 1; ERROR: permission denied for relation test SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; ERROR: permission denied for relation test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 6b9529c33..ca072106d 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -77,7 +77,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; @@ -101,7 +101,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; @@ -128,7 +128,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; From 30bad7e66f9118964c5133a9be5ccc684db5dfc4 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:08:47 +0100 Subject: [PATCH 3/4] Add worker_execute_sql_task UDF --- citus.control | 2 +- src/backend/distributed/Makefile | 4 ++- .../distributed/citus--8.0-9--8.0-10.sql | 11 +++++++ src/backend/distributed/citus.control | 2 +- .../worker/worker_sql_task_protocol.c | 31 +++++++++++++++++++ src/test/regress/expected/multi_extension.out | 2 ++ src/test/regress/sql/multi_extension.sql | 2 ++ 7 files changed, 51 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/citus--8.0-9--8.0-10.sql diff --git a/citus.control b/citus.control index 86f89c819..7dc8c29f6 100644 --- a/citus.control +++ b/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-8' +default_version = '8.0-10' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 764065655..b8db6fc77 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.3-1 7.3-2 7.3-3 \ 7.4-1 7.4-2 7.4-3 \ 7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \ - 8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 + 8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 8.0-10 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -233,6 +233,8 @@ $(EXTENSION)--8.0-8.sql: $(EXTENSION)--8.0-7.sql $(EXTENSION)--8.0-7--8.0-8.sql cat $^ > $@ $(EXTENSION)--8.0-9.sql: $(EXTENSION)--8.0-8.sql $(EXTENSION)--8.0-8--8.0-9.sql cat $^ > $@ +$(EXTENSION)--8.0-10.sql: $(EXTENSION)--8.0-9.sql $(EXTENSION)--8.0-9--8.0-10.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--8.0-9--8.0-10.sql b/src/backend/distributed/citus--8.0-9--8.0-10.sql new file mode 100644 index 000000000..cd13b6fee --- /dev/null +++ b/src/backend/distributed/citus--8.0-9--8.0-10.sql @@ -0,0 +1,11 @@ +/* citus--8.0-9--8.0-10 */ +SET search_path = 'pg_catalog'; + +CREATE FUNCTION worker_execute_sql_task(jobid bigint, taskid integer, query text, binary bool) +RETURNS bigint +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_execute_sql_task$$; +COMMENT ON FUNCTION worker_execute_sql_task(bigint, integer, text, bool) +IS 'execute a query and write the results to a task file'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 12f9faa74..7dc8c29f6 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-9' +default_version = '8.0-10' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index dfbeca81f..1648a0c2c 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -60,6 +60,37 @@ static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(worker_execute_sql_task); + + +/* + * worker_execute_sql_task executes a query and writes the results to + * a file according to the usual task naming scheme. + */ +Datum +worker_execute_sql_task(PG_FUNCTION_ARGS) +{ + uint64 jobId = PG_GETARG_INT64(0); + uint32 taskId = PG_GETARG_UINT32(1); + text *queryText = PG_GETARG_TEXT_P(2); + char *queryString = text_to_cstring(queryText); + bool binaryCopyFormat = PG_GETARG_BOOL(3); + + int64 tuplesSent = 0; + Query *query = NULL; + + /* job directory is created prior to scheduling the task */ + StringInfo jobDirectoryName = JobDirectoryName(jobId); + StringInfo taskFilename = TaskFilename(jobDirectoryName, taskId); + + query = ParseQueryString(queryString); + tuplesSent = WorkerExecuteSqlTask(query, taskFilename->data, binaryCopyFormat); + + PG_RETURN_INT64(tuplesSent); +} + + /* * WorkerExecuteSqlTask executes an already-parsed query and writes the result * to the given task file. diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index b191afe2e..c1dfc0c9c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -151,6 +151,8 @@ ALTER EXTENSION citus UPDATE TO '8.0-5'; ALTER EXTENSION citus UPDATE TO '8.0-6'; ALTER EXTENSION citus UPDATE TO '8.0-7'; ALTER EXTENSION citus UPDATE TO '8.0-8'; +ALTER EXTENSION citus UPDATE TO '8.0-9'; +ALTER EXTENSION citus UPDATE TO '8.0-10'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 284c435ad..4c2724570 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -151,6 +151,8 @@ ALTER EXTENSION citus UPDATE TO '8.0-5'; ALTER EXTENSION citus UPDATE TO '8.0-6'; ALTER EXTENSION citus UPDATE TO '8.0-7'; ALTER EXTENSION citus UPDATE TO '8.0-8'; +ALTER EXTENSION citus UPDATE TO '8.0-9'; +ALTER EXTENSION citus UPDATE TO '8.0-10'; -- show running version SHOW citus.version; From a59bf31c7632a28e6d7106c1c2eb082df19445e0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:10:37 +0100 Subject: [PATCH 4/4] Use worker_execute_sql_task UDF in task-tracker executor --- .../executor/multi_task_tracker_executor.c | 22 +++++++++---------- .../distributed/multi_server_executor.h | 6 +++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 9bf24f6b3..724359b57 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1622,28 +1622,28 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) StringInfo taskAssignmentQuery = NULL; /* - * We first wrap a copy out command around the original query string. This - * allows for the query's results to persist on the worker node after the - * query completes and for the executor to later use this persisted data. + * We first wrap the original query string in a worker_execute_sql_task + * call. This allows for the query's results to persist on the worker node + * after the query completes and for the executor to later fetch this + * persisted data using COPY ... (format 'transmit') */ - StringInfo jobDirectoryName = JobDirectoryName(task->jobId); - StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId); - StringInfo copyQueryString = makeStringInfo(); + StringInfo sqlTaskQueryString = makeStringInfo(); + char *escapedTaskQueryString = quote_literal_cstr(task->queryString); if (BinaryMasterCopyFormat) { - appendStringInfo(copyQueryString, COPY_QUERY_TO_FILE_BINARY, - task->queryString, taskFilename->data); + appendStringInfo(sqlTaskQueryString, EXECUTE_SQL_TASK_TO_FILE_BINARY, + task->jobId, task->taskId, escapedTaskQueryString); } else { - appendStringInfo(copyQueryString, COPY_QUERY_TO_FILE_TEXT, - task->queryString, taskFilename->data); + appendStringInfo(sqlTaskQueryString, EXECUTE_SQL_TASK_TO_FILE_TEXT, + task->jobId, task->taskId, escapedTaskQueryString); } /* wrap a task assignment query outside the copy out query */ - taskAssignmentQuery = TaskAssignmentQuery(task, copyQueryString->data); + taskAssignmentQuery = TaskAssignmentQuery(task, sqlTaskQueryString->data); taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); taskState->status = TASK_CLIENT_SIDE_QUEUED; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 496149f4f..70b40aa83 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -26,8 +26,10 @@ /* copy out query results */ #define COPY_QUERY_TO_STDOUT_TEXT "COPY (%s) TO STDOUT" #define COPY_QUERY_TO_STDOUT_BINARY "COPY (%s) TO STDOUT WITH (FORMAT binary)" -#define COPY_QUERY_TO_FILE_TEXT "COPY (%s) TO '%s'" -#define COPY_QUERY_TO_FILE_BINARY "COPY (%s) TO '%s' WITH (FORMAT binary)" +#define EXECUTE_SQL_TASK_TO_FILE_BINARY \ + "SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)" +#define EXECUTE_SQL_TASK_TO_FILE_TEXT \ + "SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, false)" /* Task tracker executor related defines */ #define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \