remove ExecuteUtilityTaskListWithoutResults (#3696)

This PR removes ExecuteUtilityTaskListWithoutResults and uses the same
path for local execution via ExecuteTaskListExtended.
ExecuteUtilityTaskList is added. ExecuteLocalTaskListExtended now has a
parameter for utility commands so that it can call the right method. In
order not to change the existing calls,
ExecuteTaskListExtendedInternal is added, which is the main method that
runs the execution, via local and remote execution.
pull/3823/head
SaitTalhaNisanci 2020-05-07 13:30:50 +03:00 committed by GitHub
parent 105de7beb8
commit 22c903b151
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 72 additions and 108 deletions

View File

@ -37,6 +37,7 @@
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "distributed/adaptive_executor.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
@ -680,14 +681,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
EnsurePartitionTableNotReplicated(targetRelationId);
}
/*
* If it is a local placement of a distributed table or a reference table,
* then execute the DDL command locally.
* Here we set localExecutionSupported to true regardless of whether the
* DDL command is run for/on a distributed table as
* ExecuteUtilityTaskListWithoutResults would already identify those
* DDL tasks not accessing any of the local placements.
*/
bool localExecutionSupported = true;
if (!ddlJob->concurrentIndexCmd)
@ -710,7 +703,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
}
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported);
ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
}
else
{
@ -721,8 +714,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
PG_TRY();
{
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList,
localExecutionSupported);
ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
if (shouldSyncMetadata)
{

View File

@ -16,6 +16,7 @@
#include "commands/defrem.h"
#endif
#include "commands/vacuum.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
@ -112,8 +113,7 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
/* local execution is not implemented for VACUUM commands */
bool localExecutionSupported = false;
ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported);
ExecuteUtilityTaskList(taskList, localExecutionSupported);
executedVacuumCount++;
}
relationIndex++;

View File

@ -628,7 +628,6 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events
eventCount, bool *cancellationReceived);
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
/*
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
* run. Given that the result of our subplans would be evaluated before the first call to
@ -787,10 +786,12 @@ static void
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
{
EState *estate = ScanStateGetExecutorState(scanState);
bool isUtilityCommand = false;
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
estate->es_param_list_info,
scanState->distributedPlan,
scanState->tuplestorestate);
scanState->tuplestorestate,
isUtilityCommand);
/*
* We're deliberately not setting execution->rowsProcessed here. The main reason
@ -819,57 +820,21 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
/*
* ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
* list for utility commands. For remote tasks, it simply calls in adaptive
* executor's task execution function. For local tasks (if any), kicks Process
* Utility via CitusProcessUtility for utility commands. As some local utility
* commands can trigger udf calls, this function also processes those udf calls
* locally.
* ExecuteUtilityTaskList is a wrapper around executing task
* list for utility commands.
*/
void
ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported)
uint64
ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported)
{
RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE;
RowModifyLevel modLevel = ROW_MODIFY_NONE;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, utilityTaskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported
);
executionParams->xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList, false);
executionParams->isUtilityCommand = true;
List *localTaskList = NIL;
List *remoteTaskList = NIL;
/*
* Divide tasks into two if localExecutionSupported is set to true and execute
* the local tasks
*/
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
{
/*
* Either we are executing a utility command or a UDF call triggered
* by such a command, it has to be a modifying one
*/
bool readOnlyPlan = false;
/* set local (if any) & remote tasks */
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
&remoteTaskList);
/* execute local tasks */
ExecuteLocalUtilityTaskList(localTaskList);
}
else
{
/* all tasks should be executed via remote connections */
remoteTaskList = taskList;
}
/* execute remote tasks if any */
if (list_length(remoteTaskList) > 0)
{
/*
* We already executed tasks locally. We should ideally remove this method and
* let ExecuteTaskListExtended handle the local execution.
*/
localExecutionSupported = false;
ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize,
localExecutionSupported);
}
return ExecuteTaskListExtended(executionParams);
}
@ -977,8 +942,15 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
ErrorIfTransactionAccessedPlacementsLocally();
}
if (executionParams->isUtilityCommand)
{
locallyProcessedRows += ExecuteLocalUtilityTaskList(localTaskList);
}
else
{
locallyProcessedRows += ExecuteLocalTaskList(localTaskList,
executionParams->tupleStore);
}
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
@ -1020,6 +992,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
executionParams->tupleStore = NULL;
executionParams->tupleDescriptor = NULL;
executionParams->hasReturning = false;
executionParams->isUtilityCommand = false;
executionParams->jobIdList = NIL;
return executionParams;

