Mark existing objects that are not included in distributed object infrastructure
in older versions of Citus (but now should be) as distributed, after updating
Citus successfully.
DESCRIPTION: Fix unnecessary repartition on joins with more than 4 tables
In 9.1 we have introduced support for all CH-benCHmark queries by widening our definitions of joins to include joins with expressions in them. This had the undesired side effect of Q5 regressing on its plan by implementing a repartition join.
It turned out this regression was not directly related to widening of the join clause, nor the schema employed by CH-benCHmark. Instead it had to do with 4 or more tables being joined in a chain. A chain meaning:
```sql
SELECT * FROM a,b,c,d WHERE a.part = b.part AND b.part = c.part AND ....
```
Due to how our join order planner was implemented it would only keep track of 1 of the partition columns when comparing if the join could be executed locally. This manifested in a join chain of 4 tables to _always_ be executed as a repartition join. 3 tables joined in a chain would have the middle table shared by the two outer tables causing the local join possibility to be found.
With this patch we keep a unique list (or set) of all partition columns participating in the join. When a candidate table is checked for a possibility to execute a local join it will check if there is any partition column in that set that matches an equality join clause on the partition column of the candidate table.
By taking into account all partition columns in the left relation it will now find the local join path on >= 4 tables joined in a chain.
fixes: #3276
For example, a PARAM might reside inside a function just because
of a casting of a type such as the follows:
```
{FUNCEXPR
:funcid 1740
:funcresulttype 1700
:funcretset false
:funcvariadic false
:funcformat 2
:funccollid 0
:inputcollid 0
:args (
{PARAM
:paramkind 0
:paramid 15
:paramtype 23
:paramtypmod -1
:paramcollid 0
:location 356
}
)
```
We should recursively check the expression before bailing out.
Sometimes during errors workers will create files while we're deleting intermediate directories
example:
DEBUG: could not remove file "base/pgsql_job_cache/10_0_431": Directory not empty
DETAIL: WARNING from localhost:57637
Previously we only prevented AVG from being pushed down, but this is incorrect:
- array_agg, while somewhat non sensical to order by, will potentially be missing values
- combinefunc aggregation will raise errors about cstrings not being comparable (while we also can't know if the aggregate is commutative)
This commit limits approximating LIMIT pushdown when ordering by aggregates to:
min, max, sum, count, bit_and, bit_or, every, any
Which means of those we previously supported, we now exclude:
avg, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, hll_add, hll_union, topn_add, topn_union
Previously, the logic for evaluting the functions and the parameters
were the same. That ended-up evaluting the functions inaccurately
on the coordinator. Instead, split the function evaluation logic
from parameter evalution logic.
As that is powerful and cause metadata inconsistency. See the following steps:
(Note that we cannot use PGC_SUSET because on Citus MX we need this flag for non-
superusers as well)
```SQL
CREATE TABLE test_ref_table(key int);
SELECT create_reference_table('test_ref_table');
SELECT logicalrelid, logicalrelid::oid FROM pg_dist_partition;
┌────────────────┬──────────────┐
│ logicalrelid │ logicalrelid │
├────────────────┼──────────────┤
│ test_ref_table │ 16831 │
└────────────────┴──────────────┘
(1 row)
Time: 0.929 ms
SELECT relname FROM pg_class WHERE oid = 16831;
┌────────────────┐
│ relname │
├────────────────┤
│ test_ref_table │
└────────────────┘
(1 row)
Time: 0.785 ms
SET citus.enable_ddl_propagation TO off;
DROP TABLE test_ref_table ;
SELECT logicalrelid, logicalrelid::oid FROM pg_dist_partition;
┌──────────────┬──────────────┐
│ logicalrelid │ logicalrelid │
├──────────────┼──────────────┤
│ 16831 │ 16831 │
└──────────────┴──────────────┘
(1 row)
Time: 0.972 ms
SELECT relname FROM pg_class WHERE oid = 16831;
┌─────────┐
│ relname │
├─────────┤
└─────────┘
(0 rows)
Time: 0.908 ms
SELECT master_add_node('localhost', 9703);
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
Time: 5.028 ms
!>
```
Previously, we've identified the usedSubPlans by only looking
to the subPlanId.
With this commit, we're expanding it to also include information
on the location of the subPlan.
This is useful to distinguish the cases where the subPlan is used
either on only HAVING or both HAVING and any other part of the query.
First, diff is updated to not update the files in-place
For some reason diff is being called multiple times,
so $file1.unmodified becomes normalized on second invocation
Secondly, diff-filter updates output to come from the unmodified version
Normalization is serving two purposes:
- avoid diff noise in regressions
- avoid diff noise in commits when expected result is updated
The first purpose only wants to reduce the lines which diff registers,
whereas the second wants those changes to be committed
* Update shardPlacement->nodeId to uint
As the source of the shardPlacement->nodeId is always workerNode->nodeId,
and that is uint32.
We had this hack because of: 0ea4e52df5 (r266421409)
And, that is gone with: 90056f7d3c (diff-c532177d74c72d3f0e7cd10e448ab3c6L1123)
So, we're safe to do it now.
* Relax the restrictions on using the local execution
Previously, whenever any local execution happens, we disabled further
commands to do any remote queries. The basic motivation for doing that
is to prevent any accesses in the same transaction block to access the
same placements over multiple sessions: one is local session the other
is remote session to the same placement.
However, the current implementation does not distinguish local accesses
being to a placement or not. For example, we could have local accesses
that only touches intermediate results. In that case, we should not
implement the same restrictions as they become useless.
So, this is a pre-requisite for executing the intermediate result only
queries locally.
* Update the error messages
As the underlying implementation has changed, reflect it in the error
messages.
* Keep track of connections to local node
With this commit, we're adding infrastructure to track if any connection
to the same local host is done or not.
The main motivation for doing this is that we've previously were more
conservative about not choosing local execution. Simply, we disallowed
local execution if any connection to any remote node is done. However,
if we want to use local execution for intermediate result only queries,
this'd be annoying because we expect all queries to touch remote node
before the final query.
Note that this approach is still limiting in Citus MX case, but for now
we can ignore that.
* Formalize the concept of Local Node
Also some minor refactoring while creating the dummy placement
* Write intermediate results locally when the results are only needed locally
Before this commit, Citus used to always broadcast all the intermediate
results to remote nodes. However, it is possible to skip pushing
the results to remote nodes always.
There are two notable cases for doing that:
(a) When the query consists of only intermediate results
(b) When the query is a zero shard query
In both of the above cases, we don't need to access any data on the shards. So,
it is a valuable optimization to skip pushing the results to remote nodes.
The pattern mentioned in (a) is actually a common patterns that Citus users
use in practice. For example, if you have the following query:
WITH cte_1 AS (...), cte_2 AS (....), ... cte_n (...)
SELECT ... FROM cte_1 JOIN cte_2 .... JOIN cte_n ...;
The final query could be operating only on intermediate results. With this patch,
the intermediate results of the ctes are not unnecessarily pushed to remote
nodes.
* Add specific regression tests
As there are edge cases in Citus MX and with round-robin policy,
use the same queries on those cases as well.
* Fix failure tests
By forcing not to use local execution for intermediate results since
all the tests expects the results to be pushed remotely.
* Fix flaky test
* Apply code-review feedback
Mostly style changes
* Limit the max value of pg_dist_node_seq to reserve for internal use
This can helpful in guiding us where to look when this test fails.
For example, if the result file has repartitioned_results_ prefix,
then we need to look into repartitioned insert/select. Otherwise
it is probably a CTE or a subquery.
In #3374 a new way of locking shard distribution metadata was
implemented. However, this was only done in the function
`LockShardDistributionMetadata` and not in
`TryLockShardDistributionMetadata`. This is bad, since it causes these
locks to not block eachother in some cases.
This commit fixes this issue by sharing the code that sets the locktag
between the two function.
When creating a new distributed table. The shards would colocate with shards
with SHARD_STATE_TO_DELETE (shardstate = 4). This means if that state was
because of a shard move the new shard would be created on two nodes and it
would not get deleted since it's shard state would be 1.
adaptive_executor: sort includes, use foreach_ptr, remove lies from FinishDistributedExecution docs
connection_management: rename msecs, which isn't milliseconds
placement_connection: small typos
Comment from code:
/*
* We had to implement this hack because on Postgres11 and below, the originalQuery
* and the query would have significant differences in terms of CTEs where CTEs
* would not be inlined on the query (as standard_planner() wouldn't inline CTEs
* on PG 11 and below).
*
* Instead, we prefer to pass the inlined query to the distributed planning. We rely
* on the fact that the query includes subqueries, and it'd definitely go through
* query pushdown planning. During query pushdown planning, the only relevant query
* tree is the original query.
*/
Deparsing and parsing a query can be heavy on CPU. When locally executing
the query we don't need to do this in theory most of the time.
This PR is the first step in allowing to skip deparsing and parsing
the query in these cases, by lazily creating the query string and
storing the query in the task. Future commits will make use of this and
not deparse and parse the query anymore, but use the one from the task
directly.
This is purely to enable better performance with prepared statements.
Before this commit, the fast path queries with prepared statements
where the distribution key includes a parameter always went through
distributed planning. After this change, we only go through distributed
planning on the first 5 executions.
DESCRIPTION: Fixes a problem when adding a new node due to tables referenced in a functions body
Fixes#3378
It was reported that `master_add_node` would fail if a distributed function has a table name referenced in its declare section of the body. By default postgres validates the body of a function on creation. This is not a problem in the normal case as tables are replicated to the workers when we distribute functions.
However when a new node is added we first create dependencies on the workers before we try to create any tables, and the original tables get created out of bound when the metadata gets synced to the new node. This causes the function body validator to raise an error the table is not on the worker.
To mitigate this issue we set `check_function_bodies` to `off` right before we are creating the function.
The added test shows this does resolve the issue. (issue can be reproduced on the commit without the fix)
In this commit, we're introducing a way to prevent CTE inlining via a GUC.
The GUC is used in all the tests where PG 11 and PG 12 tests would diverge
otherwise.
Note that, in PG 12, the restriction information for CTEs are generated. It
means that for some queries involving CTEs, Citus planner (router planner/
pushdown planner) may behave differently. So, via the GUC, we prevent
tests to diverge on PG 11 vs PG 12.
When we drop PG 11 support, we should get rid of the GUC, and mark
relevant ctes as MATERIALIZED, which does the same thing.
These set of tests has changed in both PG 11 and PG 12.
The changes are only about CTE inlining kicking in both
versions, and yielding the exact same distributed planning.
With this commit, we're adding the specific tests for CTE inlining.
The test has a different output file for pg 11, because as mentioned
in the previous commits, PG 12 generates more restriction information
for CTEs.
The idea is simple: Inline CTEs(if any), try distributed planning.
If the planning yields a successful distributed plan, simply return
it.
If the planning fails, fallback to distributed planning on the query
tree where CTEs are not inlined. In that case, if the planning failed
just because of the CTE inlining, via recursive planning, the same
query would yield a successful plan.
A very basic set of examples:
WITH cte_1 AS (SELECT * FROM test_table)
SELECT
*, row_number() OVER ()
FROM
cte_1;
or
WITH a AS (SELECT * FROM test_table),
b AS (SELECT * FROM test_table)
SELECT * FROM a JOIN b ON (a.value> b.value);
With this commit we add the necessary Citus function to inline CTEs
in a queryTree.
You might ask, why do we need to inline CTEs if Postgres is already
going to do it?
Few reasons behind this decision:
- One techinal node here is that Citus does the recursive CTE planning
by checking the originalQuery which is the query that has not gone
through the standard_planner().
CTEs in Citus is super powerful. It is practically key for full SQL
coverage for multi-shard queries. With CTEs, you can always reduce
any query multi-shard query into a router query via recursive
planning (thus full SQL coverage).
We cannot let CTE inlining break that. The main idea is Citus should
be able to retry planning if anything goes after CTE inlining.
So, by taking ownership of CTE inlining on the originalQuery, Citus
can fallback to recursive planning of CTEs if the planning with the
inlined query fails. It could have been a lot harder if we had relied
on standard_planner() to have the inlined CTEs on the original query.
- We want to have this feature in PostgreSQL 11 as well, but Postgres
only inlines in version 12
All the code in this commit is direct copy & paste from Postgres
source code.
We can classify the copy&paste code into two:
- Copy paste from CTE inline patch from postgres
(https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=608b167f9f9c4553c35bb1ec0eab9ddae643989b)
These include the functions inline_cte(), inline_cte_walker(),
contain_dml(), contain_dml_walker().
It also include the code in function PostgreSQLCTEInlineCondition().
We prefer to extract that code into a seperate function, because
(a) we'll re-use the logic later (b) we added one check for PG_11
Finally, the struct "inline_cte_walker_context" is also copied from
the same Postgres commit.
- Copy paste from the other parts of the Postgres code
In order to implement CTE inlining in Postgres 12, the hackers
modified the query_tree_walker()/range_table_walker() with the
18c0da88a5
Since Citus needs to support the same logic in PG 11, we copy & pasted
that functions (and related flags) with the names pg_12_query_tree_walker()
and pg_12_range_table_walker()
In two places I've made code more straight forward by using ROUTINE in our own codegen
Two changes which may seem extraneous:
AppendFunctionName was updated to not use pg_get_function_identity_arguments.
This is because that function includes ORDER BY when printing an aggregate like my_rank.
While ALTER AGGREGATE my_rank(x "any" ORDER BY y "any") is accepted by postgres,
ALTER ROUTINE my_rank(x "any" ORDER BY y "any") is not.
Tests were updated to use macaddr over integer. Using integer is flaky, our logic
could sometimes end up on tables like users_table. I originally wanted to use money,
but money isn't hashable.
We might need to send commands from workers to other workers. In
these cases we shouldn't override the xact id assigned by coordinator,
or otherwise we won't read the consistent set of result files
accross the nodes.
We need to know which placement succeeded in executing the worker_partition_query_result() call. Otherwise we wouldn't know which node to fetch from. This change allows that by introducing Task::perPlacementQueryStrings.
Fixes#3331
In #2389, we've implemented support for partitioned tables with rep > 1.
The implementation is limiting the use of modification queries on the
partitions. In fact, we error out when any partition is modified via
EnsurePartitionTableNotReplicated().
However, we seem to forgot an important case, where the parent table's
partition is marked as INVALID. In that case, at least one of the partition
becomes INVALID. However, we do not mark partitions as INVALID ever.
If the user queries the partition table directly, Citus could happily send
the query to INVALID placements -- which are not marked as INVALID.
This PR fixes it by marking the placements of the partitions as INVALID
as well.
The shard placement repair logic already re-creates all the partitions,
so should be fine in that front.