diff --git a/Makefile b/Makefile index 5558cc54a..97e2eede2 100644 --- a/Makefile +++ b/Makefile @@ -116,6 +116,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/worker/worker_file_access_protocol.o \ src/backend/distributed/worker/worker_merge_protocol.o \ src/backend/distributed/worker/worker_partition_protocol.o \ + src/backend/distributed/worker/worker_sql_task_protocol.o \ src/backend/distributed/worker/worker_truncate_trigger_protocol.o \ $(WIN32RES) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 5735e4422..6415411d5 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -338,6 +338,18 @@ StubRelation(TupleDesc tupleDescriptor) void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest) +{ + Query *query = ParseQueryString(queryString); + + ExecuteQueryIntoDestReceiver(query, params, dest); +} + + +/* + * ParseQuery parses query string and returns a Query struct. + */ +Query * +ParseQueryString(const char *queryString) { Query *query = NULL; @@ -356,7 +368,7 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params query = (Query *) linitial(queryTreeList); - ExecuteQueryIntoDestReceiver(query, params, dest); + return query; } diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 12ca7e709..76a0b7e70 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -117,10 +117,11 @@ static bool IsCitusExtensionStmt(Node *parsetree); static bool IsTransmitStmt(Node *parsetree); static void VerifyTransmitStmt(CopyStmt *copyStatement); static bool IsCopyResultStmt(CopyStmt *copyStatement); +static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, - bool *commandMustRunAsOwner); + const char *queryString); static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement); static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement); static List * PlanIndexStmt(IndexStmt *createIndexStatement, @@ -176,7 +177,6 @@ static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, const char *commandString); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); -static void CheckCopyPermissions(CopyStmt *copyStatement); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static void PostProcessUtility(Node *parsetree); static List * CollectGrantTableIdList(GrantStmt *grantStmt); @@ -257,9 +257,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, char *completionTag) { Node *parsetree = pstmt->utilityStmt; - bool commandMustRunAsOwner = false; - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; List *ddlJobs = NIL; bool checkExtensionVersion = false; @@ -369,7 +366,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, parsetree = copyObject(parsetree); parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, - &commandMustRunAsOwner); + queryString); previousContext = MemoryContextSwitchTo(planContext); parsetree = copyObject(parsetree); @@ -560,13 +557,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, } } - /* set user if needed and go ahead and run local utility using standard hook */ - if (commandMustRunAsOwner) - { - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - } - #if (PG_VERSION_NUM >= 100000) pstmt->utilityStmt = parsetree; standard_ProcessUtility(pstmt, queryString, context, @@ -605,11 +595,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, PostProcessUtility(parsetree); } - if (commandMustRunAsOwner) - { - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - } - /* * Re-forming the foreign key graph relies on the command being executed * on the local table first. However, in order to decide whether the @@ -971,9 +956,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement) */ static bool IsCopyResultStmt(CopyStmt *copyStatement) +{ + return CopyStatementHasFormat(copyStatement, "result"); +} + + +/* + * CopyStatementHasFormat checks whether the COPY statement has the given + * format. + */ +static bool +CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) { ListCell *optionCell = NULL; - bool hasFormatReceive = false; + bool hasFormat = false; /* extract WITH (...) options from the COPY statement */ foreach(optionCell, copyStatement->options) @@ -981,14 +977,14 @@ IsCopyResultStmt(CopyStmt *copyStatement) DefElem *defel = (DefElem *) lfirst(optionCell); if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 && - strncmp(defGetString(defel), "result", NAMEDATALEN) == 0) + strncmp(defGetString(defel), formatName, NAMEDATALEN) == 0) { - hasFormatReceive = true; + hasFormat = true; break; } } - return hasFormatReceive; + return hasFormat; } @@ -997,18 +993,10 @@ IsCopyResultStmt(CopyStmt *copyStatement) * COPYing from distributed tables and preventing unsupported actions. The * function returns a modified COPY statement to be executed, or NULL if no * further processing is needed. - * - * commandMustRunAsOwner is an output parameter used to communicate to the caller whether - * the copy statement should be executed using elevated privileges. If - * ProcessCopyStmt that is required, a call to CheckCopyPermissions will take - * care of verifying the current user's permissions before ProcessCopyStmt - * returns. */ static Node * -ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner) +ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString) { - *commandMustRunAsOwner = false; /* make sure variable is initialized */ - /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands * for sending intermediate results to workers. @@ -1114,50 +1102,48 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR } } - if (copyStatement->filename != NULL && !copyStatement->is_program) { - const char *filename = copyStatement->filename; + char *filename = copyStatement->filename; - if (CacheDirectoryElement(filename)) + /* + * We execute COPY commands issued by the task-tracker executor here + * because we're not normally allowed to write to a file as a regular + * user and we don't want to execute the query as superuser. + */ + if (CacheDirectoryElement(filename) && copyStatement->query != NULL && + !copyStatement->is_from && !is_absolute_path(filename)) { - /* - * Only superusers are allowed to copy from a file, so we have to - * become superuser to execute copies to/from files used by citus' - * query execution. - * - * XXX: This is a decidedly suboptimal solution, as that means - * that triggers, input functions, etc. run with elevated - * privileges. But this is better than not being able to run - * queries as normal user. - */ - *commandMustRunAsOwner = true; + bool binaryCopyFormat = CopyStatementHasFormat(copyStatement, "binary"); + int64 tuplesSent = 0; + Query *query = NULL; + Node *queryNode = copyStatement->query; + List *queryTreeList = NIL; - /* - * Have to manually check permissions here as the COPY is will be - * run as a superuser. - */ - if (copyStatement->relation != NULL) +#if (PG_VERSION_NUM >= 100000) + RawStmt *rawStmt = makeNode(RawStmt); + rawStmt->stmt = queryNode; + + queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL); +#else + queryTreeList = pg_analyze_and_rewrite(queryNode, queryString, NULL, 0); +#endif + + if (list_length(queryTreeList) != 1) { - CheckCopyPermissions(copyStatement); + ereport(ERROR, (errmsg("can only execute a single query"))); } - /* - * Check if we have a "COPY (query) TO filename". If we do, copy - * doesn't accept relative file paths. However, SQL tasks that get - * assigned to worker nodes have relative paths. We therefore - * convert relative paths to absolute ones here. - */ - if (copyStatement->relation == NULL && - !copyStatement->is_from && - !is_absolute_path(filename)) - { - copyStatement->filename = make_absolute_path(filename); - } + query = (Query *) linitial(queryTreeList); + tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat); + + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, tuplesSent); + + return NULL; } } - return (Node *) copyStatement; } @@ -3917,7 +3903,7 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi * * Copied from postgres, where it's part of DoCopy(). */ -static void +void CheckCopyPermissions(CopyStmt *copyStatement) { /* *INDENT-OFF* */ diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 0b8415ea1..35b001d2a 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -31,6 +31,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_copy.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" @@ -766,6 +767,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) StringInfo queryString = NULL; Oid sourceShardRelationId = InvalidOid; Oid sourceSchemaId = InvalidOid; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; CheckCitusVersion(ERROR); @@ -829,9 +832,18 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName, localFilePath->data); + /* make sure we are allowed to execute the COPY command */ + CheckCopyPermissions(localCopyCommand); + + /* need superuser to copy from files */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + CitusProcessUtility((Node *) localCopyCommand, queryString->data, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + /* finally delete the temporary file we created */ CitusDeleteFile(localFilePath->data); diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c new file mode 100644 index 000000000..de914fbc1 --- /dev/null +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -0,0 +1,280 @@ +/*------------------------------------------------------------------------- + * + * worker_sql_task_protocol.c + * + * Routines for executing SQL tasks during task-tracker execution. + * + * Copyright (c) 2012-2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "funcapi.h" +#include "pgstat.h" + +#include "distributed/multi_copy.h" +#include "distributed/multi_executor.h" +#include "distributed/transmit.h" +#include "distributed/worker_protocol.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + + +/* TaskFileDestReceiver can be used to stream results into a file */ +typedef struct TaskFileDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* EState for per-tuple memory allocation */ + EState *executorState; + + /* MemoryContext for DestReceiver session */ + MemoryContext memoryContext; + + /* output file */ + char *filePath; + File fileDesc; + bool binaryCopyFormat; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + uint64 tuplesSent; +} TaskFileDestReceiver; + + +static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState, + bool binaryCopyFormat); +static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor); +static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); +static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver); +static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver); + + +/* + * WorkerExecuteSqlTask executes an already-parsed query and writes the result + * to the given task file. + */ +int64 +WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat) +{ + EState *estate = NULL; + TaskFileDestReceiver *taskFileDest = NULL; + ParamListInfo paramListInfo = NULL; + int64 tuplesSent = 0L; + + estate = CreateExecutorState(); + taskFileDest = + (TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate, + binaryCopyFormat); + + ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest); + + tuplesSent = taskFileDest->tuplesSent; + + taskFileDest->pub.rDestroy((DestReceiver *) taskFileDest); + FreeExecutorState(estate); + + return tuplesSent; +} + + +/* + * CreateTaskFileDestReceiver creates a DestReceiver for writing query results + * to a task file. + */ +static DestReceiver * +CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat) +{ + TaskFileDestReceiver *taskFileDest = NULL; + + taskFileDest = (TaskFileDestReceiver *) palloc0(sizeof(TaskFileDestReceiver)); + + /* set up the DestReceiver function pointers */ + taskFileDest->pub.receiveSlot = TaskFileDestReceiverReceive; + taskFileDest->pub.rStartup = TaskFileDestReceiverStartup; + taskFileDest->pub.rShutdown = TaskFileDestReceiverShutdown; + taskFileDest->pub.rDestroy = TaskFileDestReceiverDestroy; + taskFileDest->pub.mydest = DestCopyOut; + + /* set up output parameters */ + taskFileDest->executorState = executorState; + taskFileDest->memoryContext = CurrentMemoryContext; + taskFileDest->filePath = pstrdup(filePath); + taskFileDest->binaryCopyFormat = binaryCopyFormat; + + return (DestReceiver *) taskFileDest; +} + + +/* + * TaskFileDestReceiverStartup implements the rStartup interface of + * TaskFileDestReceiver. It opens the destination file and sets up + * the CopyOutState. + */ +static void +TaskFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; + + CopyOutState copyOutState = NULL; + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileMode = (S_IRUSR | S_IWUSR); + + /* use the memory context that was in place when the DestReceiver was created */ + MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext); + + taskFileDest->tupleDescriptor = inputTupleDescriptor; + + /* define how tuples will be serialised */ + copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = taskFileDest->binaryCopyFormat; + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState); + taskFileDest->copyOutState = copyOutState; + + taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, + copyOutState->binary); + + taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags, + fileMode); + + if (copyOutState->binary) + { + /* write headers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryHeaders(copyOutState); + + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + } + + MemoryContextSwitchTo(oldContext); +} + + +/* + * TaskFileDestReceiverReceive implements the receiveSlot function of + * TaskFileDestReceiver. It takes a TupleTableSlot and writes the contents + * to a local file. + */ +static bool +TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; + + TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor; + + CopyOutState copyOutState = taskFileDest->copyOutState; + FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions; + + Datum *columnValues = NULL; + bool *columnNulls = NULL; + StringInfo copyData = copyOutState->fe_msgbuf; + + EState *executorState = taskFileDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + slot_getallattrs(slot); + + columnValues = slot->tts_values; + columnNulls = slot->tts_isnull; + + resetStringInfo(copyData); + + /* construct row in COPY format */ + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions, NULL); + + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + + MemoryContextSwitchTo(oldContext); + + taskFileDest->tuplesSent++; + + ResetPerTupleExprContext(executorState); + + return true; +} + + +/* + * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. + */ +static void +WriteToLocalFile(StringInfo copyData, File fileDesc) +{ +#if (PG_VERSION_NUM >= 100000) + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO); +#else + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len); +#endif + if (bytesWritten < 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not append to file: %m"))); + } +} + + +/* + * TaskFileDestReceiverShutdown implements the rShutdown interface of + * TaskFileDestReceiver. It writes the footer and closes the file. + * the relation. + */ +static void +TaskFileDestReceiverShutdown(DestReceiver *destReceiver) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; + CopyOutState copyOutState = taskFileDest->copyOutState; + + if (copyOutState->binary) + { + /* write footers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryFooters(copyOutState); + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc); + } + + FileClose(taskFileDest->fileDesc); +} + + +/* + * TaskFileDestReceiverDestroy frees memory allocated as part of the + * TaskFileDestReceiver and closes file descriptors. + */ +static void +TaskFileDestReceiverDestroy(DestReceiver *destReceiver) +{ + TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; + + if (taskFileDest->copyOutState) + { + pfree(taskFileDest->copyOutState); + } + + if (taskFileDest->columnOutputFunctions) + { + pfree(taskFileDest->columnOutputFunctions); + } + + pfree(taskFileDest->filePath); + pfree(taskFileDest); +} diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index deb74fa92..ae4f3c868 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -131,6 +131,7 @@ extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailur extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); extern bool IsCopyFromWorker(CopyStmt *copyStatement); extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); +extern void CheckCopyPermissions(CopyStmt *copyStatement); #endif /* MULTI_COPY_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6c798eb26..ad1aca23e 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -36,6 +36,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore); +extern Query * ParseQueryString(const char *queryString); extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 30bef0f68..b92119b7f 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -122,6 +122,8 @@ extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedur extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk); extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName); +extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename, + bool binaryCopyFormat); /* Function declarations shared with the master planner */ diff --git a/src/test/regress/.gitignore b/src/test/regress/.gitignore index 477bd27cc..6e3048cce 100644 --- a/src/test/regress/.gitignore +++ b/src/test/regress/.gitignore @@ -9,3 +9,6 @@ # Regression test output /regression.diffs /regression.out + +# Failure test side effets +/proxy.output diff --git a/src/test/regress/expected/failure_multi_dml.out b/src/test/regress/expected/failure_multi_dml.out index abee6d84e..990d77318 100644 --- a/src/test/regress/expected/failure_multi_dml.out +++ b/src/test/regress/expected/failure_multi_dml.out @@ -122,17 +122,28 @@ SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()'); (1 row) --- hide the error message (it has the PID)... +-- this transaction block will be sent to the coordinator as a remote command to hide the +-- error message that is caused during commit. -- we'll test for the txn side-effects to ensure it didn't run -SET client_min_messages TO FATAL; +SELECT master_run_on_worker( + ARRAY['localhost']::text[], + ARRAY[:master_port]::int[], + ARRAY[' BEGIN; DELETE FROM dml_test WHERE id = 1; DELETE FROM dml_test WHERE id = 2; -INSERT INTO dml_test VALUES (5, 'Epsilon'); -UPDATE dml_test SET name = 'alpha' WHERE id = 1; -UPDATE dml_test SET name = 'gamma' WHERE id = 3; +INSERT INTO dml_test VALUES (5, ''Epsilon''); +UPDATE dml_test SET name = ''alpha'' WHERE id = 1; +UPDATE dml_test SET name = ''gamma'' WHERE id = 3; COMMIT; -SET client_min_messages TO DEFAULT; + '], + false +); + master_run_on_worker +--------------------------- + (localhost,57636,t,BEGIN) +(1 row) + SELECT citus.mitmproxy('conn.allow()'); mitmproxy ----------- diff --git a/src/test/regress/expected/failure_real_time_select.out b/src/test/regress/expected/failure_real_time_select.out index da4d77bac..967569af2 100644 --- a/src/test/regress/expected/failure_real_time_select.out +++ b/src/test/regress/expected/failure_real_time_select.out @@ -23,16 +23,6 @@ SELECT create_distributed_table('test_table','id'); -- Populate data to the table INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); --- Create a function to make sure that queries returning the same result -CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE 'failed to execute task%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; -- Kill when the first COPY command arrived, since we have a single placement -- it is expected to error out. SET client_min_messages TO ERROR; @@ -42,9 +32,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); (1 row) -SELECT raise_failed_execution('SELECT count(*) FROM test_table'); +SELECT public.raise_failed_execution('SELECT count(*) FROM test_table'); ERROR: Task failed to execute -CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE +CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE SET client_min_messages TO DEFAULT; -- Kill the connection with a CTE SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); @@ -70,12 +60,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); (1 row) -SELECT raise_failed_execution('WITH +SELECT public.raise_failed_execution('WITH results AS (SELECT * FROM test_table) SELECT * FROM test_table, results WHERE test_table.id = results.id'); ERROR: Task failed to execute -CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE +CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE SET client_min_messages TO DEFAULT; -- In parallel execution mode Citus opens separate connections for each shard -- so killing the connection after the first copy does not break it. @@ -297,7 +287,5 @@ WARNING: could not consume data from worker node COMMIT; DROP SCHEMA real_time_select_failure CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to function raise_failed_execution(text) -drop cascades to table test_table +NOTICE: drop cascades to table test_table SET search_path TO default; diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index d040f3e3a..fab62f0a9 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | full_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | read_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -171,7 +171,7 @@ ERROR: permission denied for table test SELECT count(*) FROM test WHERE id = 1; ERROR: permission denied for table test SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; ERROR: permission denied for table test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/expected/multi_multiuser_0.out b/src/test/regress/expected/multi_multiuser_0.out index 037a31381..77ad642b7 100644 --- a/src/test/regress/expected/multi_multiuser_0.out +++ b/src/test/regress/expected/multi_multiuser_0.out @@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | full_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1; (1 row) SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; - count -------- - 2 +SELECT count(*), min(current_user) FROM test; + count | min +-------+------------- + 2 | read_access (1 row) -- test re-partition query (needs to transmit intermediate results) @@ -171,7 +171,7 @@ ERROR: permission denied for relation test SELECT count(*) FROM test WHERE id = 1; ERROR: permission denied for relation test SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; ERROR: permission denied for relation test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/expected/multi_task_string_size.out b/src/test/regress/expected/multi_task_string_size.out index 57d527c07..7660e4329 100644 --- a/src/test/regress/expected/multi_task_string_size.out +++ b/src/test/regress/expected/multi_task_string_size.out @@ -225,8 +225,12 @@ ERROR: parameter "citus.max_task_string_size" cannot be changed without restart -- error message may vary between executions -- hiding warning and error message -- no output means the query has failed -SET client_min_messages to FATAL; +SET client_min_messages to ERROR; +SELECT raise_failed_execution(' SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); +'); +ERROR: Task failed to execute +CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE -- following will succeed since it fetches few columns SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); long_column_001 | long_column_002 | long_column_003 diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 6ffe7eae8..40073f61e 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -109,3 +109,13 @@ $desc_views$ (1 row) +-- Create a function to make sure that queries returning the same result +CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; diff --git a/src/test/regress/sql/failure_multi_dml.sql b/src/test/regress/sql/failure_multi_dml.sql index 15294c5b8..b8ed0391c 100644 --- a/src/test/regress/sql/failure_multi_dml.sql +++ b/src/test/regress/sql/failure_multi_dml.sql @@ -65,19 +65,23 @@ SELECT * FROM dml_test ORDER BY id ASC; -- fail at PREPARE TRANSACTION SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()'); --- hide the error message (it has the PID)... +-- this transaction block will be sent to the coordinator as a remote command to hide the +-- error message that is caused during commit. -- we'll test for the txn side-effects to ensure it didn't run -SET client_min_messages TO FATAL; - +SELECT master_run_on_worker( + ARRAY['localhost']::text[], + ARRAY[:master_port]::int[], + ARRAY[' BEGIN; DELETE FROM dml_test WHERE id = 1; DELETE FROM dml_test WHERE id = 2; -INSERT INTO dml_test VALUES (5, 'Epsilon'); -UPDATE dml_test SET name = 'alpha' WHERE id = 1; -UPDATE dml_test SET name = 'gamma' WHERE id = 3; +INSERT INTO dml_test VALUES (5, ''Epsilon''); +UPDATE dml_test SET name = ''alpha'' WHERE id = 1; +UPDATE dml_test SET name = ''gamma'' WHERE id = 3; COMMIT; - -SET client_min_messages TO DEFAULT; + '], + false +); SELECT citus.mitmproxy('conn.allow()'); SELECT shardid FROM pg_dist_shard_placement WHERE shardstate = 3; diff --git a/src/test/regress/sql/failure_real_time_select.sql b/src/test/regress/sql/failure_real_time_select.sql index a9df56c80..3dacca315 100644 --- a/src/test/regress/sql/failure_real_time_select.sql +++ b/src/test/regress/sql/failure_real_time_select.sql @@ -17,22 +17,11 @@ SELECT create_distributed_table('test_table','id'); -- Populate data to the table INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); --- Create a function to make sure that queries returning the same result -CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE 'failed to execute task%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; - -- Kill when the first COPY command arrived, since we have a single placement -- it is expected to error out. SET client_min_messages TO ERROR; SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); -SELECT raise_failed_execution('SELECT count(*) FROM test_table'); +SELECT public.raise_failed_execution('SELECT count(*) FROM test_table'); SET client_min_messages TO DEFAULT; -- Kill the connection with a CTE @@ -46,7 +35,7 @@ WHERE test_table.id = results.id; -- killing connection after first successful query should break. SET client_min_messages TO ERROR; SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); -SELECT raise_failed_execution('WITH +SELECT public.raise_failed_execution('WITH results AS (SELECT * FROM test_table) SELECT * FROM test_table, results WHERE test_table.id = results.id'); diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 43306f09c..f569ffbde 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -70,7 +70,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; @@ -94,7 +94,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; @@ -115,7 +115,7 @@ SELECT count(*) FROM test; SELECT count(*) FROM test WHERE id = 1; SET citus.task_executor_type TO 'task-tracker'; -SELECT count(*) FROM test; +SELECT count(*), min(current_user) FROM test; -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; diff --git a/src/test/regress/sql/multi_task_string_size.sql b/src/test/regress/sql/multi_task_string_size.sql index c650902a0..a1c104b6e 100644 --- a/src/test/regress/sql/multi_task_string_size.sql +++ b/src/test/regress/sql/multi_task_string_size.sql @@ -220,9 +220,11 @@ SET citus.max_task_string_size TO 20000; -- error message may vary between executions -- hiding warning and error message -- no output means the query has failed -SET client_min_messages to FATAL; +SET client_min_messages to ERROR; +SELECT raise_failed_execution(' SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); +'); -- following will succeed since it fetches few columns SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003); diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 23bbd97f4..26bb17882 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -106,3 +106,14 @@ ORDER BY a.attrelid, a.attnum; $desc_views$ ); + +-- Create a function to make sure that queries returning the same result +CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql;