From ca8a4dc73512f25461e8e5b196ec0fd939340c63 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:26:31 +0100 Subject: [PATCH 1/3] COPY to a task file no longer switches to superuser --- Makefile | 1 + .../distributed/executor/multi_executor.c | 14 +- .../distributed/executor/multi_utility.c | 116 ++++---- .../worker/worker_data_fetch_protocol.c | 12 + .../worker/worker_sql_task_protocol.c | 280 ++++++++++++++++++ src/include/distributed/multi_copy.h | 1 + src/include/distributed/multi_executor.h | 1 + src/include/distributed/worker_protocol.h | 2 + 8 files changed, 361 insertions(+), 66 deletions(-) create mode 100644 src/backend/distributed/worker/worker_sql_task_protocol.c diff --git a/Makefile b/Makefile index 5558cc54a..97e2eede2 100644 --- a/Makefile +++ b/Makefile @@ -116,6 +116,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/worker/worker_file_access_protocol.o \ src/backend/distributed/worker/worker_merge_protocol.o \ src/backend/distributed/worker/worker_partition_protocol.o \ + src/backend/distributed/worker/worker_sql_task_protocol.o \ src/backend/distributed/worker/worker_truncate_trigger_protocol.o \ $(WIN32RES) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 5735e4422..6415411d5 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -338,6 +338,18 @@ StubRelation(TupleDesc tupleDescriptor) void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest) +{ + Query *query = ParseQueryString(queryString); + + ExecuteQueryIntoDestReceiver(query, params, dest); +} + + +/* + * ParseQuery parses query string and returns a Query struct. + */ +Query * +ParseQueryString(const char *queryString) { Query *query = NULL; @@ -356,7 +368,7 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params query = (Query *) linitial(queryTreeList); - ExecuteQueryIntoDestReceiver(query, params, dest); + return query; } diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 12ca7e709..76a0b7e70 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -117,10 +117,11 @@ static bool IsCitusExtensionStmt(Node *parsetree); static bool IsTransmitStmt(Node *parsetree); static void VerifyTransmitStmt(CopyStmt *copyStatement); static bool IsCopyResultStmt(CopyStmt *copyStatement); +static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, - bool *commandMustRunAsOwner); + const char *queryString); static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement); static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement); static List * PlanIndexStmt(IndexStmt *createIndexStatement, @@ -176,7 +177,6 @@ static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, const char *commandString); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); -static void CheckCopyPermissions(CopyStmt *copyStatement); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static void PostProcessUtility(Node *parsetree); static List * CollectGrantTableIdList(GrantStmt *grantStmt); @@ -257,9 +257,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, char *completionTag) { Node *parsetree = pstmt->utilityStmt; - bool commandMustRunAsOwner = false; - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; List *ddlJobs = NIL; bool checkExtensionVersion = false; @@ -369,7 +366,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, parsetree = copyObject(parsetree); parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, - &commandMustRunAsOwner); + queryString); previousContext = MemoryContextSwitchTo(planContext); parsetree = copyObject(parsetree); @@ -560,13 +557,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, } } - /* set user if needed and go ahead and run local utility using standard hook */ - if (commandMustRunAsOwner) - { - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - } - #if (PG_VERSION_NUM >= 100000) pstmt->utilityStmt = parsetree; standard_ProcessUtility(pstmt, queryString, context, @@ -605,11 +595,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, PostProcessUtility(parsetree); } - if (commandMustRunAsOwner) - { - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - } - /* * Re-forming the foreign key graph relies on the command being executed * on the local table first. However, in order to decide whether the @@ -971,9 +956,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement) */ static bool IsCopyResultStmt(CopyStmt *copyStatement) +{ + return CopyStatementHasFormat(copyStatement, "result"); +} + + +/* + * CopyStatementHasFormat checks whether the COPY statement has the given + * format. + */ +static bool +CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) { ListCell *optionCell = NULL; - bool hasFormatReceive = false; + bool hasFormat = false; /* extract WITH (...) options from the COPY statement */ foreach(optionCell, copyStatement->options) @@ -981,14 +977,14 @@ IsCopyResultStmt(CopyStmt *copyStatement) DefElem *defel = (DefElem *) lfirst(optionCell); if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 && - strncmp(defGetString(defel), "result", NAMEDATALEN) == 0) + strncmp(defGetString(defel), formatName, NAMEDATALEN) == 0) { - hasFormatReceive = true; + hasFormat = true; break; } } - return hasFormatReceive; + return hasFormat; } @@ -997,18 +993,10 @@ IsCopyResultStmt(CopyStmt *copyStatement) * COPYing from distributed tables and preventing unsupported actions. The * function returns a modified COPY statement to be executed, or NULL if no * further processing is needed. - * - * commandMustRunAsOwner is an output parameter used to communicate to the caller whether - * the copy statement should be executed using elevated privileges. If - * ProcessCopyStmt that is required, a call to CheckCopyPermissions will take - * care of verifying the current user's permissions before ProcessCopyStmt - * returns. */ static Node * -ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner) +ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString) { - *commandMustRunAsOwner = false; /* make sure variable is initialized */ - /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands * for sending intermediate results to workers. @@ -1114,50 +1102,48 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR } } - if (copyStatement->filename != NULL && !copyStatement->is_program) { - const char *filename = copyStatement->filename; + char *filename = copyStatement->filename; - if (CacheDirectoryElement(filename)) + /* + * We execute COPY commands issued by the task-tracker executor here + * because we're not normally allowed to write to a file as a regular + * user and we don't want to execute the query as superuser. + */ + if (CacheDirectoryElement(filename) && copyStatement->query != NULL && + !copyStatement->is_from && !is_absolute_path(filename)) { - /* - * Only superusers are allowed to copy from a file, so we have to - * become superuser to execute copies to/from files used by citus' - * query execution. - * - * XXX: This is a decidedly suboptimal solution, as that means - * that triggers, input functions, etc. run with elevated - * privileges. But this is better than not being able to run - * queries as normal user. - */ - *commandMustRunAsOwner = true; + bool binaryCopyFormat = CopyStatementHasFormat(copyStatement, "binary"); + int64 tuplesSent = 0; + Query *query = NULL; + Node *queryNode = copyStatement->query; + List *queryTreeList = NIL; - /* - * Have to manually check permissions here as the COPY is will be - * run as a superuser. - */ - if (copyStatement->relation != NULL) +#if (PG_VERSION_NUM >= 100000) + RawStmt *rawStmt = makeNode(RawStmt); + rawStmt->stmt = queryNode; + + queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL); +#else + queryTreeList = pg_analyze_and_rewrite(queryNode, queryString, NULL, 0); +#endif + + if (list_length(queryTreeList) != 1) { - CheckCopyPermissions(copyStatement); + ereport(ERROR, (errmsg("can only execute a single query"))); } - /* - * Check if we have a "COPY (query) TO filename". If we do, copy - * doesn't accept relative file paths. However, SQL tasks that get - * assigned to worker nodes have relative paths. We therefore - * convert relative paths to absolute ones here. - */ - if (copyStatement->relation == NULL && - !copyStatement->is_from && - !is_absolute_path(filename)) - { - copyStatement->filename = make_absolute_path(filename); - } + query = (Query *) linitial(queryTreeList); + tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat); + + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, tuplesSent); + + return NULL; } } - return (Node *) copyStatement; } @@ -3917,7 +3903,7 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi * * Copied from postgres, where it's part of DoCopy(). */ -static void +void CheckCopyPermissions(CopyStmt *copyStatement) { /* *INDENT-OFF* */ diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 0b8415ea1..35b001d2a 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -31,6 +31,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_copy.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" @@ -766,6 +767,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) StringInfo queryString = NULL; Oid sourceShardRelationId = InvalidOid; Oid sourceSchemaId = InvalidOid; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; CheckCitusVersion(ERROR); @@ -829,9 +832,18 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName, localFilePath->data); + /* make sure we are allowed to execute the COPY command */ + CheckCopyPermissions(localCopyCommand); + + /* need superuser to copy from files */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + CitusProcessUtility((Node *) localCopyCommand, queryString->data, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + /* finally delete the temporary file we created */ CitusDeleteFile(localFilePath->data); diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c new file mode 100644 index 000000000..de914fbc1 --- /dev/null +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -0,0 +1,280 @@ +/*------------------------------------------------------------------------- + * + * worker_sql_task_protocol.c + * + * Routines for executing SQL tasks during task-tracker execution. + * + * Copyright (c) 2012-2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "funcapi.h" +#include "pgstat.h" + +#include "distributed/multi_copy.h" +#include "distributed/multi_executor.h" +#include "distributed/transmit.h" +#include "distributed/worker_protocol.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + + +/* TaskFileDestReceiver can be used to stream results into a file */ +typedef struct TaskFileDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* EState for per-tuple memory allocation */ + EState *executorState; + + /* MemoryContext for DestReceiver session */ + MemoryContext memoryContext; + + /* output file */ + char *filePath; + File fileDesc; + bool binaryCopyFormat; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + uint64 tuplesSent; +} TaskFileDestReceiver; + + +static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState, + bool binaryCopyFormat); +static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor); +static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); +static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); +static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); + + +/* + * WorkerExecuteSqlTask executes an already-parsed query and writes the result + * to the given task file. + */ +int64 +WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat) +{ + EState *estate = NULL; + TaskFileDestReceiver *taskFileDest = NULL; + ParamListInfo paramListInfo = NULL; + int64 tuplesSent = 0L; + + estate = CreateExecutorState(); + taskFileDest = + (TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate, + binaryCopyFormat); + + ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest); + + tuplesSent = taskFileDest->tuplesSent; + + taskFileDest->pub.rDestroy((DestReceiver *) taskFileDest); + FreeExecutorState(estate); + + return tuplesSent; +} + + +/* + * CreateTaskFileDestReceiver creates a DestReceiver for writing query results + * to a task file. + */ +static DestReceiver * +CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat) +{ + TaskFileDestReceiver *taskFileDest = NULL; + + taskFileDest = (TaskFileDestReceiver *) palloc0(sizeof(TaskFileDestReceiver)); + + /* set up the DestReceiver function pointers */ + taskFileDest->pub.receiveSlot = TaskFileDestReceiverReceive; + taskFileDest->pub.rStartup = TaskFileDestReceiverStartup; + taskFileDest->pub.rShutdown = TaskFileDestReceiverShutdown; + taskFileDest->pub.rDestroy = TaskFileDestReceiverDestroy; + taskFileDest->pub.mydest = DestCopyOut; + + /* set up output parameters */ + taskFileDest->executorState = executorState; + taskFileDest->memoryContext = CurrentMemoryContext; + taskFileDest->filePath = pstrdup(filePath); + taskFileDest->binaryCopyFormat = binaryCopyFormat; + + return (DestReceiver *) taskFileDest; +} + + +/* + * TaskFileDestReceiverStartup implements the rStartup interface of + * TaskFileDestReceiver. It opens the destination file and sets up + * the CopyOutState. + */ +static void +TaskFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; + + CopyOutState copyOutState = NULL; + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileMode = (S_IRUSR | S_IWUSR); + + /* use the memory context that was in place when the DestReceiver was created */ + MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext); + + taskFileDest->tupleDescriptor = inputTupleDescriptor; + + /* define how tuples will be serialised */ + copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = taskFileDest->binaryCopyFormat; + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState); + taskFileDest->copyOutState = copyOutState; + + taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, + copyOutState->binary); + + taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags, + fileMode); + + if (copyOutState->binary) + { + /* write headers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryHeaders(copyOutState); + + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + } + + MemoryContextSwitchTo(oldContext); +} + + +/* + * TaskFileDestReceiverReceive implements the receiveSlot function of + * TaskFileDestReceiver. It takes a TupleTableSlot and writes the contents + * to a local file. + */ +static bool +TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; + + TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor; + + CopyOutState copyOutState = taskFileDest->copyOutState; + FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions; + + Datum *columnValues = NULL; + bool *columnNulls = NULL; + StringInfo copyData = copyOutState->fe_msgbuf; + + EState *executorState = taskFileDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + slot_getallattrs(slot); + + columnValues = slot->tts_values; + columnNulls = slot->tts_isnull; + + resetStringInfo(copyData); + + /* construct row in COPY format */ + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions, NULL); + + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + + MemoryContextSwitchTo(oldContext); + + taskFileDest->tuplesSent++; + + ResetPerTupleExprContext(executorState); + + return true; +} + + +/* + * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. + */ +static void +WriteToLocalFile(StringInfo copyData, File fileDesc) +{ +#if (PG_VERSION_NUM >= 100000) + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO); +#else + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len); +#endif + if (bytesWritten < 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not append to file: %m"))); + } +} + + +/* + * TaskFileDestReceiverShutdown implements the rShutdown interface of + * TaskFileDestReceiver. It writes the footer and closes the file. + * the relation. + */ +static void +TaskFileDestReceiverShutdown(DestReceiver *destReceiver) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; + CopyOutState copyOutState = taskFileDest->copyOutState; + + if (copyOutState->binary) + { + /* write footers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryFooters(copyOutState); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + } + + FileClose(taskFileDest->fileDesc); +} + + +/* + * TaskFileDestReceiverDestroy frees memory allocated as part of the + * TaskFileDestReceiver and closes file descriptors. + */ +static void +TaskFileDestReceiverDestroy(DestReceiver *destReceiver) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; + + if (taskFileDest->copyOutState) + { + pfree(taskFileDest->copyOutState); + } + + if (taskFileDest->columnOutputFunctions) + { + pfree(taskFileDest->columnOutputFunctions); + } + + pfree(taskFileDest->filePath); + pfree(taskFileDest); +} diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index deb74fa92..ae4f3c868 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -131,6 +131,7 @@ extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailur extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); extern bool IsCopyFromWorker(CopyStmt *copyStatement); extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); +extern void CheckCopyPermissions(CopyStmt *copyStatement); #endif /* MULTI_COPY_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6c798eb26..ad1aca23e 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -36,6 +36,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore); +extern Query * ParseQueryString(const char *queryString); extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 30bef0f68..b92119b7f 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -122,6 +122,8 @@ extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedur extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk); extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName); +extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename, + bool binaryCopyFormat); /* Function declarations shared with the master planner */ From 4392cc2f9cfed135b92637ea1a7fffea88ee4555 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:11:11 +0100 Subject: [PATCH 2/3] Test current user in task-tracker queries --- src/test/regress/expected/multi_multiuser.out | 18 +++++++++--------- .../regress/expected/multi_multiuser_0.out | 18 +++++++++--------- src/test/regress/sql/multi_multiuser.sql | 6 +++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index d040f3e3a..fab62f0a9 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | full_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | read_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -171,7 +171,7 @@ ERROR: permission denied for table test SELECT count(*) FROM test WHERE id = 1; ERROR: permission denied for table test SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; ERROR: permission denied for table test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/expected/multi_multiuser_0.out b/src/test/regress/expected/multi_multiuser_0.out index 037a31381..77ad642b7 100644 --- a/src/test/regress/expected/multi_multiuser_0.out +++ b/src/test/regress/expected/multi_multiuser_0.out @@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | full_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | read_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -171,7 +171,7 @@ ERROR: permission denied for relation test SELECT count(*) FROM test WHERE id = 1; ERROR: permission denied for relation test SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; ERROR: permission denied for relation test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 43306f09c..f569ffbde 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -70,7 +70,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; @@ -94,7 +94,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; @@ -115,7 +115,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; From 67f058c5f69301c309fc291ec1ef909f224ca679 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 13 Nov 2018 16:10:36 +0100 Subject: [PATCH 3/3] Description: Fix failures of tests on recent postgres builds In recent postgres builds you cannot set client_min_messages to values higher then ERROR, if will silently set it to ERROR if so. During some tests we would set it to fatal to hide random values (eg. pid's of processes) from the test output. This patch will use different tactics for hiding these values. --- src/test/regress/.gitignore | 3 +++ .../regress/expected/failure_multi_dml.out | 23 ++++++++++++++----- .../expected/failure_real_time_select.out | 22 ++++-------------- .../expected/multi_task_string_size.out | 6 ++++- .../regress/expected/multi_test_helpers.out | 10 ++++++++ src/test/regress/sql/failure_multi_dml.sql | 20 +++++++++------- .../regress/sql/failure_real_time_select.sql | 15 ++---------- .../regress/sql/multi_task_string_size.sql | 4 +++- src/test/regress/sql/multi_test_helpers.sql | 11 +++++++++ 9 files changed, 68 insertions(+), 46 deletions(-) diff --git a/src/test/regress/.gitignore b/src/test/regress/.gitignore index 477bd27cc..6e3048cce 100644 --- a/src/test/regress/.gitignore +++ b/src/test/regress/.gitignore @@ -9,3 +9,6 @@ # Regression test output /regression.diffs /regression.out + +# Failure test side effets +/proxy.output diff --git a/src/test/regress/expected/failure_multi_dml.out b/src/test/regress/expected/failure_multi_dml.out index abee6d84e..990d77318 100644 --- a/src/test/regress/expected/failure_multi_dml.out +++ b/src/test/regress/expected/failure_multi_dml.out @@ -122,17 +122,28 @@ SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()'); (1 row) --- hide the error message (it has the PID)... +-- this transaction block will be sent to the coordinator as a remote command to hide the +-- error message that is caused during commit. -- we'll test for the txn side-effects to ensure it didn't run -SET client_min_messages TO FATAL; +SELECT master_run_on_worker( + ARRAY['localhost']::text[], + ARRAY[:master_port]::int[], + ARRAY[' BEGIN; DELETE FROM dml_test WHERE id = 1; DELETE FROM dml_test WHERE id = 2; -INSERT INTO dml_test VALUES (5, 'Epsilon'); -UPDATE dml_test SET name = 'alpha' WHERE id = 1; -UPDATE dml_test SET name = 'gamma' WHERE id = 3; +INSERT INTO dml_test VALUES (5, ''Epsilon''); +UPDATE dml_test SET name = ''alpha'' WHERE id = 1; +UPDATE dml_test SET name = ''gamma'' WHERE id = 3; COMMIT; -SET client_min_messages TO DEFAULT; + '], + false +); + master_run_on_worker +--------------------------- + (localhost,57636,t,BEGIN) +(1 row) + SELECT citus.mitmproxy('conn.allow()'); mitmproxy ----------- diff --git a/src/test/regress/expected/failure_real_time_select.out b/src/test/regress/expected/failure_real_time_select.out index da4d77bac..967569af2 100644 --- a/src/test/regress/expected/failure_real_time_select.out +++ b/src/test/regress/expected/failure_real_time_select.out @@ -23,16 +23,6 @@ SELECT create_distributed_table('test_table','id'); -- Populate data to the table INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); --- Create a function to make sure that queries returning the same result -CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE 'failed to execute task%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; -- Kill when the first COPY command arrived, since we have a single placement -- it is expected to error out. SET client_min_messages TO ERROR; @@ -42,9 +32,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); (1 row) -SELECT raise_failed_execution('SELECT count(*) FROM test_table'); +SELECT public.raise_failed_execution('SELECT count(*) FROM test_table'); ERROR: Task failed to execute -CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE +CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE SET client_min_messages TO DEFAULT; -- Kill the connection with a CTE SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); @@ -70,12 +60,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); (1 row) -SELECT raise_failed_execution('WITH +SELECT public.raise_failed_execution('WITH results AS (SELECT * FROM test_table) SELECT * FROM test_table, results WHERE test_table.id = results.id'); ERROR: Task failed to execute -CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE +CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE SET client_min_messages TO DEFAULT; -- In parallel execution mode Citus opens separate connections for each shard -- so killing the connection after the first copy does not break it. @@ -297,7 +287,5 @@ WARNING: could not consume data from worker node COMMIT; DROP SCHEMA real_time_select_failure CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to function raise_failed_execution(text) -drop cascades to table test_table +NOTICE: drop cascades to table test_table SET search_path TO default; diff --git a/src/test/regress/expected/multi_task_string_size.out b/src/test/regress/expected/multi_task_string_size.out index 57d527c07..7660e4329 100644 --- a/src/test/regress/expected/multi_task_string_size.out +++ b/src/test/regress/expected/multi_task_string_size.out @@ -225,8 +225,12 @@ ERROR: parameter "citus.max_task_string_size" cannot be changed without restart -- error message may vary between executions -- hiding warning and error message -- no output means the query has failed -SET client_min_messages to FATAL; +SET client_min_messages to ERROR; +SELECT raise_failed_execution(' SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); +'); +ERROR: Task failed to execute +CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE -- following will succeed since it fetches few columns SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); long_column_001 | long_column_002 | long_column_003 diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 6ffe7eae8..40073f61e 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -109,3 +109,13 @@ $desc_views$ (1 row) +-- Create a function to make sure that queries returning the same result +CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; diff --git a/src/test/regress/sql/failure_multi_dml.sql b/src/test/regress/sql/failure_multi_dml.sql index 15294c5b8..b8ed0391c 100644 --- a/src/test/regress/sql/failure_multi_dml.sql +++ b/src/test/regress/sql/failure_multi_dml.sql @@ -65,19 +65,23 @@ SELECT * FROM dml_test ORDER BY id ASC; -- fail at PREPARE TRANSACTION SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()'); --- hide the error message (it has the PID)... +-- this transaction block will be sent to the coordinator as a remote command to hide the +-- error message that is caused during commit. -- we'll test for the txn side-effects to ensure it didn't run -SET client_min_messages TO FATAL; - +SELECT master_run_on_worker( + ARRAY['localhost']::text[], + ARRAY[:master_port]::int[], + ARRAY[' BEGIN; DELETE FROM dml_test WHERE id = 1; DELETE FROM dml_test WHERE id = 2; -INSERT INTO dml_test VALUES (5, 'Epsilon'); -UPDATE dml_test SET name = 'alpha' WHERE id = 1; -UPDATE dml_test SET name = 'gamma' WHERE id = 3; +INSERT INTO dml_test VALUES (5, ''Epsilon''); +UPDATE dml_test SET name = ''alpha'' WHERE id = 1; +UPDATE dml_test SET name = ''gamma'' WHERE id = 3; COMMIT; - -SET client_min_messages TO DEFAULT; + '], + false +); SELECT citus.mitmproxy('conn.allow()'); SELECT shardid FROM pg_dist_shard_placement WHERE shardstate = 3; diff --git a/src/test/regress/sql/failure_real_time_select.sql b/src/test/regress/sql/failure_real_time_select.sql index a9df56c80..3dacca315 100644 --- a/src/test/regress/sql/failure_real_time_select.sql +++ b/src/test/regress/sql/failure_real_time_select.sql @@ -17,22 +17,11 @@ SELECT create_distributed_table('test_table','id'); -- Populate data to the table INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); --- Create a function to make sure that queries returning the same result -CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE 'failed to execute task%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; - -- Kill when the first COPY command arrived, since we have a single placement -- it is expected to error out. SET client_min_messages TO ERROR; SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); -SELECT raise_failed_execution('SELECT count(*) FROM test_table'); +SELECT public.raise_failed_execution('SELECT count(*) FROM test_table'); SET client_min_messages TO DEFAULT; -- Kill the connection with a CTE @@ -46,7 +35,7 @@ WHERE test_table.id = results.id; -- killing connection after first successful query should break. SET client_min_messages TO ERROR; SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); -SELECT raise_failed_execution('WITH +SELECT public.raise_failed_execution('WITH results AS (SELECT * FROM test_table) SELECT * FROM test_table, results WHERE test_table.id = results.id'); diff --git a/src/test/regress/sql/multi_task_string_size.sql b/src/test/regress/sql/multi_task_string_size.sql index c650902a0..a1c104b6e 100644 --- a/src/test/regress/sql/multi_task_string_size.sql +++ b/src/test/regress/sql/multi_task_string_size.sql @@ -220,9 +220,11 @@ SET citus.max_task_string_size TO 20000; -- error message may vary between executions -- hiding warning and error message -- no output means the query has failed -SET client_min_messages to FATAL; +SET client_min_messages to ERROR; +SELECT raise_failed_execution(' SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); +'); -- following will succeed since it fetches few columns SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 23bbd97f4..26bb17882 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -106,3 +106,14 @@ ORDER BY a.attrelid, a.attnum; $desc_views$ ); + +-- Create a function to make sure that queries returning the same result +CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql;