diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c520cd6d4..89e5c9e7c 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -3,21 +3,22 @@ * * The scope of the local execution is locally executing the queries on the * shards. In other words, local execution does not deal with any local tables - * that are not on shards on the node that the query is being executed. In that sense, - * the local executor is only triggered if the node has both the metadata and the - * shards (e.g., only Citus MX worker nodes). + * that are not on shards on the node that the query is being executed. In that + * sense, the local executor is only triggered if the node has both the metadata + * and the shards (e.g., only Citus MX worker nodes). * * The goal of the local execution is to skip the unnecessary network round-trip - * happening on the node itself. Instead, identify the locally executable tasks and - * simply call PostgreSQL's planner and executor. + * happening on the node itself. Instead, identify the locally executable tasks + * and simply call PostgreSQL's planner and executor. * - * The local executor is an extension of the adaptive executor. So, the executor uses - * adaptive executor's custom scan nodes. + * The local executor is an extension of the adaptive executor. So, the executor + * uses adaptive executor's custom scan nodes. * - * One thing to note is that Citus MX is only supported with replication factor = 1, so - * keep that in mind while continuing the comments below. + * One thing to note is that Citus MX is only supported with replication factor + * to be equal to 1, so keep that in mind while continuing the comments below. * - * On the high level, there are 3 slightly different ways of utilizing local execution: + * On the high level, there are 3 slightly different ways of utilizing local + * execution: * * (1) Execution of local single shard queries of a distributed table * @@ -25,16 +26,19 @@ * executor, and since the query is only a single task the execution finishes * without going to the network at all. * - * Even if there is a transaction block (or recursively planned CTEs), as long - * as the queries hit the shards on the same node, the local execution will kick in. + * Even if there is a transaction block (or recursively planned CTEs), as + * long as the queries hit the shards on the same node, the local execution + * will kick in. * * (2) Execution of local single queries and remote multi-shard queries * - * The rule is simple. If a transaction block starts with a local query execution, - * all the other queries in the same transaction block that touch any local shard - * have to use the local execution. Although this sounds restrictive, we prefer to - * implement it in this way, otherwise we'd end-up with as complex scenarios as we - * have in the connection managements due to foreign keys. + * The rule is simple. If a transaction block starts with a local query + * execution, + * all the other queries in the same transaction block that touch any local + * shard have to use the local execution. Although this sounds restrictive, + * we prefer to implement it in this way, otherwise we'd end-up with as + * complex scenarios as we have in the connection managements due to foreign + * keys. * * See the following example: * BEGIN; @@ -49,26 +53,28 @@ * * (3) Modifications of reference tables * - * Modifications to reference tables have to be executed on all nodes. So, after the - * local execution, the adaptive executor keeps continuing the execution on the other - * nodes. + * Modifications to reference tables have to be executed on all nodes. So, + * after the local execution, the adaptive executor keeps continuing the + * execution on the other nodes. * - * Note that for read-only queries, after the local execution, there is no need to - * kick in adaptive executor. + * Note that for read-only queries, after the local execution, there is no + * need to kick in adaptive executor. * * There are also a few limitations/trade-offs that are worth mentioning. - * - The local execution on multiple shards might be slow because the execution has to - * happen one task at a time (e.g., no parallelism). - * - If a transaction block/CTE starts with a multi-shard command, we do not use local query execution - * since local execution is sequential. Basically, we do not want to lose parallelism - * across local tasks by switching to local execution. - * - The local execution currently only supports queries. In other words, any utility commands like TRUNCATE, - * fails if the command is executed after a local execution inside a transaction block. + * - The local execution on multiple shards might be slow because the execution + * has to happen one task at a time (e.g., no parallelism). + * - If a transaction block/CTE starts with a multi-shard command, we do not + * use local query execution since local execution is sequential. Basically, + * we do not want to lose parallelism across local tasks by switching to local + * execution. + * - The local execution currently only supports queries. In other words, any + * utility commands like TRUNCATE, fails if the command is executed after a local + * execution inside a transaction block. * - The local execution cannot be mixed with the executors other than adaptive, * namely task-tracker executor. - * - Related with the previous item, COPY command - * cannot be mixed with local execution in a transaction. The implication of that is any - * part of INSERT..SELECT via coordinator cannot happen via the local execution. + * - Related with the previous item, COPY command cannot be mixed with local + * execution in a transaction. The implication of that is any part of INSERT..SELECT + * via coordinator cannot happen via the local execution. */ #include "postgres.h" #include "miscadmin.h" @@ -215,9 +221,10 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) /* - * ExtractParametersForLocalExecution extracts parameter types and values from - * the given ParamListInfo structure, and fills parameter type and value arrays. - * It does not change the oid of custom types, because the query will be run locally. + * ExtractParametersForLocalExecution extracts parameter types and values + * from the given ParamListInfo structure, and fills parameter type and + * value arrays. It does not change the oid of custom types, because the + * query will be run locally. */ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -272,8 +279,8 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, List *localTaskPlacementList = NULL; List *remoteTaskPlacementList = NULL; - SplitLocalAndRemotePlacements(task->taskPlacementList, &localTaskPlacementList, - &remoteTaskPlacementList); + SplitLocalAndRemotePlacements( + task->taskPlacementList, &localTaskPlacementList, &remoteTaskPlacementList); /* either the local or the remote should be non-nil */ Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL)); @@ -281,9 +288,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, if (list_length(task->taskPlacementList) == 1) { /* - * At this point, the task has a single placement (e.g,. anchor shard is - * distributed table's shard). So, it is either added to local or remote - * taskList. + * At this point, the task has a single placement (e.g,. anchor shard + * is distributed table's shard). So, it is either added to local or + * remote taskList. */ if (localTaskPlacementList == NIL) { @@ -297,10 +304,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, else { /* - * At this point, we're dealing with reference tables or intermediate results - * where the task has placements on both local and remote nodes. We always - * prefer to use local placement, and require remote placements only for - * modifications. + * At this point, we're dealing with reference tables or intermediate + * results where the task has placements on both local and remote + * nodes. We always prefer to use local placement, and require remote + * placements only for modifications. */ task->partiallyLocalOrRemote = true; @@ -326,9 +333,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, /* - * SplitLocalAndRemotePlacements is a helper function which iterates over the input - * taskPlacementList and puts the placements into corresponding list of either - * localTaskPlacementList or remoteTaskPlacementList. + * SplitLocalAndRemotePlacements is a helper function which iterates over the + * input taskPlacementList and puts the placements into corresponding list of + * either localTaskPlacementList or remoteTaskPlacementList. */ static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, @@ -433,8 +440,8 @@ ShouldExecuteTasksLocally(List *taskList) /* * TODO: A future improvement could be to keep track of which placements - * have been locally executed. At this point, only use local execution for - * those placements. That'd help to benefit more from parallelism. + * have been locally executed. At this point, only use local execution + * for those placements. That'd help to benefit more from parallelism. */ return true; @@ -446,14 +453,14 @@ ShouldExecuteTasksLocally(List *taskList) { /* * This is the valuable time to use the local execution. We are likely - * to avoid any network round-trips by simply executing the command within - * this session. + * to avoid any network round-trips by simply executing the command + * within this session. * * We cannot avoid network round trips if the task is not a read only * task and accesses multiple placements. For example, modifications to * distributed tables (with replication factor == 1) would avoid network - * round-trips. However, modifications to reference tables still needs to - * go to over the network to do the modification on the other placements. + * round-trips. However, modifications to reference tables still needs + * to go to over the network to do the modification on the other placements. * Still, we'll be avoding the network round trip for this node. * * Note that we shouldn't use local execution if any distributed execution @@ -466,10 +473,10 @@ ShouldExecuteTasksLocally(List *taskList) if (!singleTask) { /* - * For multi-task executions, switching to local execution would likely to - * perform poorly, because we'd lose the parallelism. Note that the local - * execution is happening one task at a time (e.g., similar to sequential - * distributed execution). + * For multi-task executions, switching to local execution would likely + * to perform poorly, because we'd lose the parallelism. Note that the + * local execution is happening one task at a time (e.g., similar to + * sequential distributed execution). */ Assert(!TransactionAccessedLocalPlacement); @@ -481,8 +488,8 @@ ShouldExecuteTasksLocally(List *taskList) /* - * TaskAccessesLocalNode returns true if any placements of the task reside on the - * node that we're executing the query. + * TaskAccessesLocalNode returns true if any placements of the task reside on + * the node that we're executing the query. */ bool TaskAccessesLocalNode(Task *task) @@ -503,24 +510,26 @@ TaskAccessesLocalNode(Task *task) /* - * ErrorIfTransactionAccessedPlacementsLocally() errors out if a local query on any shard - * has already been executed in the same transaction. + * ErrorIfTransactionAccessedPlacementsLocally errors out if a local query + * on any shard has already been executed in the same transaction. * - * This check is required because Citus currently hasn't implemented local execution - * infrastructure for all the commands/executors. As we implement local execution for - * the command/executor that this function call exists, we should simply remove the check. + * This check is required because Citus currently hasn't implemented local + * execution infrastructure for all the commands/executors. As we implement + * local execution for the command/executor that this function call exists, + * we should simply remove the check. */ void ErrorIfTransactionAccessedPlacementsLocally(void) { if (TransactionAccessedLocalPlacement) { - ereport(ERROR, (errmsg("cannot execute command because a local execution has " - "accessed a placement in the transaction"), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.enable_local_execution TO OFF;\""), - errdetail("Some parallel commands cannot be executed if a " - "previous command has already been executed locally"))); + ereport(ERROR, + (errmsg("cannot execute command because a local execution has " + "accessed a placement in the transaction"), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.enable_local_execution TO OFF;\""), + errdetail("Some parallel commands cannot be executed if a " + "previous command has already been executed locally"))); } }