With #1804 (and related PRs), Citus gained the ability to
plan subqueries that are not safe to pushdown.
There are two high-level requirements for pushing down subqueries:
* Individual subqueries that require a merge step (i.e., GROUP BY
on non-distribution key, or LIMIT in the subquery etc). We've
handled such subqueries via #1876.
* Combination of subqueries that are not joined on distribution keys.
This commit aims to recursively plan some of such subqueries to make
the whole query safe to pushdown.
The main logic behind non colocated subquery joins is that we pick
an anchor range table entry and check for distribution key equality
of any other subqueries in the given query. If for a given subquery,
we cannot find distribution key equality with the anchor rte, we
recursively plan that subquery.
We also used a hacky solution for picking relations as the anchor range
table entries. The hack is that we wrap them into a subquery. This is only
necessary since some of the attribute equivalance checks are based on
queries rather than range table entries.
With this commit, Citus recursively plans subqueries that
are not safe to pushdown, in other words, requires a merge
step.
The algorithm is simple: Recursively traverse the query from bottom
up (i.e., bottom meaning the leaf queries). On each level, check
whether the query is safe to pushdown (or a single repartition
subquery). If the answer is yes, do not touch that subquery. If the
answer is no, plan the subquery seperately (i.e., create a subPlan
for it) and replace the subquery with a call to
`read_intermediate_results(planId, subPlanId)`. During the the
execution, run the subPlans first, and make them avaliable to the
next query executions.
Some of the queries hat this change allows us:
* Subqueries with LIMIT
* Subqueries with GROUP BY/DISTINCT on non-partition keys
* Subqueries involving re-partition joins, router queries
* Mixed usage of subqueries and CTEs (i.e., use CTEs in
subqueries as well). Nested subqueries as long as we
support the subquery inside the nested subquery.
* Subqueries with local tables (i.e., those subqueries
has the limitation that they have to be leaf subqueries)
* VIEWs on the distributed tables just works (i.e., the
limitations mentioned below still applies to views)
Some of the queries that is still NOT supported:
* Corrolated subqueries that are not safe to pushdown
* Window function on non-partition keys
* Recursively planned subqueries or CTEs on the outer
side of an outer join
* Only recursively planned subqueries and CTEs in the FROM
(i.e., not any distributed tables in the FROM) and subqueries
in WHERE clause
* Subquery joins that are not on the partition columns (i.e., each
subquery is individually joined on partition keys but not the upper
level subquery.)
* Any limitation that logical planner applies such as aggregate
distincts (except for count) when GROUP BY is on non-partition key,
or array_agg with ORDER BY
* Support for subqueries in WHERE clause
This commit enables subqueries in WHERE clause to be pushed down
by the subquery pushdown logic.
The support covers:
- Correlated subqueries with IN, NOT IN, EXISTS, NOT EXISTS,
operator expressions such as (>, <, =, ALL, ANY etc.)
- Non-correlated subqueries with (partition_key) IN (SELECT partition_key ..)
(partition_key) =ANY (SELECT partition_key ...)
Note that this commit heavily utilizes the attribute equivalence logic introduced
in the 1cb6a34ba8. In general, this commit mostly
adjusts the logical planner not to error out on the subqueries in WHERE clause.
* Improve error checks for subquery pushdown and INSERT ... SELECT
Since we allow subqueries in WHERE clause with the previous commit,
we should apply the same limitations to those subqueries.
With this commit, we do not iterate on each subquery one by one.
Instead, we extract all the subqueries and apply the checks directly
on those subqueries. The aim of this change is to (i) Simplify the
code (ii) Make it close to the checks on INSERT .. SELECT code base.
* Extend checks for unresolved paramaters to include SubLinks
With the presence of subqueries in where clause (i.e., SubPlans on the
query) the existing way for checking unresolved parameters fail. The
reason is that the parameters for SubPlans are kept on the parent plan not
on the query itself (see primnodes.h for the details).
With this commit, instead of checking SubPlans on the modified plans
we start to use originalQuery, where SubLinks represent the subqueries
in where clause. The unresolved parameters can be found on the SubLinks.
* Apply code-review feedback
* Remove unnecessary copying of shard interval list
This commit removes unnecessary copying of shard interval list. Note
that there are no copyObject function implemented for shard intervals.
Custom Scan is a node in the planned statement which helps external providers
to abstract data scan not just for foreign data wrappers but also for regular
relations so you can benefit your version of caching or hardware optimizations.
This sounds like only an abstraction on the data scan layer, but we can use it
as an abstraction for our distributed queries. The only thing we need to do is
to find distributable parts of the query, plan for them and replace them with
a Citus Custom Scan. Then, whenever PostgreSQL hits this custom scan node in
its Vulcano style execution, it will call our callback functions which run
distributed plan and provides tuples to the upper node as it scans a regular
relation. This means fewer code changes, fewer bugs and more supported features
for us!
First, in the distributed query planner phase, we create a Custom Scan which
wraps the distributed plan. For real-time and task-tracker executors, we add
this custom plan under the master query plan. For router executor, we directly
pass the custom plan because there is not any master query. Then, we simply let
the PostgreSQL executor run this plan. When it hits the custom scan node, we
call the related executor parts for distributed plan, fill the tuple store in
the custom scan and return results to PostgreSQL executor in Vulcano style,
a tuple per XXX_ExecScan() call.
* Modify planner to utilize Custom Scan node.
* Create different scan methods for different executors.
* Use native PostgreSQL Explain for master part of queries.