mirror of https://github.com/citusdata/citus.git
introduce functions for local execution of utility commands
parent
ed66517e95
commit
82f3157750
|
@ -67,9 +67,6 @@
|
||||||
* use local query execution since local execution is sequential. Basically,
|
* use local query execution since local execution is sequential. Basically,
|
||||||
* we do not want to lose parallelism across local tasks by switching to local
|
* we do not want to lose parallelism across local tasks by switching to local
|
||||||
* execution.
|
* execution.
|
||||||
* - The local execution currently only supports queries. In other words, any
|
|
||||||
* utility commands like TRUNCATE, fails if the command is executed after a local
|
|
||||||
* execution inside a transaction block.
|
|
||||||
* - The local execution cannot be mixed with the executors other than adaptive,
|
* - The local execution cannot be mixed with the executors other than adaptive,
|
||||||
* namely task-tracker executor.
|
* namely task-tracker executor.
|
||||||
* - Related with the previous item, COPY command cannot be mixed with local
|
* - Related with the previous item, COPY command cannot be mixed with local
|
||||||
|
@ -79,6 +76,7 @@
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "distributed/commands/utility_hook.h"
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
|
@ -90,6 +88,7 @@
|
||||||
#include "distributed/relation_access_tracking.h"
|
#include "distributed/relation_access_tracking.h"
|
||||||
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
|
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
#include "executor/tstoreReceiver.h"
|
#include "executor/tstoreReceiver.h"
|
||||||
#include "executor/tuptable.h"
|
#include "executor/tuptable.h"
|
||||||
#if PG_VERSION_NUM >= 120000
|
#if PG_VERSION_NUM >= 120000
|
||||||
|
@ -118,6 +117,8 @@ static void LogLocalCommand(Task *task);
|
||||||
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
|
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
|
static void LocallyExecuteUtilityTask(const char *utilityCommand);
|
||||||
|
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -245,6 +246,100 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteLocalUtilityTaskList executes a list of tasks locally. This function
|
||||||
|
* also logs local execution notice for each task and sets
|
||||||
|
* TransactionAccessedLocalPlacement to true for next set of possible queries
|
||||||
|
* & commands within the current transaction block. See the comment in function.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecuteLocalUtilityTaskList(List *localTaskList)
|
||||||
|
{
|
||||||
|
Task *localTask = NULL;
|
||||||
|
|
||||||
|
foreach_ptr(localTask, localTaskList)
|
||||||
|
{
|
||||||
|
const char *localTaskQueryCommand = TaskQueryString(localTask);
|
||||||
|
|
||||||
|
/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
|
||||||
|
Assert(localTask->anchorShardId != INVALID_SHARD_ID);
|
||||||
|
|
||||||
|
Assert(TaskAccessesLocalNode(localTask));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We should register the access to local placement to force the local
|
||||||
|
* execution of the following commands withing the current transaction.
|
||||||
|
* It can be a coordinated transaction or a transaction that is initiated
|
||||||
|
* explicitly.
|
||||||
|
*/
|
||||||
|
TransactionAccessedLocalPlacement = true;
|
||||||
|
|
||||||
|
LogLocalCommand(localTask);
|
||||||
|
|
||||||
|
LocallyExecuteUtilityTask(localTaskQueryCommand);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LocallyExecuteUtilityTask executes the given local task query in the current
|
||||||
|
* session.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LocallyExecuteUtilityTask(const char *localTaskQueryCommand)
|
||||||
|
{
|
||||||
|
RawStmt *localTaskRawStmt = (RawStmt *) ParseTreeRawStmt(localTaskQueryCommand);
|
||||||
|
|
||||||
|
Node *localTaskRawParseTree = localTaskRawStmt->stmt;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Actually, the query passed to this function would mostly be a
|
||||||
|
* utility command to be executed locally. However, some utility
|
||||||
|
* commands do trigger udf calls (e.g worker_apply_shard_ddl_command)
|
||||||
|
* to execute commands in a generic way. But as we support local
|
||||||
|
* execution of utility commands, we should also process those udf
|
||||||
|
* calls locally as well. In that case, we simply execute the query
|
||||||
|
* implying the udf call in below conditional block.
|
||||||
|
*/
|
||||||
|
if (IsA(localTaskRawParseTree, SelectStmt))
|
||||||
|
{
|
||||||
|
/* we have no additional parameters to rewrite the UDF call RawStmt */
|
||||||
|
Query *localUdfTaskQuery =
|
||||||
|
RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0);
|
||||||
|
|
||||||
|
LocallyExecuteUdfTaskQuery(localUdfTaskQuery);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* It is a regular utility command or SELECT query with non-udf,
|
||||||
|
* targets, then we should execute it locally via process utility.
|
||||||
|
*
|
||||||
|
* If it is a regular utility command, CitusProcessUtility is the
|
||||||
|
* appropriate function to process that command. However, if it's
|
||||||
|
* a SELECT query with non-udf targets, CitusProcessUtility would
|
||||||
|
* error out as we are not expecting such SELECT queries triggered
|
||||||
|
* by utility commands.
|
||||||
|
*/
|
||||||
|
CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand,
|
||||||
|
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LocallyExecuteUdfTaskQuery executes the given udf command locally. Local udf
|
||||||
|
* command is simply a "SELECT udf_call()" query and so it cannot be executed
|
||||||
|
* via process utility.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LocallyExecuteUdfTaskQuery(Query *localUdfTaskQuery)
|
||||||
|
{
|
||||||
|
/* we do not need any destination receivers to execute it */
|
||||||
|
ExecuteQueryIntoDestReceiver(localUdfTaskQuery, NULL, None_Receiver);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LogLocalCommand logs commands executed locally on this node. Although we're
|
* LogLocalCommand logs commands executed locally on this node. Although we're
|
||||||
* talking about local execution, the function relies on citus.log_remote_commands
|
* talking about local execution, the function relies on citus.log_remote_commands
|
||||||
|
|
|
@ -607,6 +607,22 @@ Query *
|
||||||
ParseQueryString(const char *queryString, Oid *paramOids, int numParams)
|
ParseQueryString(const char *queryString, Oid *paramOids, int numParams)
|
||||||
{
|
{
|
||||||
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
|
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
|
||||||
|
|
||||||
|
/* rewrite the parsed RawStmt to produce a Query */
|
||||||
|
Query *query = RewriteRawQueryStmt(rawStmt, queryString, paramOids, numParams);
|
||||||
|
|
||||||
|
return query;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RewriteRawQueryStmt rewrites the given parsed RawStmt according to the other
|
||||||
|
* parameters and returns a Query struct.
|
||||||
|
*/
|
||||||
|
Query *
|
||||||
|
RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, int
|
||||||
|
numParams)
|
||||||
|
{
|
||||||
List *queryTreeList =
|
List *queryTreeList =
|
||||||
pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL);
|
pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL);
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ extern bool TransactionConnectedToLocalGroup;
|
||||||
|
|
||||||
/* extern function declarations */
|
/* extern function declarations */
|
||||||
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
|
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
|
||||||
|
extern void ExecuteLocalUtilityTaskList(List *localTaskList);
|
||||||
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
|
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
|
||||||
List **localTaskList, List **remoteTaskList);
|
List **localTaskList, List **remoteTaskList);
|
||||||
extern bool ShouldExecuteTasksLocally(List *taskList);
|
extern bool ShouldExecuteTasksLocally(List *taskList);
|
||||||
|
|
|
@ -95,6 +95,8 @@ extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *worker
|
||||||
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
||||||
tupleDescriptor, Tuplestorestate *tupstore);
|
tupleDescriptor, Tuplestorestate *tupstore);
|
||||||
extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams);
|
extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams);
|
||||||
|
extern Query * RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString,
|
||||||
|
Oid *paramOids, int numParams);
|
||||||
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
|
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
|
||||||
params,
|
params,
|
||||||
DestReceiver *dest);
|
DestReceiver *dest);
|
||||||
|
|
Loading…
Reference in New Issue