diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 710654bf4..fbed95a5f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -37,6 +37,7 @@ #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/tablecmds.h" +#include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" #include "distributed/commands/multi_copy.h" @@ -680,14 +681,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) EnsurePartitionTableNotReplicated(targetRelationId); } - /* - * If it is a local placement of a distributed table or a reference table, - * then execute the DDL command locally. - * Here we set localExecutionSupported to true regardless of whether the - * DDL command is run for/on a distributed table as - * ExecuteUtilityTaskListWithoutResults would already identify those - * DDL tasks not accessing any of the local placements. - */ bool localExecutionSupported = true; if (!ddlJob->concurrentIndexCmd) @@ -710,7 +703,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) SendCommandToWorkersWithMetadata((char *) ddlJob->commandString); } - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported); + ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported); } else { @@ -721,8 +714,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, - localExecutionSupported); + ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index ddec8c1a3..db32e23fe 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -16,6 +16,7 @@ #include "commands/defrem.h" #endif #include "commands/vacuum.h" +#include "distributed/adaptive_executor.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparse_shard_query.h" @@ -112,8 +113,7 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) /* local execution is not implemented for VACUUM commands */ bool localExecutionSupported = false; - - ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported); + ExecuteUtilityTaskList(taskList, localExecutionSupported); executedVacuumCount++; } relationIndex++; diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 6779ff764..e03a0330b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -628,7 +628,6 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events eventCount, bool *cancellationReceived); static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); - /* * AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor * run. Given that the result of our subplans would be evaluated before the first call to @@ -787,10 +786,12 @@ static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { EState *estate = ScanStateGetExecutorState(scanState); + bool isUtilityCommand = false; uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, estate->es_param_list_info, scanState->distributedPlan, - scanState->tuplestorestate); + scanState->tuplestorestate, + isUtilityCommand); /* * We're deliberately not setting execution->rowsProcessed here. The main reason @@ -819,57 +820,21 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution) /* - * ExecuteUtilityTaskListWithoutResults is a wrapper around executing task - * list for utility commands. For remote tasks, it simply calls in adaptive - * executor's task execution function. For local tasks (if any), kicks Process - * Utility via CitusProcessUtility for utility commands. As some local utility - * commands can trigger udf calls, this function also processes those udf calls - * locally. + * ExecuteUtilityTaskList is a wrapper around executing task + * list for utility commands. */ -void -ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported) +uint64 +ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported) { - RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE; + RowModifyLevel modLevel = ROW_MODIFY_NONE; + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, utilityTaskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported + ); + executionParams->xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList, false); + executionParams->isUtilityCommand = true; - List *localTaskList = NIL; - List *remoteTaskList = NIL; - - /* - * Divide tasks into two if localExecutionSupported is set to true and execute - * the local tasks - */ - if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) - { - /* - * Either we are executing a utility command or a UDF call triggered - * by such a command, it has to be a modifying one - */ - bool readOnlyPlan = false; - - /* set local (if any) & remote tasks */ - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, - &remoteTaskList); - - /* execute local tasks */ - ExecuteLocalUtilityTaskList(localTaskList); - } - else - { - /* all tasks should be executed via remote connections */ - remoteTaskList = taskList; - } - - /* execute remote tasks if any */ - if (list_length(remoteTaskList) > 0) - { - /* - * We already executed tasks locally. We should ideally remove this method and - * let ExecuteTaskListExtended handle the local execution. - */ - localExecutionSupported = false; - ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize, - localExecutionSupported); - } + return ExecuteTaskListExtended(executionParams); } @@ -977,8 +942,15 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) ErrorIfTransactionAccessedPlacementsLocally(); } - locallyProcessedRows += ExecuteLocalTaskList(localTaskList, - executionParams->tupleStore); + if (executionParams->isUtilityCommand) + { + locallyProcessedRows += ExecuteLocalUtilityTaskList(localTaskList); + } + else + { + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, + executionParams->tupleStore); + } if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { @@ -1020,6 +992,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, executionParams->tupleStore = NULL; executionParams->tupleDescriptor = NULL; executionParams->hasReturning = false; + executionParams->isUtilityCommand = false; executionParams->jobIdList = NIL; return executionParams; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 32771d68b..f8a8c14e1 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -147,8 +147,30 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) } DistributedPlan *distributedPlan = NULL; ParamListInfo paramListInfo = NULL; + bool isUtilityCommand = false; return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan, - tupleStoreState); + tupleStoreState, isUtilityCommand); +} + + +/* + * ExecuteLocalUtilityTaskList executes the given tasks locally. + * + * The function returns totalRowsProcessed. + */ +uint64 +ExecuteLocalUtilityTaskList(List *utilityTaskList) +{ + if (list_length(utilityTaskList) == 0) + { + return 0; + } + DistributedPlan *distributedPlan = NULL; + ParamListInfo paramListInfo = NULL; + Tuplestorestate *tupleStoreState = NULL; + bool isUtilityCommand = true; + return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan, + tupleStoreState, isUtilityCommand); } @@ -163,9 +185,11 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) * The function returns totalRowsProcessed. */ uint64 -ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, +ExecuteLocalTaskListExtended(List *taskList, + ParamListInfo orig_paramListInfo, DistributedPlan *distributedPlan, - Tuplestorestate *tupleStoreState) + Tuplestorestate *tupleStoreState, + bool isUtilityCommand) { ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); int numParams = 0; @@ -202,6 +226,11 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, } LogLocalCommand(task); + if (isUtilityCommand) + { + LocallyExecuteUtilityTask(TaskQueryStringForAllPlacements(task)); + continue; + } PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); @@ -271,7 +300,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, localPlan = planner(shardQuery, cursorOptions, paramListInfo); } - char *shardQueryString = NULL; if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { @@ -332,39 +360,6 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT } -/* - * ExecuteLocalUtilityTaskList executes a list of tasks locally. This function - * also logs local execution notice for each task and sets - * TransactionAccessedLocalPlacement to true for next set of possible queries - * & commands within the current transaction block. See the comment in function. - */ -void -ExecuteLocalUtilityTaskList(List *localTaskList) -{ - Task *localTask = NULL; - - foreach_ptr(localTask, localTaskList) - { - const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask); - - /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ - Assert(localTask->anchorShardId != INVALID_SHARD_ID); - - Assert(TaskAccessesLocalNode(localTask)); - - /* - * We should register the access to local placement to force the local - * execution of the following commands withing the current transaction. - */ - SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); - - LogLocalCommand(localTask); - - LocallyExecuteUtilityTask(localTaskQueryCommand); - } -} - - /* * LocallyExecuteUtilityTask executes the given local task query in the current * session. diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index 76ad4c091..fb92d3f11 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -17,6 +17,7 @@ #include "commands/tablecmds.h" #include "commands/trigger.h" #include "distributed/citus_ruleutils.h" +#include "distributed/adaptive_executor.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparse_shard_query.h" #include "distributed/foreign_key_relationship.h" @@ -85,8 +86,7 @@ citus_truncate_trigger(PG_FUNCTION_ARGS) * then execute TRUNCATE command locally. */ bool localExecutionSupported = true; - - ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported); + ExecuteUtilityTaskList(taskList, localExecutionSupported); } PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 3dd804e06..3fec4b8a5 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -12,6 +12,7 @@ extern int ExecutorSlowStartInterval; extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool localExecutionSupported); +extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index f42a7c47b..37a5696f0 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -29,11 +29,12 @@ extern enum LocalExecutionStatus CurrentLocalExecutionStatus; /* extern function declarations */ extern uint64 ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState); +extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList); extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, DistributedPlan *distributedPlan, - Tuplestorestate *tupleStoreState); -extern void ExecuteLocalUtilityTaskList(List *localTaskList); + Tuplestorestate *tupleStoreState, + bool isUtilityCommand); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index a2755e640..9670b0a6d 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -108,6 +108,10 @@ typedef struct ExecutionParams /* localExecutionSupported is true if we can use local execution, if it is false * local execution will not be used. */ bool localExecutionSupported; + + /* isUtilityCommand is true if the current execution is for a utility + * command such as a DDL command.*/ + bool isUtilityCommand; } ExecutionParams; ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel, @@ -120,8 +124,6 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); -extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool - localExecutionSupported); extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);