Merge pull request #3889 from citusdata/fix/stage_generates_utility_commands

Execute shard creation as utility tasks
pull/3869/head
Marco Slot 2020-06-10 11:51:24 +02:00 committed by GitHub
commit 02a70df656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 25 deletions

View File

@ -855,6 +855,29 @@ ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported)
}
/*
* ExecuteUtilityTaskListExtended is a wrapper around executing task
* list for utility commands.
*/
uint64
ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
bool localExecutionSupported)
{
RowModifyLevel modLevel = ROW_MODIFY_NONE;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, utilityTaskList, poolSize, localExecutionSupported
);
bool excludeFromXact = false;
executionParams->xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList,
excludeFromXact);
executionParams->isUtilityCommand = true;
return ExecuteTaskListExtended(executionParams);
}
/*
* ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended
* with defaults for some of the arguments.

View File

@ -366,35 +366,39 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
static void
LocallyExecuteUtilityTask(const char *localTaskQueryCommand)
{
RawStmt *localTaskRawStmt = (RawStmt *) ParseTreeRawStmt(localTaskQueryCommand);
List *parseTreeList = pg_parse_query(localTaskQueryCommand);
RawStmt *localTaskRawStmt = NULL;
Node *localTaskRawParseTree = localTaskRawStmt->stmt;
/*
* Actually, the query passed to this function would mostly be a
* utility command to be executed locally. However, some utility
* commands do trigger udf calls (e.g worker_apply_shard_ddl_command)
* to execute commands in a generic way. But as we support local
* execution of utility commands, we should also process those udf
* calls locally as well. In that case, we simply execute the query
* implying the udf call in below conditional block.
*/
if (IsA(localTaskRawParseTree, SelectStmt))
foreach_ptr(localTaskRawStmt, parseTreeList)
{
/* we have no external parameters to rewrite the UDF call RawStmt */
Query *localUdfTaskQuery =
RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0);
Node *localTaskRawParseTree = localTaskRawStmt->stmt;
LocallyExecuteUdfTaskQuery(localUdfTaskQuery);
}
else
{
/*
* It is a regular utility command we should execute it locally via
* process utility.
* Actually, the query passed to this function would mostly be a
* utility command to be executed locally. However, some utility
* commands do trigger udf calls (e.g worker_apply_shard_ddl_command)
* to execute commands in a generic way. But as we support local
* execution of utility commands, we should also process those udf
* calls locally as well. In that case, we simply execute the query
* implying the udf call in below conditional block.
*/
CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
if (IsA(localTaskRawParseTree, SelectStmt))
{
/* we have no external parameters to rewrite the UDF call RawStmt */
Query *localUdfTaskQuery =
RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0);
LocallyExecuteUdfTaskQuery(localUdfTaskQuery);
}
else
{
/*
* It is a regular utility command we should execute it locally via
* process utility.
*/
CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
}
}
}

View File

@ -550,7 +550,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
poolSize = MaxAdaptiveExecutorPoolSize;
}
bool localExecutionSupported = true;
ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize, localExecutionSupported);
ExecuteUtilityTaskListExtended(taskList, poolSize, localExecutionSupported);
}

View File

@ -13,6 +13,8 @@ extern int ExecutorSlowStartInterval;
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
bool localExecutionSupported);
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, List *jobIdList);