mirror of https://github.com/citusdata/citus.git
refactor local_executor.c lines with >78 characters
parent 6dbb48c9f1
commit 17d9b934c3
|
@ -3,21 +3,22 @@
|
||||||
*
|
*
|
||||||
* The scope of the local execution is locally executing the queries on the
|
* The scope of the local execution is locally executing the queries on the
|
||||||
* shards. In other words, local execution does not deal with any local tables
|
* shards. In other words, local execution does not deal with any local tables
|
||||||
* that are not on shards on the node that the query is being executed. In that sense,
|
* that are not on shards on the node that the query is being executed. In that
|
||||||
* the local executor is only triggered if the node has both the metadata and the
|
* sense, the local executor is only triggered if the node has both the metadata
|
||||||
* shards (e.g., only Citus MX worker nodes).
|
* and the shards (e.g., only Citus MX worker nodes).
|
||||||
*
|
*
|
||||||
* The goal of the local execution is to skip the unnecessary network round-trip
|
* The goal of the local execution is to skip the unnecessary network round-trip
|
||||||
* happening on the node itself. Instead, identify the locally executable tasks and
|
* happening on the node itself. Instead, identify the locally executable tasks
|
||||||
* simply call PostgreSQL's planner and executor.
|
* and simply call PostgreSQL's planner and executor.
|
||||||
*
|
*
|
||||||
* The local executor is an extension of the adaptive executor. So, the executor uses
|
* The local executor is an extension of the adaptive executor. So, the executor
|
||||||
* adaptive executor's custom scan nodes.
|
* uses adaptive executor's custom scan nodes.
|
||||||
*
|
*
|
||||||
* One thing to note is that Citus MX is only supported with replication factor = 1, so
|
* One thing to note is that Citus MX is only supported with replication factor
|
||||||
* keep that in mind while continuing the comments below.
|
* to be equal to 1, so keep that in mind while continuing the comments below.
|
||||||
*
|
*
|
||||||
* On the high level, there are 3 slightly different ways of utilizing local execution:
|
* On the high level, there are 3 slightly different ways of utilizing local
|
||||||
|
* execution:
|
||||||
*
|
*
|
||||||
* (1) Execution of local single shard queries of a distributed table
|
* (1) Execution of local single shard queries of a distributed table
|
||||||
*
|
*
|
||||||
|
@ -25,16 +26,19 @@
|
||||||
* executor, and since the query is only a single task the execution finishes
|
* executor, and since the query is only a single task the execution finishes
|
||||||
* without going to the network at all.
|
* without going to the network at all.
|
||||||
*
|
*
|
||||||
* Even if there is a transaction block (or recursively planned CTEs), as long
|
* Even if there is a transaction block (or recursively planned CTEs), as
|
||||||
* as the queries hit the shards on the same node, the local execution will kick in.
|
* long as the queries hit the shards on the same node, the local execution
|
||||||
|
* will kick in.
|
||||||
*
|
*
|
||||||
* (2) Execution of local single queries and remote multi-shard queries
|
* (2) Execution of local single queries and remote multi-shard queries
|
||||||
*
|
*
|
||||||
* The rule is simple. If a transaction block starts with a local query execution,
|
* The rule is simple. If a transaction block starts with a local query
|
||||||
* all the other queries in the same transaction block that touch any local shard
|
* execution,
|
||||||
* have to use the local execution. Although this sounds restrictive, we prefer to
|
* all the other queries in the same transaction block that touch any local
|
||||||
* implement it in this way, otherwise we'd end-up with as complex scenarios as we
|
* shard have to use the local execution. Although this sounds restrictive,
|
||||||
* have in the connection managements due to foreign keys.
|
* we prefer to implement it in this way, otherwise we'd end-up with as
|
||||||
|
* complex scenarios as we have in the connection managements due to foreign
|
||||||
|
* keys.
|
||||||
*
|
*
|
||||||
* See the following example:
|
* See the following example:
|
||||||
* BEGIN;
|
* BEGIN;
|
||||||
|
@ -49,26 +53,28 @@
|
||||||
*
|
*
|
||||||
* (3) Modifications of reference tables
|
* (3) Modifications of reference tables
|
||||||
*
|
*
|
||||||
* Modifications to reference tables have to be executed on all nodes. So, after the
|
* Modifications to reference tables have to be executed on all nodes. So,
|
||||||
* local execution, the adaptive executor keeps continuing the execution on the other
|
* after the local execution, the adaptive executor keeps continuing the
|
||||||
* nodes.
|
* execution on the other nodes.
|
||||||
*
|
*
|
||||||
* Note that for read-only queries, after the local execution, there is no need to
|
* Note that for read-only queries, after the local execution, there is no
|
||||||
* kick in adaptive executor.
|
* need to kick in adaptive executor.
|
||||||
*
|
*
|
||||||
* There are also a few limitations/trade-offs that are worth mentioning.
|
* There are also a few limitations/trade-offs that are worth mentioning.
|
||||||
* - The local execution on multiple shards might be slow because the execution has to
|
* - The local execution on multiple shards might be slow because the execution
|
||||||
* happen one task at a time (e.g., no parallelism).
|
* has to happen one task at a time (e.g., no parallelism).
|
||||||
* - If a transaction block/CTE starts with a multi-shard command, we do not use local query execution
|
* - If a transaction block/CTE starts with a multi-shard command, we do not
|
||||||
* since local execution is sequential. Basically, we do not want to lose parallelism
|
* use local query execution since local execution is sequential. Basically,
|
||||||
* across local tasks by switching to local execution.
|
* we do not want to lose parallelism across local tasks by switching to local
|
||||||
* - The local execution currently only supports queries. In other words, any utility commands like TRUNCATE,
|
* execution.
|
||||||
* fails if the command is executed after a local execution inside a transaction block.
|
* - The local execution currently only supports queries. In other words, any
|
||||||
|
* utility commands like TRUNCATE, fails if the command is executed after a local
|
||||||
|
* execution inside a transaction block.
|
||||||
* - The local execution cannot be mixed with the executors other than adaptive,
|
* - The local execution cannot be mixed with the executors other than adaptive,
|
||||||
* namely task-tracker executor.
|
* namely task-tracker executor.
|
||||||
* - Related with the previous item, COPY command
|
* - Related with the previous item, COPY command cannot be mixed with local
|
||||||
* cannot be mixed with local execution in a transaction. The implication of that is any
|
* execution in a transaction. The implication of that is any part of INSERT..SELECT
|
||||||
* part of INSERT..SELECT via coordinator cannot happen via the local execution.
|
* via coordinator cannot happen via the local execution.
|
||||||
*/
|
*/
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
@ -215,9 +221,10 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractParametersForLocalExecution extracts parameter types and values from
|
* ExtractParametersForLocalExecution extracts parameter types and values
|
||||||
* the given ParamListInfo structure, and fills parameter type and value arrays.
|
* from the given ParamListInfo structure, and fills parameter type and
|
||||||
* It does not change the oid of custom types, because the query will be run locally.
|
* value arrays. It does not change the oid of custom types, because the
|
||||||
|
* query will be run locally.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
|
ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
|
||||||
|
@ -272,8 +279,8 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
List *localTaskPlacementList = NULL;
|
List *localTaskPlacementList = NULL;
|
||||||
List *remoteTaskPlacementList = NULL;
|
List *remoteTaskPlacementList = NULL;
|
||||||
|
|
||||||
SplitLocalAndRemotePlacements(task->taskPlacementList, &localTaskPlacementList,
|
SplitLocalAndRemotePlacements(
|
||||||
&remoteTaskPlacementList);
|
task->taskPlacementList, &localTaskPlacementList, &remoteTaskPlacementList);
|
||||||
|
|
||||||
/* either the local or the remote should be non-nil */
|
/* either the local or the remote should be non-nil */
|
||||||
Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
|
Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
|
||||||
|
@ -281,9 +288,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
if (list_length(task->taskPlacementList) == 1)
|
if (list_length(task->taskPlacementList) == 1)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* At this point, the task has a single placement (e.g., anchor shard is
|
* At this point, the task has a single placement (e.g., anchor shard
|
||||||
* distributed table's shard). So, it is either added to local or remote
|
* is distributed table's shard). So, it is either added to local or
|
||||||
* taskList.
|
* remote taskList.
|
||||||
*/
|
*/
|
||||||
if (localTaskPlacementList == NIL)
|
if (localTaskPlacementList == NIL)
|
||||||
{
|
{
|
||||||
|
@ -297,10 +304,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* At this point, we're dealing with reference tables or intermediate results
|
* At this point, we're dealing with reference tables or intermediate
|
||||||
* where the task has placements on both local and remote nodes. We always
|
* results where the task has placements on both local and remote
|
||||||
* prefer to use local placement, and require remote placements only for
|
* nodes. We always prefer to use local placement, and require remote
|
||||||
* modifications.
|
* placements only for modifications.
|
||||||
*/
|
*/
|
||||||
task->partiallyLocalOrRemote = true;
|
task->partiallyLocalOrRemote = true;
|
||||||
|
|
||||||
|
@ -326,9 +333,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SplitLocalAndRemotePlacements is a helper function which iterates over the input
|
* SplitLocalAndRemotePlacements is a helper function which iterates over the
|
||||||
* taskPlacementList and puts the placements into corresponding list of either
|
* input taskPlacementList and puts the placements into corresponding list of
|
||||||
* localTaskPlacementList or remoteTaskPlacementList.
|
* either localTaskPlacementList or remoteTaskPlacementList.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
|
SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
|
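The comments in the hunks above describe how a task's placements are split into a local and a remote list, and how the task is then routed. The following is a minimal, self-contained sketch of that idea, not the Citus implementation: the `Placement` struct, the `TaskTarget` enum, and the `SplitPlacements` and `ClassifyTask` functions are hypothetical names introduced for illustration, and plain arrays stand in for PostgreSQL `List` values. It assumes that a placement counts as local when its group id equals the id of the node running the query.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a shard placement; not the Citus struct. */
typedef struct Placement
{
    int groupId; /* node group that stores this placement (assumption) */
} Placement;

/* Hypothetical result of routing a single task. */
typedef enum TaskTarget
{
    TASK_LOCAL_ONLY,
    TASK_REMOTE_ONLY,
    TASK_LOCAL_AND_REMOTE
} TaskTarget;

/*
 * SplitPlacements copies every input placement into either localOut or
 * remoteOut, depending on whether its group id matches localGroupId.
 * Both output arrays must have room for placementCount entries.
 */
void
SplitPlacements(const Placement *placements, size_t placementCount,
                int localGroupId,
                Placement *localOut, size_t *localCount,
                Placement *remoteOut, size_t *remoteCount)
{
    *localCount = 0;
    *remoteCount = 0;

    for (size_t i = 0; i < placementCount; i++)
    {
        if (placements[i].groupId == localGroupId)
        {
            localOut[(*localCount)++] = placements[i];
        }
        else
        {
            remoteOut[(*remoteCount)++] = placements[i];
        }
    }
}

/*
 * ClassifyTask decides where a task runs. A task with placements on both
 * sides (e.g., a reference table) prefers the local placement and needs
 * the remote placements only when it modifies data.
 */
TaskTarget
ClassifyTask(size_t localCount, size_t remoteCount, bool readOnly)
{
    /* either the local or the remote side should be non-empty */
    assert(localCount > 0 || remoteCount > 0);

    if (localCount == 0)
    {
        return TASK_REMOTE_ONLY;
    }
    if (remoteCount == 0)
    {
        return TASK_LOCAL_ONLY;
    }

    return readOnly ? TASK_LOCAL_ONLY : TASK_LOCAL_AND_REMOTE;
}
```

With three placements in groups 1, 2 and 1 and a local group id of 1, the split yields two local placements and one remote placement. A read-only task over them would run locally only, while a modification would run on both sides.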
||||||
|
@ -433,8 +440,8 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TODO: A future improvement could be to keep track of which placements
|
* TODO: A future improvement could be to keep track of which placements
|
||||||
* have been locally executed. At this point, only use local execution for
|
* have been locally executed. At this point, only use local execution
|
||||||
* those placements. That'd help to benefit more from parallelism.
|
* for those placements. That'd help to benefit more from parallelism.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -446,14 +453,14 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* This is the valuable time to use the local execution. We are likely
|
* This is the valuable time to use the local execution. We are likely
|
||||||
* to avoid any network round-trips by simply executing the command within
|
* to avoid any network round-trips by simply executing the command
|
||||||
* this session.
|
* within this session.
|
||||||
*
|
*
|
||||||
* We cannot avoid network round trips if the task is not a read only
|
* We cannot avoid network round trips if the task is not a read only
|
||||||
* task and accesses multiple placements. For example, modifications to
|
* task and accesses multiple placements. For example, modifications to
|
||||||
* distributed tables (with replication factor == 1) would avoid network
|
* distributed tables (with replication factor == 1) would avoid network
|
||||||
* round-trips. However, modifications to reference tables still needs to
|
* round-trips. However, modifications to reference tables still needs
|
||||||
* go to over the network to do the modification on the other placements.
|
* to go to over the network to do the modification on the other placements.
|
||||||
* Still, we'll be avoiding the network round trip for this node.
|
* Still, we'll be avoiding the network round trip for this node.
|
||||||
*
|
*
|
||||||
* Note that we shouldn't use local execution if any distributed execution
|
* Note that we shouldn't use local execution if any distributed execution
|
||||||
|
@ -466,10 +473,10 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
if (!singleTask)
|
if (!singleTask)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* For multi-task executions, switching to local execution would likely to
|
* For multi-task executions, switching to local execution would likely
|
||||||
* perform poorly, because we'd lose the parallelism. Note that the local
|
* to perform poorly, because we'd lose the parallelism. Note that the
|
||||||
* execution is happening one task at a time (e.g., similar to sequential
|
* local execution is happening one task at a time (e.g., similar to
|
||||||
* distributed execution).
|
* sequential distributed execution).
|
||||||
*/
|
*/
|
||||||
Assert(!TransactionAccessedLocalPlacement);
|
Assert(!TransactionAccessedLocalPlacement);
|
||||||
|
|
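The comments in the `ShouldExecuteTasksLocally` hunks describe a short decision procedure. The sketch below is a simplified model of it under stated assumptions, not the function from `local_executor.c`: `ShouldRunLocally` and its parameters are hypothetical, the booleans stand in for the `citus.enable_local_execution` setting and the `TransactionAccessedLocalPlacement` flag, and the extra condition that is truncated in the diff ("we shouldn't use local execution if any distributed execution ...") is deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * ShouldRunLocally models the decision described in the comments:
 *  - once the transaction has accessed a local placement, stay local;
 *  - a single task that touches the local node is the best case for
 *    local execution, since it can skip the network round-trip;
 *  - multi-task executions stay on the parallel (remote) path, because
 *    local execution runs one task at a time.
 *
 * All parameters are hypothetical inputs for this sketch.
 */
bool
ShouldRunLocally(bool localExecutionEnabled,
                 bool transactionAccessedLocalPlacement,
                 int taskCount,
                 bool taskTouchesLocalNode)
{
    if (!localExecutionEnabled)
    {
        return false;
    }

    if (transactionAccessedLocalPlacement)
    {
        /* earlier statements in this transaction already ran locally */
        return true;
    }

    bool singleTask = (taskCount == 1);

    if (singleTask && taskTouchesLocalNode)
    {
        return true;
    }

    if (!singleTask)
    {
        /* the flag was ruled out above, mirroring the Assert in the diff */
        assert(!transactionAccessedLocalPlacement);
        return false;
    }

    return false;
}
```

Under this model, a four-task query at the start of a transaction is not executed locally even if some tasks touch the local node, while any query that follows a local execution in the same transaction is.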
||||||
|
@ -481,8 +488,8 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TaskAccessesLocalNode returns true if any placements of the task reside on the
|
* TaskAccessesLocalNode returns true if any placements of the task reside on
|
||||||
* node that we're executing the query.
|
* the node that we're executing the query.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
TaskAccessesLocalNode(Task *task)
|
TaskAccessesLocalNode(Task *task)
|
||||||
|
@ -503,19 +510,21 @@ TaskAccessesLocalNode(Task *task)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfTransactionAccessedPlacementsLocally() errors out if a local query on any shard
|
* ErrorIfTransactionAccessedPlacementsLocally errors out if a local query
|
||||||
* has already been executed in the same transaction.
|
* on any shard has already been executed in the same transaction.
|
||||||
*
|
*
|
||||||
* This check is required because Citus currently hasn't implemented local execution
|
* This check is required because Citus currently hasn't implemented local
|
||||||
* infrastructure for all the commands/executors. As we implement local execution for
|
* execution infrastructure for all the commands/executors. As we implement
|
||||||
* the command/executor that this function call exists, we should simply remove the check.
|
* local execution for the command/executor that this function call exists,
|
||||||
|
* we should simply remove the check.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ErrorIfTransactionAccessedPlacementsLocally(void)
|
ErrorIfTransactionAccessedPlacementsLocally(void)
|
||||||
{
|
{
|
||||||
if (TransactionAccessedLocalPlacement)
|
if (TransactionAccessedLocalPlacement)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("cannot execute command because a local execution has "
|
ereport(ERROR,
|
||||||
|
(errmsg("cannot execute command because a local execution has "
|
||||||
"accessed a placement in the transaction"),
|
"accessed a placement in the transaction"),
|
||||||
errhint("Try re-running the transaction with "
|
errhint("Try re-running the transaction with "
|
||||||
"\"SET LOCAL citus.enable_local_execution TO OFF;\""),
|
"\"SET LOCAL citus.enable_local_execution TO OFF;\""),
|
||||||
|
|