View File

@ -147,8 +147,30 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState)
}
DistributedPlan *distributedPlan = NULL;
ParamListInfo paramListInfo = NULL;
bool isUtilityCommand = false;
return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan,
tupleStoreState);
tupleStoreState, isUtilityCommand);
}
/*
* ExecuteLocalUtilityTaskList executes the given tasks locally.
*
* The function returns totalRowsProcessed.
*/
uint64
ExecuteLocalUtilityTaskList(List *utilityTaskList)
{
if (list_length(utilityTaskList) == 0)
{
return 0;
}
DistributedPlan *distributedPlan = NULL;
ParamListInfo paramListInfo = NULL;
Tuplestorestate *tupleStoreState = NULL;
bool isUtilityCommand = true;
return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan,
tupleStoreState, isUtilityCommand);
}
@ -163,9 +185,11 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState)
* The function returns totalRowsProcessed.
*/
uint64
ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
ExecuteLocalTaskListExtended(List *taskList,
ParamListInfo orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState)
Tuplestorestate *tupleStoreState,
bool isUtilityCommand)
{
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
int numParams = 0;
@ -202,6 +226,11 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
}
LogLocalCommand(task);
if (isUtilityCommand)
{
LocallyExecuteUtilityTask(TaskQueryStringForAllPlacements(task));
continue;
}
PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan);
@ -271,7 +300,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
localPlan = planner(shardQuery, cursorOptions, paramListInfo);
}
char *shardQueryString = NULL;
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
{
@ -332,39 +360,6 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
}
/*
* ExecuteLocalUtilityTaskList executes a list of tasks locally. This function
* also logs local execution notice for each task and sets
* TransactionAccessedLocalPlacement to true for next set of possible queries
* & commands within the current transaction block. See the comment in function.
*/
void
ExecuteLocalUtilityTaskList(List *localTaskList)
{
Task *localTask = NULL;
foreach_ptr(localTask, localTaskList)
{
const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask);
/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
Assert(localTask->anchorShardId != INVALID_SHARD_ID);
Assert(TaskAccessesLocalNode(localTask));
/*
* We should register the access to local placement to force the local
* execution of the following commands withing the current transaction.
*/
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
LogLocalCommand(localTask);
LocallyExecuteUtilityTask(localTaskQueryCommand);
}
}
/*
* LocallyExecuteUtilityTask executes the given local task query in the current
* session.

View File

@ -17,6 +17,7 @@
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/foreign_key_relationship.h"
@ -85,8 +86,7 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
* then execute TRUNCATE command locally.
*/
bool localExecutionSupported = true;
ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported);
ExecuteUtilityTaskList(taskList, localExecutionSupported);
}
PG_RETURN_DATUM(PointerGetDatum(NULL));

View File

@ -12,6 +12,7 @@ extern int ExecutorSlowStartInterval;
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, List *jobIdList);

View File

@ -29,11 +29,12 @@ extern enum LocalExecutionStatus CurrentLocalExecutionStatus;
/* extern function declarations */
extern uint64 ExecuteLocalTaskList(List *taskList,
Tuplestorestate *tupleStoreState);
extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList);
extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo
orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState);
extern void ExecuteLocalUtilityTaskList(List *localTaskList);
Tuplestorestate *tupleStoreState,
bool isUtilityCommand);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList);
extern bool ShouldExecuteTasksLocally(List *taskList);

View File

@ -108,6 +108,10 @@ typedef struct ExecutionParams
/* localExecutionSupported is true if we can use local execution, if it is false
* local execution will not be used. */
bool localExecutionSupported;
/* isUtilityCommand is true if the current execution is for a utility
* command such as a DDL command.*/
bool isUtilityCommand;
} ExecutionParams;
ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel,
@ -120,8 +124,6 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool
localExecutionSupported);
extern bool IsCitusCustomState(PlanState *planState);
extern TupleTableSlot * CitusExecScan(CustomScanState *node);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);