Merge pull request #2511 from citusdata/planner_readme

Expand planner readme
pull/2520/head
Marco Slot 2018-12-04 14:29:37 +01:00 committed by GitHub
commit 7c2a2d08af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 3 deletions

View File

@ -1,12 +1,78 @@
# INSERT ... SELECT query planning
# Distributed Query Planner
The distributed query planner is entered through the `distributed_planner` function in `distributed_planner.c`. This is the hook that Postgres calls instead of `standard_planner`.
We always first call `standard_planner` to build a `PlannedStmt`. For queries containing a distributed table or reference table, we then proceed with distributed planning, which overwrites the `planTree` in the `PlannedStmt`.
Distributed planning (`CreateDistributedPlan`) tries several different methods to plan the query:
1. Router planner, proceed if the query prunes down to a single set of co-located shards
2. Modification planning, proceed if the query is a DML command and all joins are co-located
3. Recursive planning, find CTEs and subqueries that cannot be pushed down and go back to 1
4. Logical planner, constructs a multi-relational algebra tree to find a distributed execution plan
## Router planner
During the call to `standard_planner`, Postgres calls a hook named `multi_relation_restriction_hook`. We use this hook to determine explicit and implicit filters on (occurrences of) distributed tables. We apply shard pruning to all tables using the filters in `PlanRouterQuery`. If all tables prune down to a single shard and all those shards are on the same node, then the query is router plannable meaning it can be fully executed by one of the worker nodes.
The router planner preserves the original query tree, but the query does need to be rewritten to have shard names instead of table names. We cannot simply put the shard names into the query tree, because it actually contains relation OIDs. If the deparsing logic resolves those OIDs, it would throw an error since the shards do not exist on the coordinator. Instead, we replace the table entries with a fake SQL function call to `citus_extradata_container` and encode the original table ID and the shard ID into the parameter of the function. The deparsing functions recognise the fake function call and convert it into a shard name.
## Recursive planning
CTEs and subqueries that cannot be pushed down (checked using `DeferErrorIfCannotPushdownSubquery`) and do not contain references to the outer query are planned by recursively calling the `planner` function with the subquery as the parse tree. Because the planner is called from the top, any type of query that is supported by Postgres or Citus can be a valid subquery. The resulting plan is added to the `subPlanList` in the `DistributedPlan` and the subquery is replaced by a call to `read_intermediate_result` with a particular subplan name. At execution time, all of the plans in the `subPlanList` are executed and their output is sent to all the workers and written into an intermediate result with the subplan name (see `subplan_execution.c`). In the remainder of the planner, calls to `read_intermediate_result` are treated in the same way as reference tables, which means they can be joined by any column.
## Logical planner
The logical planner constructs a multi-relational algebra tree from the query with operators such as `MultiTable`, `MultiProject`, `MultiJoin` and `MultiCollect`. It first picks a strategy for handling joins in `MultiLogicalPlanCreate` (pushdown planning, or join order planning) and then builds a `MultiNode` tree based on the original query tree. In the initial `MultiNode` tree, each `MultiTable` is wrapped in `MultiCollect`, which effectively means collect the entire table in one place. The `MultiNode` tree is passed to the logical optimizer which transforms the tree into one that requires less network traffic by pushing down operators. Finally, the physical planner transforms the `MultiNode` tree into a `DistributedPlan` which contains the queries to execute on shards and can be passed to the executor.
### Pushdown planning
During the call to `standard_planner`, Postgres calls a hook named `multi_relation_restriction_hook`. We use this hook to determine whether all (occurrences of) distributed tables are joined on their respective distribution columns. When this is the case, we can be somewhat agnostic to the structure of subqueries and other joins. In that case, we treat the whole join tree as a single `MultiTable` and deparse this part of the query as is during physical planning. Pushing down a subquery is only possible when the subquery can be answered without a merge step (checked using `DeferErrorIfCannotPushdownSubquery`). However, you may notice that these subqueries are already replaced by `read_intermediate_result` calls during recursive planning. Only subqueries that have references to the outer query remain at this stage would pass through recursive planning and fail the check.
### Join order planning
The join order planner is applied to the join tree in the original query and generates all possible pair-wise join orders. If a pair-wise join is not on the distribution column, it requires re-partitioning. The join order that requires the fewest re-partition steps is converted into a tree of `MultiJoin` nodes, prior to running the logical optimizer. The optimizer leaves the `MultiJoin` tree in tact, but pushes down select and collect operators below the `MultiJoin` nodes.
### Logical optimizer
The logical optimizer uses commutativity rules to push project and select operators down below the `MultiCollect` nodes. Everything above the `MultiCollect` operator will be is executed on the coordinator and everything below on the workers. Additionally, the optimizer uses distributivity rules to push down operators below the `MultiJoin` nodes, such that filters and projections are applied prior to joins. This is primarily relevant for re-partition joins which first try to reduce the data by applying selections and projections, and then re-partitioning the result.
A number of SQL clauses like aggregates, GROUP BY, ORDER BY, LIMIT can only be pushed down below the `MultiCollect` under certain conditions. All these clauses are bundled together in a `MultiExtendedOpNode`. After the basic transformation, the `MultiExtendedOpNode`s are directly above the `MultiCollect` nodes. They are then split into a master and a worker part and the worker part is pushed down below the `MultiCollect`.
### Physical planner
This section needs to be expanded.
## Modification planning
In terms of modification planning, we distinguish between several cases:
1. DML planning (`CreateModifyPlan`)
1.a. UPDATE/DELETE planning
1.b. INSERT planning
2. INSERT...SELECT planning (`CreateInsertSelectPlan`)
### UPDATE/DELETE planning
UPDATE and DELETE commands are handled by the router planner (`PlanRouterQuery`), but when tables prune to multiple shards we do not fall back to other planners, but instead proceed to generate a task for each shard, as long as all subqueries can be pushed down. We can do this because UPDATE and DELETE never have a meaningful merge step on the coordinator, other than concatening RETURNING rows.
When there are CTEs or subqueries that cannot be pushed down in the UPDATE/DELETE, we continue with recursive planning and try again. If recursive planning cannot resolve the issue (e.g. due to a correlated subquery), then the command will error out.
### INSERT planning
Distributed planning for INSERT commands is relatively complicated because of multi-row INSERT. Each row in a multi-row INSERT may go to different shard and therefore we need to construct a different query for each shard. The logic for this is primarily in `BuildRoutesForInsert`, which builds the set of rows for each shard. Rather than construct a full query tree for each shard, we put each set of rows in the `rowValuesLists` of the `Task` and replace the VALUES section of the INSERT just before deparsing in `UpdateTaskQueryString`.
One additional complication for INSERTs is that it is very common to have a function call (such as `now()` or `nextval(..)`) in the position of the distribution column. In that case building the task list is deferred until the functions have been evaluated in the executor.
### INSERT ... SELECT query planning
Citus supports `INSERT ... SELECT` queries either by pushing down the whole query to the worker nodes or pulling the `SELECT` part to the coordinator.
## INSERT ... SELECT - by pushing down
#### INSERT ... SELECT - by pushing down
If `INSERT ... SELECT` query can be planned by pushing down it to the worker nodes, Citus selects to choose that logic first. Query is planned separately for each shard in the target table. Do so by replacing the partitioning qual parameter using the shard's actual boundary values to create modify task for each shard. Then, shard pruning is performed to decide on to which shards query will be pushed down. Finally, checks if the target shardInterval has exactly same placements with the select task's available anchor placements.
## INSERT...SELECT - via the coordinator
#### INSERT...SELECT - via the coordinator
If the query can not be pushed down to the worker nodes, two different approaches can be followed depending on whether ON CONFLICT or RETURNING clauses are used.