In plain words, each distributed plan pulls the necessary intermediate
results to the worker nodes that the plan hits. This is primarily useful
in three ways.
(i) If the distributed plan that uses intermediate
result(s) is a router query, then the intermediate results are only
broadcasted to a single node.
(ii) If a distributed plan consists of only intermediate results, which
is not uncommon, the intermediate results are broadcasted to a single
node only.
(iii) If a distributed query hits a sub-set of the shards in multiple
workers, the intermediate results will be broadcasted to the relevant
node(s).
The final item (iii) becomes crucial for append/range distributed
tables where typically the distributed queries hit a small subset of
shards/workers.
To do this, for each query that Citus creates a distributed plan, we keep
track of the subPlans used in the queryTree, and save it in the distributed
plan. Just before Citus executes each subPlan, Citus first keeps track of
every worker node that the distributed plan hits, and marks every subPlan
should be broadcasted to these nodes. Later, for each subPlan which is a
distributed plan, Citus does this operation recursively since these
distributed plans may access to different subPlans, and those have to be
recorded as well.
DESCRIPTION: Expression in reference join
Fixed: #2582
This patch allows arbitrary expressions in the join clause when joining to a reference table. An example of such joins could be found in CHbenCHmark queries 7, 8, 9 and 11; `mod((s_w_id * s_i_id),10000) = su_suppkey` and `ascii(substr(c_state,1,1)) = n2.n_nationkey`. Since the join is on a reference table these queries are able to be pushed down to the workers.
To implement these queries we will widen the `IsJoinClause` predicate to not check if the expressions are a type `Var` after stripping the implicit coerciens. Instead we define a join clause when the `Var`'s in a clause come from more than 1 table.
This allows more clauses to pass into the logical planner's `MultiNodeTree(...)` planning function. To compensate for this we tighten down the `LocalJoin`, `SinglePartitionJoin` and `DualPartitionJoin` to check for direct column references when planning. This allows the planner to work with arbitrary join expressions on reference tables.
This is necassery to support Q20 of the CHbenCHmark: #2582.
To summarize the fix: The subquery is converted into an INNER JOIN on a
table. This fixes the issue, since an INNER JOIN on a table is already
supported by the repartion planner.
The way this replacement is happening.:
1. Postgres replaces `col in (subquery)` with a SEMI JOIN (subquery) on col = subquery_result
2. If this subquery is simple enough Postgres will replace it with a
regular read from a table
3. If the subquery returns unique results (e.g. a primary key) Postgres
will convert the SEMI JOIN into an INNER JOIN during the planning. It
will not change this in the rewritten query though.
4. We check if Postgres sends us any SEMI JOINs during its join order
planning, if it doesn't we replace all SEMI JOINs in the rewritten
query with INNER JOIN (which we already support).
We've changed the logic for pulling RTE_RELATIONs in #3109 and
non-colocated subquery joins and partitioned tables.
@onurctirtir found this steps where I traced back and found the issues.
While looking into it in more detail, we decided to expand the list in a
way that the callers get all the relevant RTE_RELATIONs RELKIND_RELATION,
RELKIND_PARTITIONED_TABLE, RELKIND_FOREIGN_TABLE and RELKIND_MATVIEW.
These are all relation kinds that Citus planner is aware of.
Areas for further optimization:
- Don't save subquery results to a local file on the coordinator when the subquery is not in the having clause
- Push the the HAVING with subquery to the workers if there's a group by on the distribution column
- Don't push down the results to the workers when we don't push down the HAVING clause, only the coordinator needs it
Fixes#520Fixes#756Closes#2047
Do it in two ways (a) re-use the rte list as much as possible instead of
re-calculating over and over again (b) Limit the recursion to the relevant
parts of the query tree
* Change worker_hash_partition_table() such that the
divergence between Citus planner's hashing and
worker_hash_partition_table() becomes the same.
* Rename single partitioning to single range partitioning.
* Add single hash repartitioning. Basically, logical planner
treats single hash and range partitioning almost equally.
Physical planner, on the other hand, treats single hash and
dual hash repartitioning almost equally (except for JoinPruning).
* Add a new GUC to enable this feature
- changes in ruleutils_11.c is reflected
- vacuum statement api change is handled. We now allow
multi-table vacuum commands.
- some other function header changes are reflected
- api conflicts between PG11 and earlier versions
are handled by adding shims in version_compat.h
- various regression tests are fixed due output and
functionality in PG1
- no change is made to support new features in PG11
they need to be handled by new commit
After this commit large_table_shard_count wont be used to
check whether broadcast join, which is renamed as reference
join, can be applied. Reference join can only be applied over
reference tables.
This is the first of series of window function work.
We can now support window functions that can be pushed down to workers.
Window function must have distribution column in the partition clause
to be pushed down.
With #1804 (and related PRs), Citus gained the ability to
plan subqueries that are not safe to pushdown.
There are two high-level requirements for pushing down subqueries:
* Individual subqueries that require a merge step (i.e., GROUP BY
on non-distribution key, or LIMIT in the subquery etc). We've
handled such subqueries via #1876.
* Combination of subqueries that are not joined on distribution keys.
This commit aims to recursively plan some of such subqueries to make
the whole query safe to pushdown.
The main logic behind non colocated subquery joins is that we pick
an anchor range table entry and check for distribution key equality
of any other subqueries in the given query. If for a given subquery,
we cannot find distribution key equality with the anchor rte, we
recursively plan that subquery.
We also used a hacky solution for picking relations as the anchor range
table entries. The hack is that we wrap them into a subquery. This is only
necessary since some of the attribute equivalance checks are based on
queries rather than range table entries.
clause is not supported
This change allows unsupported clauses to go through query pushdown
planner instead of erroring out as we already do for non-outer joins.
We used to error out if the join clause includes filters like
t1.a < t2.a even if other filter like t1.key = t2.key exists.
Recently we lifted that restriction in subquery planning by
not lifting that restriction and focusing on equivalance classes
provided by postgres.
This checkin forwards previously erroring out real-time queries
due to join clauses to subquery planner and let it handle the
join even if the query does not have a subquery.
We are now pushing down queries that do not have any
subqueries in it. Error message looked misleading, changed to a more descriptive one.
With this commit, Citus recursively plans subqueries that
are not safe to pushdown, in other words, requires a merge
step.
The algorithm is simple: Recursively traverse the query from bottom
up (i.e., bottom meaning the leaf queries). On each level, check
whether the query is safe to pushdown (or a single repartition
subquery). If the answer is yes, do not touch that subquery. If the
answer is no, plan the subquery seperately (i.e., create a subPlan
for it) and replace the subquery with a call to
`read_intermediate_results(planId, subPlanId)`. During the the
execution, run the subPlans first, and make them avaliable to the
next query executions.
Some of the queries hat this change allows us:
* Subqueries with LIMIT
* Subqueries with GROUP BY/DISTINCT on non-partition keys
* Subqueries involving re-partition joins, router queries
* Mixed usage of subqueries and CTEs (i.e., use CTEs in
subqueries as well). Nested subqueries as long as we
support the subquery inside the nested subquery.
* Subqueries with local tables (i.e., those subqueries
has the limitation that they have to be leaf subqueries)
* VIEWs on the distributed tables just works (i.e., the
limitations mentioned below still applies to views)
Some of the queries that is still NOT supported:
* Corrolated subqueries that are not safe to pushdown
* Window function on non-partition keys
* Recursively planned subqueries or CTEs on the outer
side of an outer join
* Only recursively planned subqueries and CTEs in the FROM
(i.e., not any distributed tables in the FROM) and subqueries
in WHERE clause
* Subquery joins that are not on the partition columns (i.e., each
subquery is individually joined on partition keys but not the upper
level subquery.)
* Any limitation that logical planner applies such as aggregate
distincts (except for count) when GROUP BY is on non-partition key,
or array_agg with ORDER BY
In subquery pushdown, we first ensure that each relation is joined with at least
on another relation on the partition keys. That's fine given that the decision
is binary: pushdown the query at all or not.
With recursive planning, we'd want to check whether any specific part
of the query can be pushded down or not. Thus, we need the ability to
understand which part(s) of the subquery is safe to pushdown. This commit
adds the infrastructure for doing that.
Subquery pushdown planning is based on relation restriction
equivalnce. This brings us the opportuneatly to allow any
other joins as long as there is an already equi join between
the distributed tables.
We already allow that for joins with reference tables and
this commit allows that for joins among distributed tables.
With this commit, we allow pushing down subqueries with only
reference tables where GROUP BY or DISTINCT clause or Window
functions include only columns from reference tables.
This commit provides the support for window functions in subquery and insert
into select queries. Note that our support for window functions is still limited
because it must have a partition by clause on the distribution key. This commit
makes changes in the files insert_select_planner and multi_logical_planner. The
required tests are also added with files multi_subquery_window_functions.out
and multi_insert_select_window.out.
Citus can handle INSERT INTO ... SELECT queries if the query inserts
into local table by reading data from distributed table. The opposite
way is not correct. With this commit we warn the user if the latter
option is used.
With this commit, we relax the restrictions put on the reference
tables with subquery pushdown.
We did three notable improvements:
1) Relax equi-join restrictions
Previously, we always expected that the non-reference tables are
equi joined with reference tables on the partition key of the
non-reference table.
With this commit, we allow any column of non-reference tables
joined using non-equi joins as well.
2) Relax OUTER JOIN restrictions
Previously Citus errored out if any reference table exists at
any point of the outer part of an outer join. For instance,
See the below sketch where (h) denotes a hash distributed relation,
(r) denotes a reference table, (L) denotes LEFT JOIN and
(I) denotes INNER JOIN.
(L)
/ \
(I) h
/ \
r h
Before this commit Citus would error out since a reference table
appears on the left most part of an left join. However, that was
too restrictive so that we only error out if the reference table
is directly below and in the outer part of an outer join.
3) Bug fixes
We've done some minor bugfixes in the existing implementation.
Add a second implementation of INSERT INTO distributed_table SELECT ... that is used if
the query cannot be pushed down. The basic idea is to execute the SELECT query separately
and pass the results into the distributed table using a CopyDestReceiver, which is also
used for COPY and create_distributed_table. When planning the SELECT, we go through
planner hooks again, which means the SELECT can also be a distributed query.
EXPLAIN is supported, but EXPLAIN ANALYZE is not because preventing double execution was
a lot more complicated in this case.
Distributed query planning for subquery pushdown is done on the original
query. This prevents the usage of external parameters on the execution.
To overcome this, we manually replace the parameters on the original
query.
* Support for subqueries in WHERE clause
This commit enables subqueries in WHERE clause to be pushed down
by the subquery pushdown logic.
The support covers:
- Correlated subqueries with IN, NOT IN, EXISTS, NOT EXISTS,
operator expressions such as (>, <, =, ALL, ANY etc.)
- Non-correlated subqueries with (partition_key) IN (SELECT partition_key ..)
(partition_key) =ANY (SELECT partition_key ...)
Note that this commit heavily utilizes the attribute equivalence logic introduced
in the 1cb6a34ba8. In general, this commit mostly
adjusts the logical planner not to error out on the subqueries in WHERE clause.
* Improve error checks for subquery pushdown and INSERT ... SELECT
Since we allow subqueries in WHERE clause with the previous commit,
we should apply the same limitations to those subqueries.
With this commit, we do not iterate on each subquery one by one.
Instead, we extract all the subqueries and apply the checks directly
on those subqueries. The aim of this change is to (i) Simplify the
code (ii) Make it close to the checks on INSERT .. SELECT code base.
* Extend checks for unresolved paramaters to include SubLinks
With the presence of subqueries in where clause (i.e., SubPlans on the
query) the existing way for checking unresolved parameters fail. The
reason is that the parameters for SubPlans are kept on the parent plan not
on the query itself (see primnodes.h for the details).
With this commit, instead of checking SubPlans on the modified plans
we start to use originalQuery, where SubLinks represent the subqueries
in where clause. The unresolved parameters can be found on the SubLinks.
* Apply code-review feedback
* Remove unnecessary copying of shard interval list
This commit removes unnecessary copying of shard interval list. Note
that there are no copyObject function implemented for shard intervals.
* Enabling physical planner for subquery pushdown changes
This commit applies the logic that exists in INSERT .. SELECT
planning to the subquery pushdown changes.
The main algorithm is followed as :
- pick an anchor relation (i.e., target relation)
- per each target shard interval
- add the target shard interval's shard range
as a restriction to the relations (if all relations
joined on the partition keys)
- Check whether the query is router plannable per
target shard interval.
- If router plannable, create a task
* Add union support within the JOINS
This commit adds support for UNION/UNION ALL subqueries that are
in the following form:
.... (Q1 UNION Q2 UNION ...) as union_query JOIN (QN) ...
In other words, we currently do NOT support the queries that are
in the following form where union query is not JOINed with
other relations/subqueries :
.... (Q1 UNION Q2 UNION ...) as union_query ....
* Subquery pushdown planner uses original query
With this commit, we change the input to the logical planner for
subquery pushdown. Before this commit, the planner was relying
on the query tree that is transformed by the postgresql planner.
After this commit, the planner uses the original query. The main
motivation behind this change is the simplify deparsing of
subqueries.
* Enable top level subquery join queries
This work enables
- Top level subquery joins
- Joins between subqueries and relations
- Joins involving more than 2 range table entries
A new regression test file is added to reflect enabled test cases
* Add top level union support
This commit adds support for UNION/UNION ALL subqueries that are
in the following form:
.... (Q1 UNION Q2 UNION ...) as union_query ....
In other words, Citus supports allow top level
unions being wrapped into aggregations queries
and/or simple projection queries that only selects
some fields from the lower level queries.
* Disallow subqueries without a relation in the range table list for subquery pushdown
This commit disallows subqueries without relation in the range table
list. This commit is only applied for subquery pushdown. In other words,
we do not add this limitation for single table re-partition subqueries.
The reasoning behind this limitation is that if we allow pushing down
such queries, the result would include (shardCount * expectedResults)
where in a non distributed world the result would be (expectedResult)
only.
* Disallow subqueries without a relation in the range table list for INSERT .. SELECT
This commit disallows subqueries without relation in the range table
list. This commit is only applied for INSERT.. SELECT queries.
The reasoning behind this limitation is that if we allow pushing down
such queries, the result would include (shardCount * expectedResults)
where in a non distributed world the result would be (expectedResult)
only.
* Change behaviour of subquery pushdown flag (#1315)
This commit changes the behaviour of the citus.subquery_pushdown flag.
Before this commit, the flag is used to enable subquery pushdown logic. But,
with this commit, that behaviour is enabled by default. In other words, the
flag is now useless. We prefer to keep the flag since we don't want to break
the backward compatibility. Also, we may consider using that flag for other
purposes in the next commits.
* Require subquery_pushdown when limit is used in subquery
Using limit in subqueries may cause returning incorrect
results. Therefore we allow limits in subqueries only
if user explicitly set subquery_pushdown flag.
* Evaluate expressions on the LIMIT clause (#1333)
Subquery pushdown uses orignal query, the LIMIT and OFFSET clauses
are not evaluated. However, logical optimizer expects these expressions
are already evaluated by the standard planner. This commit manually
evaluates the functions on the logical planner for subquery pushdown.
* Better format subquery regression tests (#1340)
* Style fix for subquery pushdown regression tests
With this commit we intented a more consistent style for the
regression tests we've added in the
- multi_subquery_union.sql
- multi_subquery_complex_queries.sql
- multi_subquery_behavioral_analytics.sql
* Enable the tests that are temporarily commented
This commit enables some of the regression tests that were commented
out until all the development is done.
* Fix merge conflicts (#1347)
- Update regression tests to meet the changes in the regression
test output.
- Replace Ifs with Asserts given that the check is already done
- Update shard pruning outputs
* Add view regression tests for increased subquery coverage (#1348)
- joins between views and tables
- joins between views
- union/union all queries involving views
- views with limit
- explain queries with view
* Improve btree operators for the subquery tests
This commit adds the missing comprasion for subquery composite key
btree comparator.
Enables use views within distributed queries.
User can create and use a view on distributed tables/queries
as he/she would use with regular queries.
After this change router queries will have full support for views,
insert into select queries will support reading from views, not
writing into. Outer joins would have a limited support, and would
error out at certain cases such as when a view is in the inner side
of the outer join.
Although PostgreSQL supports writing into views under certain circumstances.
We disallowed that for distributed views.
Adds support for PostgreSQL 9.6 by copying in the requisite ruleutils
file and refactoring the out/readfuncs code to flexibly support the
old-style copy/pasted out/readfuncs (prior to 9.6) or use extensible
node APIs (in 9.6 and higher).
Most version-specific code within this change is only needed to set new
fields in the AggRef nodes we build for aggregations. Version-specific
test output files were added in certain cases, though in most they were
not necessary. Each such file begins by e.g. printing the major version
in order to clarify its purpose.
The comment atop citus_nodes.h details how to add support for new nodes
for when that becomes necessary.
This commit completes having support in Citus by adding having support for
real-time and task-tracker executors. Multiple tests are added to regression
tests to cover new supported queries with having support.
In subquery pushdown, we allow outer joins if the join condition is on the
partition columns. WhereClauseList() used to return all join conditions including
outer joins. However, this has been changed with a commit related to outer join
support on regular queries. With this commit, we refactored ExtractFromExpressionWalker()
to return two lists of qualifiers. The first list is for inner join and filter
clauses and the second list is for outer join clauses. Therefore, we can also
use outer join clauses to check subquery pushdown prerequisites.
We can now support richer set of queries in router planner.
This allow us to support CTEs, joins, window function, subqueries
if they are known to be executed at a single worker with a single
task (all tables are filtered down to a single shard and a single
worker contains all table shards referenced in the query).
Fixes : #501
Fixes#394
This change adds LIMIT/OFFSET support for non router-plannable
distributed queries.
In cases that we can push the LIMIT down, we add the OFFSET value to
that LIMIT in the worker queries. When a query with LIMIT x OFFSET y is issued,
the query is propagated to the workers as LIMIT (x+y) OFFSET 0, and on the
master table, the original LIMIT and OFFSET values are used. With this change,
we can use OFFSET wherever we can use LIMIT.
This change removes the whitelisting check on the WHERE clauses. Note that, before
this change, citus was already allowing all types of nodes with the following
format (i.e., wrap with a boolean test):
* SELECT col FROM table WHERE (ANY EXPRESSION) is TRUE;
Thus, this change is mostly useful for allowing the expressions in the WHERE clause
directly and avoiding "unsupport clause type" errors.