From 82f3157750002a6e7fe6299bac20e35cc78706d4 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 12 Feb 2020 19:55:09 +0300 Subject: [PATCH] introduce functions for local execution of utility commands --- .../distributed/executor/local_executor.c | 101 +++++++++++++++++- .../distributed/executor/multi_executor.c | 16 +++ src/include/distributed/local_executor.h | 1 + src/include/distributed/multi_executor.h | 2 + 4 files changed, 117 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 3451994c9..cf81ee4ee 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -67,9 +67,6 @@ * use local query execution since local execution is sequential. Basically, * we do not want to lose parallelism across local tasks by switching to local * execution. - * - The local execution currently only supports queries. In other words, any - * utility commands like TRUNCATE, fails if the command is executed after a local - * execution inside a transaction block. * - The local execution cannot be mixed with the executors other than adaptive, * namely task-tracker executor. 
* - Related with the previous item, COPY command cannot be mixed with local @@ -79,6 +76,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "distributed/commands/utility_hook.h" #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" #include "distributed/deparse_shard_query.h" @@ -90,6 +88,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" /* to access LogRemoteCommands */ #include "distributed/transaction_management.h" +#include "distributed/worker_protocol.h" #include "executor/tstoreReceiver.h" #include "executor/tuptable.h" #if PG_VERSION_NUM >= 120000 @@ -118,6 +117,8 @@ static void LogLocalCommand(Task *task); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); +static void LocallyExecuteUtilityTask(const char *utilityCommand); +static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); /* @@ -245,6 +246,100 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT } +/* + * ExecuteLocalUtilityTaskList executes a list of tasks locally. This function + * also logs local execution notice for each task and sets + * TransactionAccessedLocalPlacement to true for next set of possible queries + * & commands within the current transaction block. See the comment in function. + */ +void +ExecuteLocalUtilityTaskList(List *localTaskList) +{ + Task *localTask = NULL; + + foreach_ptr(localTask, localTaskList) + { + const char *localTaskQueryCommand = TaskQueryString(localTask); + + /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ + Assert(localTask->anchorShardId != INVALID_SHARD_ID); + + Assert(TaskAccessesLocalNode(localTask)); + + /* + * We should register the access to local placement to force the local + * execution of the following commands within the current transaction.
+ * It can be a coordinated transaction or a transaction that is initiated + * explicitly. + */ + TransactionAccessedLocalPlacement = true; + + LogLocalCommand(localTask); + + LocallyExecuteUtilityTask(localTaskQueryCommand); + } +} + + +/* + * LocallyExecuteUtilityTask executes the given local task query in the current + * session. + */ +static void +LocallyExecuteUtilityTask(const char *localTaskQueryCommand) +{ + RawStmt *localTaskRawStmt = (RawStmt *) ParseTreeRawStmt(localTaskQueryCommand); + + Node *localTaskRawParseTree = localTaskRawStmt->stmt; + + /* + * Actually, the query passed to this function would mostly be a + * utility command to be executed locally. However, some utility + * commands do trigger udf calls (e.g. worker_apply_shard_ddl_command) + * to execute commands in a generic way. But as we support local + * execution of utility commands, we should process those udf + * calls locally as well. In that case, we simply execute the query + * implying the udf call in the conditional block below. + */ + if (IsA(localTaskRawParseTree, SelectStmt)) + { + /* we have no additional parameters to rewrite the UDF call RawStmt */ + Query *localUdfTaskQuery = + RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0); + + LocallyExecuteUdfTaskQuery(localUdfTaskQuery); + } + else + { + /* + * It is a regular utility command or SELECT query with non-udf + * targets, then we should execute it locally via process utility. + * + * If it is a regular utility command, CitusProcessUtility is the + * appropriate function to process that command. However, if it's + * a SELECT query with non-udf targets, CitusProcessUtility would + * error out as we are not expecting such SELECT queries triggered + * by utility commands. + */ + CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + } +} + + +/* + * LocallyExecuteUdfTaskQuery executes the given udf command locally.
Local udf + * command is simply a "SELECT udf_call()" query and so it cannot be executed + * via process utility. + */ +static void +LocallyExecuteUdfTaskQuery(Query *localUdfTaskQuery) +{ + /* we do not need any destination receivers to execute it */ + ExecuteQueryIntoDestReceiver(localUdfTaskQuery, NULL, None_Receiver); +} + + /* * LogLocalCommand logs commands executed locally on this node. Although we're * talking about local execution, the function relies on citus.log_remote_commands diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 59826fe64..82d177054 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -607,6 +607,22 @@ Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams) { RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); + + /* rewrite the parsed RawStmt to produce a Query */ + Query *query = RewriteRawQueryStmt(rawStmt, queryString, paramOids, numParams); + + return query; +} + + +/* + * RewriteRawQueryStmt rewrites the given parsed RawStmt according to the other + * parameters and returns a Query struct. 
+ */ +Query * +RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, int + numParams) +{ List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 98e69bf97..8b11e096c 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -22,6 +22,7 @@ extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); +extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 7ccbfd3e8..448158d59 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -95,6 +95,8 @@ extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *worker extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore); extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams); +extern Query * RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, + Oid *paramOids, int numParams); extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest);