* Use translated vars in postgres 13 as well
Postgres 13 removed translated vars with pg 13 so we had a special logic
for pg 13. However it had some bug, so now we copy the translated vars
before postgres deletes it. This also simplifies the logic.
* fix rtoffset with pg >= 13
/*
* The physical planner assumes that all worker queries would have
* target list entries based on the fact that at least the column
* on the JOINs have to be on the target list. However, there is
* an exception to that if there is a cartesian product join and
* there is no additional target list entries belong to one side
* of the JOIN. Once we support cartesian product join, we should
* remove this error.
*/
When executing alter_table / undistribute_table udf's, we should not try
to change sequence dependencies on MX workers if new table wouldn't
require syncing metadata.
Previously, we were checking that for input table. But in some cases, the
fact that input table requires syncing metadata doesn't imply the same
for resulting table (e.g when undistributing a Citus table).
Even more, doing that was giving an unexpected error when undistributing
a Citus table so this commit actually fixes that.
It seems that we need to consider only pseudo constants while doing some
shortcuts in planning. For example there could be a false clause but it
can contribute to the result in which case it will not be a pseudo
constant.
We would exclude tables without relationRestriction from conversion
candidates in local-distributed table joins. This could leave a leftover
local table which should have been converted to a subquery.
Ideally I would expect that in each call to CreateDistributedPlan we
would pass a new plan id, but that seems like a bigger change.
/*
* Colocated intermediate results are just files and not required to use
* the same connections with their co-located shards. So, we are free to
* use any connection we can get.
*
* Also, the current connection re-use logic does not know how to handle
* intermediate results as the intermediate results always truncates the
* existing files. That's why, we use one connection per intermediate
* result.
*/
We do not include dummy column if original task didn't return any
columns.
Otherwise, number of columns that original task returned wouldn't
match number of columns returned by worker_save_query_explain_analyze.
When COPY is used for copying into co-located files, it was
not allowed to use local execution. The primary reason was
Citus treating co-located intermediate results as co-located
shards, and COPY into the distributed table was done via
"format result". And, local execution of such COPY commands
was not implemented.
With this change, we implement support for local execution with
"format result". To do that, we use the buffer for every file
on shardState->copyOutState, similar to how local copy on
shards are implemented. In fact, the logic is similar to
local copy on shards, but instead of writing to the shards,
Citus writes the results to a file.
The logic relies on LOCAL_COPY_FLUSH_THRESHOLD, and flushes
only when the size exceeds the threshold. But, unlike local
copy on shards, in this case we write the headers and footers
just once.
* Sort results in citus_shards and give raw size
Sort results so that it is consistent and also similar to citus_tables.
Use raw size in the output so that doing operations on the size is
easier.
* Change column ordering
With #4338, the executor is smart enough to failover to
local node if there is not enough space in max_connections
for remote connections.
For COPY, the logic is different. With #4034, we made COPY
work with the adaptive connection management slightly
differently. The cause of the difference is that COPY doesn't
know which placements are going to be accessed hence requires
to get connections up-front.
Similarly, COPY decides to use local execution up-front.
With this commit, we change the logic for COPY on local nodes:
Try to reserve a connection to local host. This logic follows
the same logic (e.g., citus.local_shared_pool_size) as the
executor because COPY also relies on TryToIncrementSharedConnectionCounter().
If reservation to local node fails, switch to local execution
Apart from this, if local execution is disabled, we follow the
exact same logic for multi-node Citus. It means that if we are
out of the connection, we'd give an error.
It seems that we were not considering the case where coordinator was
added to the cluster as a worker in the optimization of intermediate
results.
This could lead to errors when coordinator was added as a worker.
pg_get_tableschemadef_string doesn't know how to deparse identity
columns so we cannot reflect those columns when creating table
from scratch. For this reason, we don't allow using alter_table udfs
with tables having any identity cols.
pg_get_tableschemadef_string doesn't know how to deparse identity
columns so we cannot reflect those columns when creating shell
relation.
For this reason, we don't allow adding local tables -having identity cols-
to metadata.
Postgres doesn't allow inserting into columns having GENERATED ALWAYS
AS (...) STORED expressions.
For this reason, when executing undistribute_table or an alter_* udf,
we should skip copying such columns.
This is not bad since Postgres would already generate such columns.
Enables an overall plan to be parallel (e.g. over a partition
hierarchy), even though an individual ColumnarScan is not
parallel-aware.
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
Previously, if columnar.enable_custom_scan was false, parallel paths
could remain, leading to an unexpected error.
Also, ensure that cheapest_parameterized_paths is cleared if a custom
scan is used.
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
When finding columns owning sequences, we shouldn't rely on atthasdef
since it might be true when column has GENERATED ALWAYS AS (...)
STORED expression.
* Fix partition column index issue
We send column names to worker_hash/range_partition_table methods, and
in these methods we check the column name index from tuple descriptor.
Then this index is used to decide the bucket that the current row will
be sent for the repartition.
This becomes a problem when there are the same column names in the
tupleDescriptor. Then we can choose the wrong index. Hence the
partitioned data will be put to wrong workers. Then the result could
miss some data because workers might contain different range of data.
An example:
TupleDescriptor contains "trip_id", "car_id", "car_id" for one table.
It contains only "car_id" for the other table. And assuming that the
tables will be partitioned by car_id, it is not certain what should be
used for deciding the bucket number for the first table. Assuming value
2 goes to bucket 2 and value 3 goes to bucket 3, it is not certain which
bucket "1 2 3" (trip_id, car_id, car_id) row will go to.
As a solution we send the index of partition column in targetList
instead of the column name.
The old API is kept so that if workers upgrade work, it still works
(though it will have the same bug)
* Use the same method so that backporting is easier
Fixing a division by zero in the cost calculations for scanning a columnar table.
Due to how the columns in a columnar table are counted an empty table would result in a division by zero. Instead this patch keeps the column selection ratio on zero when this happens, resulting in an accurate cost of zero pages to scan a columnar table.
fixes#4589
* Make undistribute_table() and citus_create_local_table() work with columnar
* Rename and use LocallyExecuteUtilityTask for UDF check
* Remove 'local' references in ExecuteUtilityCommand
/*
* Creating Citus local tables relies on functions that accesses
* shards locally (e.g., ExecuteAndLogDDLCommand()). As long as
* we don't teach those functions to access shards remotely, we
* cannot relax this check.
*/
Logical replication status can take wal_receiver_status_interval
seconds to get updated. Default is 10s, which means tests in
which logical replication is used can take a long time to finish.
We reduce it to 1 second to speed these tests up.
Logical replication apply launcher launches workers every
wal_retrieve_retry_interval, so if we have many shard moves with
logical replication consecutively, they will be throttled by this
parameter. Default is 5s, we reduce it to 1s so we finish tests
faster.
The reason behind skipping postgres tables is that we support
foreign keys between postgres tables and reference tables
(without converting postgres tables to citus local tables)
when enable_local_reference_table_foreign_keys is false or
when coordinator is not added to metadata.
When enabled any foreign keys between local tables and reference
tables supported by converting the local table to a citus local
table.
When the coordinator is not in the metadata, the logic is disabled
as foreign keys are not allowed in this configuration.
If relation is not involved in any foreign key relationships,
foreign key graph would not return any relations for given
relationId as expected.
But even if it's the case, we should still undistribute the table
itself.
DESCRIPTION: Add tests to verify crash recovery for columnar tables
Based on the Postgres TAP tooling we add a new test suite to the array of test suites for citus. It is modelled after `src/test/recovery` in the postgres project and takes the same place in our repository. It uses the perl modules defined in the postgres project to control the postgres nodes.
The test we add here focus on crash recovery. Our follower tests should cover the streaming replication behaviour.
It is hooked to our CI for both postgres 12 and postgres 13. We omit the recovery tests for postgres 11 as we do not have support for the columnar table access method.
* Stronger check for triggers on columnar tables (#4493).
Previously, we used a simple ProcessUtility_hook. Change to use an
object_access_hook instead.
* Replace alter_table_set_access_method test on partition with foreign key
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
Co-authored-by: Marco Slot <marco.slot@gmail.com>
With citus shard helper view, we can easily see:
- where each shard is, which node, which port
- what kind of table it belongs to
- its size
With such a view, we can see shards that have a size bigger than some
value, which could be useful. Also debugging can be easier in production
as well with this view.
Fetch shards in one go per node
The previous implementation was slow because it would do a lot of round
trips, one per shard to be exact. Hence it is improved so that we fetch
all the shard_name, shard-size pairs per node in one go.
Construct shards_names, sizes query on coordinator
* Replace master_add_node with citus_add_node
* Replace master_activate_node with citus_activate_node
* Replace master_add_inactive_node with citus_add_inactive_node
* Use master udfs in old scripts
* Replace master_add_secondary_node with citus_add_secondary_node
* Replace master_disable_node with citus_disable_node
* Replace master_drain_node with citus_drain_node
* Replace master_remove_node with citus_remove_node
* Replace master_set_node_property with citus_set_node_property
* Replace master_unmark_object_distributed with citus_unmark_object_distributed
* Replace master_update_node with citus_update_node
* Replace master_update_shard_statistics with citus_update_shard_statistics
* Replace master_update_table_statistics with citus_update_table_statistics
* Rename master_conninfo_cache_invalidate to citus_conninfo_cache_invalidate
Rename master_dist_local_group_cache_invalidate to citus_dist_local_group_cache_invalidate
* Replace master_copy_shard_placement with citus_copy_shard_placement
* Replace master_move_shard_placement with citus_move_shard_placement
* Rename master_dist_node_cache_invalidate to citus_dist_node_cache_invalidate
* Rename master_dist_object_cache_invalidate to citus_dist_object_cache_invalidate
* Rename master_dist_partition_cache_invalidate to citus_dist_partition_cache_invalidate
* Rename master_dist_placement_cache_invalidate to citus_dist_placement_cache_invalidate
* Rename master_dist_shard_cache_invalidate to citus_dist_shard_cache_invalidate
* Drop master_modify_multiple_shards
* Rename master_drop_all_shards to citus_drop_all_shards
* Drop master_create_distributed_table
* Drop master_create_worker_shards
* Revert old function definitions
* Add missing revoke statement for citus_disable_node
CREATE TABLE does not invalidate foreign key graph but some other set of
ddl commands do.
Previously, as we run multi_foreign_key & multi_foreign_key_relation_graph
in parallel, it's possible that multi_foreign_key invalidates foreign key
graph via some ddl commands and create table test in
multi_foreign_key_relation_graph becomes flaky.
So we un-parallelize those two tests.
* Rethrow original concurrent index creation failure message
* Alter test outputs for concurrent index creation
* Detect duplicate table failure in concurrent index creation
* Add test for conc. index creation w/out duplicates
* Prevent deadlock for long named partitioned index creation on single node
* Create IsSingleNodeCluster function
* Use both local and sequential execution
On top of our foreign key graph, implement the infrastructure to get
list of relations that are connected to input relation via a foreign key
graph.
We need this to support cascading create_citus_local_table &
undistribute_table operations.
Also add regression tests to see what our foreign key graph is able to
capture currently.
Attribute number in a subquery RTE and relation RTE means different
things. In a relation attribute number will point to the column number
in the table definition including the dropped columns as well however in
subquery, it means the index in the target list. When we convert a
relation RTE to subquery RTE we should either correct all the relevant
attribute numbers or we can just add a dummy column for the dropped
columns. We choose the latter in this commit because it is practically
too vulnerable to update all the vars in a query.
Another thing this commit fixes is that in case a join restriction
clause list contains a false clause, we should just returns a false
clause instead of the whole list, because the whole list will contain
restrictions from other RTEs as well and this breaks the query, which
can be seen from the output changes, now it is much simpler.
Also instead of adding single tests for dropped columns, we choose to
run the whole mixed queries with tables with dropped columns, this
revealed some bugs already, which are fixed in this commit.
It seems that there are only very few cases where that is useful, and
for now we prefer not having that check. This means that we might
perform some unnecessary checks, but that should be rare and not
performance critical.
Instead of sending NULL's over a network, we now convert the subqueries
in the form of:
SELECT t.a, NULL, NULL FROM (SELECT a FROM table)t;
And we recursively plan the inner part so that we don't send the NULL's
over network. We still need the NULLs in the outer subquery because we
currently don't have an easy way of updating all the necessary places in
the query.
Add some documentation for how the conversion is done
Baseinfo also has pushed down filters etc, so it makes more sense to use
BaseRestrictInfo to determine what columns have constant equality
filters.
Also RteIdentity is used for removing conversion candidates instead of
rteIndex.
It seems that most of the updates were broken, we weren't aware of it
because there wasn't any data in the tables. They are broken mostly
because local tables do not have a shard id and some code paths should
be updated with that information, currently when there is an invalid
shard id, it is assumed to be pruned.
Consider local tables in router planner
In case there is a local table, the shard id will not be valid and there
are some checks that rely on shard id, we should skip these in case of
local tables, which is handled with a dummy placement.
Add citus local table dist table join tests
add local-dist table mixed joins tests
AllDataLocallyAccessible and ContainsLocalTableSubqueryJoin are removed.
We can possibly remove ModifiesLocalTableWithRemoteCitusLocalTable as
well. Though this removal has a side effect that now when all the data
is locally available, we could still wrap a relation into a subquery, I
guess that should be resolved in the router planner itself.
Add more tests
When we wrap an RTE to subquery we are updating the variables varno's as
1, however we should also update the varno's of vars in quals.
Also some other small code quality improvements are done.
The previous algorithm was not consistent and it could convert different
RTEs based on the table orders in the query. Now we convert local tables
if there is a distributed table which doesn't have a unique index. So if
there are 4 tables, local1, local2, dist1, dist2_with_pkey then we will
convert local1 and local2 in `auto` mode. Converting a distributed table
is not that logical because as there is a distributed table without a
unique index, we will need to convert the local tables anyway. So
converting the distributed table with pkey is redundant.
Remove FillLocalAndDistributedRTECandidates and use
ShouldConvertLocalTableJoinsToSubqueries, which simplifies things as we
rely on a single function to decide whether we should continue
converting RTE to subquery.
We should not recursively plan an already routable plannable query. An
example of this is (SELECT * FROM local JOIN (SELECT * FROM dist) d1
USING(a));
So we let the recursive planner do all of its work and at the end we
convert the final query to to handle unsupported joins. While doing each
conversion, we check if it is router plannable, if so we stop.
Only consider range table entries that are in jointree
If a range table is not in jointree then there is no point in
considering that because we are trying to convert range table entries to
subqueries for join use case.
Check equality in quals
We want to recursively plan distributed tables only if they have an
equality filter on a unique column. So '>' and '<' operators will not
trigger recursive planning of distributed tables in local-distributed
table joins.
Recursively plan distributed table only if the filter is constant
If the filter is not a constant then the join might return multiple rows
and there is a chance that the distributed table will return huge data.
Hence if the filter is not constant we choose to recursively plan the
local table.
When doing local-distributed table joins we convert one of them to
subquery. The current policy is that we convert distributed tables to
subquery if it has a unique index on a column that has unique
index(primary key also has a unique index).
UPDATEs on partitioned tables that affect only row partitions should
succeed, the rest should fail.
Also rename CStoreScan to ColumnarScan to make the error message more
relevant.
When Citus needs to parallelize queries on the local node (e.g., the node
executing the distributed query and the shards are the same), we need to
be mindful about the connection management. The reason is that the client
backends that are running distributed queries are competing with the client
backends that Citus initiates to parallelize the queries in order to get
a slot on the max_connections.
In that regard, we implemented a "failover" mechanism where if the distributed
queries cannot get a connection, the execution failovers the tasks to the local
execution.
The failover logic is follows:
- As the connection manager if it is OK to get a connection
- If yes, we are good.
- If no, we fail the workerPool and the failure triggers
the failover of the tasks to local execution queue
The decision of getting a connection is follows:
/*
* For local nodes, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us
* a preview of the future (e.g., we let the new connections to establish,
* but they are not established yet). The latter gives us the close to
* precise view of the past (e.g., the active number of client backends).
*
* Overall, we want to limit both of the metrics. The former limit typically
* kics in under regular loads, where the load of the database increases in
* a reasonable pace. The latter limit typically kicks in when the database
* is issued lots of concurrent sessions at the same time, such as benchmarks.
*/
When distributing a columnar table, as well as changing options on a distributed columnar table, this patch will forward the settings from the coordinator to the workers.
For propagating options changes on an already distributed table this change is pretty straight forward. Before applying the change in options locally we will create a `DDLJob` that contains a call to `alter_columnar_table_set(...)` for every shard placement with all settings of the current table. This goes both for setting an option as well as resetting. This will reset the values to the defaults configured on the coordinator. Having the effect that the coordinator is authoritative on the settings and makes sure the shards have the same settings set as the table on the coordinator.
When a columnar table is distributed it is using the `TableDDLCommand` infra structure to create a new kind of `TableDDLCommand`. This new type, called a `TableDDLCommandFunction` contains a context and 2 function pointers to execute. One function returns the command as applied on the table, the second function will return the sql command to apply to a shard with a given shard id. The schema name is ignored as it will use the fully qualified name of the shard in the same schema as the base table.
Multi-row execution already uses sequential execution. When shards
are local, using local execution is profitable as it avoids
an extra connection establishment to the local node.
Join test gets too many clients error too frequently hence we should
not run anything concurrently with that. Hopefully this will fix the
flakiness of test.
This is to avoid flaky changes like the following in test outputs:
-CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s.
+CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.02 s.
Columnar options were by accident linked to the relfilenode instead of the regclass/relation oid. This PR moves everything related to columnar options to their own catalog table.
Considering the adaptive connection management
improvements that we plan to roll soon, it makes it
very helpful to know the number of active client
backends.
We are doing this addition to simplify yhe adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus parallel queries would compete to get
slots on Postgres' `max_connections` on the same Citus database.
With adaptive connection management, we have the counters for
Citus parallel queries. That helps us to adaptively decide
on the remote executions pool size (e.g., throttle connections
if necessary).
However, we do not have any counters for the total number of
client backends on the database. For single node Citus, we
should consider all the client backends, not only the remote
connections that Citus does.
Of course Postgres internally knows how many client
backends are active. However, to get that number Postgres
iterates over all the backends. For examaple, see [pg_stat_get_db_numbackends](8e90ec5580/src/backend/utils/adt/pgstatfuncs.c (L1240))
where Postgres iterates over all the backends.
For our purpuses, we need this information on every connection
establishment. That's why we cannot affort to do this kind of
iterattion.
CitusTableTypeIdList() function iterates on all the entries of pg_dist_partition
and loads all the metadata in to the cache. This can be quite memory intensive
especially when there are lots of distributed tables.
When partitioned tables are used, it is common to have many distributed tables
given that each partition also becomes a distributed table.
CitusTableTypeIdList() is used on every CREATE TABLE .. PARTITION OF.. command
as well. It means that, anytime a partition is created, Citus loads all the
metadata to the cache. Note that Citus typically only loads the accessed table's
metadata to the cache.
* Move local execution after the remote execution
Before this commit, when both local and remote tasks
exist, the executor was starting the execution with
local execution. There is no strict requirements on
this.
Especially considering the adaptive connection management
improvements that we plan to roll soon, moving the local
execution after to the remote execution makes more sense.
The adaptive connection management for single node Citus
would look roughly as follows:
- Try to connect back to the coordinator for running
parallel queries.
- If succeeds, go on and execute tasks in parallel
- If fails, fallback to the local execution
So, we'll use local execution as a fallback mechanism. And,
moving it after to the remote execution allows us to implement
such further scenarios.
Before this commit, we let AdaptiveExecutorPreExecutorRun()
to be effective multiple times on every FETCH on cursors.
That does not affect the correctness of the query results,
but adds significant overhead.
TableAM API doesn't allow us to pass around a state variable along all of the tuple inserts belonging to the same command. We require this in columnar store, since we batch them, and when we have enough rows we flush them as stripes.
To do that, we keep a (relfilenode) -> stack of (subxact id, TableWriteState) global mapping.
**Inserts**
Whenever we want to insert a tuple, we look up for the relation's relfilenode in this mapping. If top of the stack matches current subtransaction, we us the existing TableWriteState. Otherwise, we allocate a new TableWriteState and push it on top of stack.
**(Sub)Transaction Commit/Aborts**
When the subtransaction or transaction is committed, we flush and pop all entries matching current SubTransactionId.
When the subtransaction or transaction is committed, we pop all entries matching current SubTransactionId and discard them without flushing.
**Reads**
Since we might have unwritten rows which needs to be read by a table scan, we flush write states on SELECTs. Since flushing the write state of upper transactions in a subtransaction will cause metadata being written in wrong subtransaction, we ERROR out if any of the upper subtransactions have unflushed rows.
**Table Drops**
We record in which subtransaction the table was dropped. When committing a subtransaction in which table was dropped, we propagate the drop to upper transaction. When aborting a subtransaction in which table was dropped, we mark table as not deleted.
* Update failure test dependencies
There was a security alert for cryptography. The vulnerability was fixed
in 3.2.0. The vulnebarility:
"RSA decryption was vulnerable to Bleichenbacher timing vulnerabilities,
which would impact people using RSA decryption in online scenarios."
The fix:
58494b41d6
It wasn't enough to only update crpytography because mitm was
incompatible with the new version, so mitm is also upgraded.
The steps to do in local:
python -m pip install -U cryptography
python -m pip install -U mitmproxy
When a relation is used on an OUTER JOIN with FALSE filters,
set_rel_pathlist_hook may not be called for the table.
There might be other cases as well, so do not rely on the hook
for classification of the tables.
Aliases that postgres choose for partitioned tables in explain output
might change in different pg versions, so normalize them and remove
the alternative test output
* Fix incorrect join related fields
Ruleutils expect to give the original index of join columns hence we
should consider the dropped columns while setting the fields in
SetJoinRelatedFieldsCompat.
* add some more tests for joins
* Move tests to join.sql and create a utility function
Disallow `ON TRUE` outer joins with reference & distributed tables
when reference table is outer relation by fixing the logic bug made
when calling `LeftListIsSubset` function.
Also, be more defensive when removing duplicate join restrictions
when join clause is empty for non-inner joins as they might still
contain useful information for non-inner joins.
It seems like Postgres could call set_rel_pathlist() for
the same relation multiple times. This breaks the logic
where we assume relationCount eqauls to the number of
entries in relationRestrictionList.
In summary, relationRestrictionList may contain duplicate
entries.
With this commit, we make sure that local execution adds the
intermediate result size as the distributed execution adds. Plus,
it enforces the citus.max_intermediate_result_size value.
Before this commit, the logic was:
- As long as the outer side of the JOIN is not a JOIN (e.g., relation
or subquery etc.), we check for the existence of any recurring
tuples. There were two implications of this decision.
First, even if a subquery which is on the outer side contains
distributed table JOIN reference table, Citus would unnecessarily throw
an error. Note that, the JOIN inside the subquery would already
be going to be tested recursively. But, as long as that check
passes, there is no reason for the upper JOIN to fail. An example, which
used to fail and now works:
SELECT * FROM (SELECT * FROM dist JOIN ref) as foo LEFT JOIN dist;
Second, certain JOINs, especially with ON (true) conditions were not
represented as Citus expects the JOINs to be in the format
DeferredErrorIfUnsupportedRecurringTuplesJoin().
Use short lived per-tuple context in citus_evaluate_expr like
(pg) evaluate_expr does.
We should not use planState->ExprContext when evaluating expressions
as it might lead to freeing the same executor twice (first one happens
in citus_evaluate_expr itself and the other one happens when postgres
doing clean-up for the top level executor state), which in turn might
cause seg.faults.
However, now as we don't have necessary planState info to evaluate
prepared statements, we also add planState->es_param_list_info to
per-tuple ExprContext.
With postgres 13, there is a global lock that prevents multiple VACUUMs
happening in the current database. This global lock is taken for a short
time but this creates a problem because of the following:
- We execute the VACUUM for the shell table through the standard process
utility. In this step the global lock is taken for the current database.
- If the current node has shard placements then it tries to execute
VACUUM over a connection to localhost with ExecuteUtilityTaskList.
- the VACUUM on shard placements cannot proceed because it is waiting
for the global lock for the current database to be released.
- The acquired lock from the VACUUM for shell table will not be released
until the transaction is committed.
- So there is a deadlock.
As a solution, we commit the current transaction in case of VACUUM after
the VACUUM is executed for the shell table. Executing the VACUUM on a
shell table is not important because the data there will probably be
truncated. PostprocessVacuumStmt takes the necessary locks on the shell
table so we don't need to take any extra locks after we commit the
current transaction.
In our test structure, we have been passing postgres configurations from
the terminal, which causes problems after it hits to a certain length
hence it cannot start the server and understanding why it failed is not
easy because there isn't a nice error message.
This commit changes this to write the settings directly to the postgres
configuration file. This way we can add as many postgres settings as we
want to without needing to worry about the length problem.
Multi-row & router INSERT's were crashing with local execution if at
least one of the DEFAULT columns were not specified in VALUES list.
This was because, the changes we make on query->values_lists and
query->targetList was sufficient for deparsing given INSERT for remote
execution but not sufficient for local execution.
With this commit, DEFAULT value normalization for multi-row & router
INSERT's is fixed by adding dummy column references for unspecified
DEFAULT columns.
Citus has the logic to truncate the long shard names to prevent
various issues, including self-deadlocks. However, for partitioned
tables, when index is created on the parent table, the index names
on the partitions are auto-generated by Postgres. We use the same
Postgres function to generate the index names on the shards of the
partitions. If the length exceeds the limit, we switch to sequential
execution mode.
We currently do not support volatile functions in update/delete statements
because the function evaluation logic does not know how to distinguish
volatile functions (that need to be evaluated per row) from stable functions
(that need to be evaluated per query), and it is also not safe to push the
volatile functions down on replicated tables.
Add sort method parameter for regression tests
Fix check-style
Change sorting method parameters to enum
Polish
Add task fields to OutTask
Add test into multi_explain
Fix isolation test
As the previous versions of Citus don't know how to handle citus local
tables, we should prevent downgrading from 9.5 to older versions if any
citus local tables exists.
Pushing down the CALLs to the node that the CALL is executed is
dangerous and could lead to infinite recursion.
When the coordinator added as worker, Citus was by chance preventing
this. The coordinator was marked as "not metadatasynced" node
in pg_dist_node, which prevented CALL/function delegation to happen.
With this commit, we do the following:
- Fix metadatasynced column for the coordinator on pg_dist_node
- Prevent pushdown of function/procedure to the same node that
the function/procedure is being executed. Today, we do not sync
pg_dist_object (e.g., distributed functions metadata) to the
worker nodes. But, even if we do it now, the function call delegation
would prevent the infinite recursion.
* Not allow removing a single node with ref tables
We should not allow removing a node if it is the only node in the
cluster and there is a data on it. We have this check for distributed
tables but we didn't have it for reference tables.
* Update src/test/regress/expected/single_node.out
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
* Update src/test/regress/sql/single_node.sql
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
This commit brings following features:
Foreign key support from citus local tables to reference tables
* Foreign key support from reference tables to citus local tables
(only with RESTRICT & NO ACTION behavior)
* ALTER TABLE ENABLE/DISABLE trigger command support
* CREATE/DROP/ALTER trigger command support
and disallows:
* ALTER TABLE ATTACH/DETACH PARTITION commands
* CREATE TABLE <postgres table> ATTACH PARTITION <citus local table>
commands
* Foreign keys from postgres tables to citus local tables
(the other way was already disallowed)
for citus local tables.
create_distributed_function(function_name,
distribution_arg_name,
colocate_with text)
This UDF did not allow colocate_with parameters when there were no
disttribution_arg_name supplied. This commit changes the behaviour to
allow missing distribution_arg_name parameters when the function should
be colocated with a reference table.
* Hide citus.subquery_pushdown flag
This flag is dangerous and could likely to let queries
return wrong results.
The flag has a very specific purpose for a very specific
data distribution and query structure. In those cases, when
the flag is set, the user can skip recursive planning altogether
*at their own risk*.
The meaning of the flag is that "I know what I'm doing such that
the query structure/data distribution is on my control, so Citus
can skip many correctness checks".
For regular users, enabling this flag is discouraged. We have to
keep the support only for backward compatibility for some users.
In addition to that, give a NOTICE to discourage new users to
use it.
* Update and separate test images
The build image was a single one and it would contain pg11, pg12 and
pg13. Now it is separated so that we can build each pg major
independently.
Tags are used as full postgres versions so that we can know which
version we use by looking at the tag. For example exttester:11.9 would
mean we are using pg11.9.
pg11 is updated from 11.5 to 11.9.
pg12 is updated from 12rc to 12.4.
* Ignore memory usage in pg13 explain
* Use citus instead of personal repo
AllTargetExpressionsAreColumnReferences would return false if a query
had an entry that is referencing the outer query. It seems safe to not
have this for non-distributed tables, such as reference tables. We
already have separate checks for other cases such as having limits.
The error message when index has opclassopts is improved and the commit
from postgres side is also included for future reference.
Also some minor style related changes are applied.
Error out if index has opclassopts.
Changelog entry on PG13:
Allow CREATE INDEX to specify the GiST signature length and maximum number of integer ranges (Nikita Glukhov)
It seems that we don't support propagating commands related to base
types. Therefore Alter TYPE options doesn't seem to apply to us. I have
added a test to verify that we don't propagate them.
Changelog entry on pg13:
Add ALTER TYPE options useful for extensions, like TOAST and I/O functions control (Tomas Vondra, Tom Lane)
Unicode escapes work as expected, related tests are added.
Changelog entry on PG13:
Allow Unicode escapes, e.g., E'\u####', U&'\####', to specify any character available in the database encoding, even when the database encoding is not UTF-8 (Tom Lane)
Tests for is_normalized and normalized ar eadded. One thing that seems
to be because of existent bug is that when we don't give the second
argument to normalize or is_normalized, which is optional, it crashes.
Because in the executor part, in the expression we don't have the
default argument.
Changelog entry in PG-13:
Add SQL functions NORMALIZE() to normalize Unicode strings, and IS NORMALIZED to check for normalization (Peter Eisentraut)
Commit on Postgres:
2991ac5fc9b3904ca4582be6d323497d7c3d17c9
It seems that row suffix notation is working fine with our code, a test
is added.
Changelog entry in PG13:
Allow ROW values values to have their members extracted with suffix notation (Tom Lane)
PG13 now supports dropping expression from a column such as generated
columns. We error out with this currently.
Changelog entry in postgres:
Add ALTER TABLE clause DROP EXPRESSION to remove generated properties from columns (Peter Eisentraut)
Postgres 13 added a new VACUUM option, PARALLEL. It is now supported
in our code as well.
Relevant changelog message on postgres:
Allow VACUUM to process indexes in parallel (Masahiko Sawada, Amit Kapila)
With pg13, constants functions from "FROM" clause are replaced. This
means that in citus side, we will see the constraints in restriction
info, instead of the function call. For example:
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
Assuming that the function `add` returns constant, it will be evaluated
on postgres side. This means that this query will be routable because
there will be only one shard after pruning with the restrictions.
However before pg13, this would be multi shard query. And it would go
into recursive planning, the function would be evaluated on the
coordinator because it can be.
This means that with pg13, users will need to distribute the function
because when it is routable executable, it will currently also send the
function call to the worker in the query. So the function should exist
in the worker.
It could be better to replace the constant in the query tree as well so
that the query string sent to the worker has the constant value and
therefore it doesn't need the function. However I feel like users would
already have the function in workers if they have any multi shard query.
Commit on Postgres side:
7266d0997dd2a0632da38a594c78e25ff21df67e
CREATE EXTENSION <name> FROM <old_version> is not supported anymore with
postgres 13. An alternative output is added for pg13 where we basically
error for that statement.
The not-null constraint message changed with pg13 slightly hence a
normalization rule is added for that, which converts it to pg < 13
output.
Commit on postgres:
05f18c6b6b6e4b44302ee20a042cedc664532aa2
An extra debug message is added related to indexes on postgres, these
are safe to be ignored, so we can delete them from tests.
Commit on Postgres side:
612a1ab76724aa1514b6509269342649f8cab375
varnoold is renamed as varnosyn and varoattno is renamed as varattnosyn
so in the output we normalize the values as the old ones to simply pass
the tests.
With this patch, we introduce `locally_reserved_shared_connections.c/h` files
which are responsible for reserving some space in shared memory counters
upfront.
We sometimes need to reserve connections, but not necessarily
establish them. For example:
- COPY command should reserve connections as it cannot know which
connections it needs in which order. COPY establishes connections
as any input data hits the workers. For example, for router COPY
command, it only establishes 1 connection.
As discussed here (https://github.com/citusdata/citus/pull/3849#pullrequestreview-431792473),
COPY needs to reserve connections up-front, otherwise we can end
up with resource starvation/un-detected deadlocks.
* ensure propagation of CHECK statements to workers with parantheses & adjust regression test outputs
* add tests for distributing tables with simple CHECK constraints
* added test for CHECK on bool variable
Enable custom aggregates with multiple parameters to be executed on workers.
#2921 introduces distributed execution of custom aggregates. One of the limitations of this feature is that only aggregate functions with a single aggregation parameter can be pushed to worker nodes. Aim of this change is to remove that limitation and support handling of multi-parameter aggregates.
Resolves: #3997
See also: #2921
Some GUCs support a list of values which is indicated by GUC_LIST_INPUT flag.
When an ALTER ROLE .. SET statement is executed, the new configuration
default for affected users and databases are stored in the
setconfig(text[]) column in a pg_db_role_setting record.
If a GUC that supports a list of values is used in an ALTER ROLE .. SET
statement, we need to split the text into items delimited by commas.
As noted by Talha https://github.com/citusdata/citus/pull/4029#issuecomment-660466972 there was still some sort order flappiness in the test.
The root cause is that sorting on `1::text` sorts on the literal `'1'` which causes sorting to be indeterministic.
This behaviour is consistent with Postgres' behaviour, so no bug on Citus' side.
* use adaptive executor even if task-tracker is set
* Update check-multi-mx tests for adaptive executor
Basically repartition joins are enabled where necessary. For parallel
tests max adaptive executor pool size is decresed to 2, otherwise we
would get too many clients error.
* Update limit_intermediate_size test
It seems that when we use adaptive executor instead of task tracker, we
exceed the intermediate result size less in the test. Therefore updated
the tests accordingly.
* Update multi_router_planner
It seems that there is one problem with multi_router_planner when we use
adaptive executor, we should fix the following error:
+ERROR: relation "authors_range_840010" does not exist
+CONTEXT: while executing command on localhost:57637
* update repartition join tests for check-multi
* update isolation tests for repartitioning
* Error out if shard_replication_factor > 1 with repartitioning
As we are removing the task tracker, we cannot switch to it if
shard_replication_factor > 1. In that case, we simply error out.
* Remove MULTI_EXECUTOR_TASK_TRACKER
* Remove multi_task_tracker_executor
Some utility methods are moved to task_execution_utils.c.
* Remove task tracker protocol methods
* Remove task_tracker.c methods
* remove unused methods from multi_server_executor
* fix style
* remove task tracker specific tests from worker_schedule
* comment out task tracker udf calls in tests
We were using task tracker udfs to test permissions in
multi_multiuser.sql. We should find some other way to test them, then we
should remove the commented out task tracker calls.
* remove task tracker test from follower schedule
* remove task tracker tests from multi mx schedule
* Remove task-tracker specific functions from worker functions
* remove multi task tracker extra schedule
* Remove unused methods from multi physical planner
* remove task_executor_type related things in tests
* remove LoadTuplesIntoTupleStore
* Do initial cleanup for repartition leftovers
During startup, task tracker would call TrackerCleanupJobDirectories and
TrackerCleanupJobSchemas to clean up leftover directories and job
schemas. With adaptive executor, while doing repartitions it is possible
to leak these things as well. We don't retry cleanups, so it is possible
to have leftover in case of errors.
TrackerCleanupJobDirectories is renamed as
RepartitionCleanupJobDirectories since it is repartition specific now,
however TrackerCleanupJobSchemas cannot be used currently because it is
task tracker specific. The thing is that this function is a no-op
currently.
We should add cleaning up intermediate schemas to DoInitialCleanup
method when that problem is solved(We might want to solve it in this PR
as well)
* Revert "remove task tracker tests from multi mx schedule"
This reverts commit 03ecc0a681.
* update multi mx repartition parallel tests
* not error with task_tracker_conninfo_cache_invalidate
* not run 4 repartition queries in parallel
It seems that when we run 4 repartition queries in parallel we get too
many clients error on CI even though we don't get it locally. Our guess
is that, it is because we open/close many connections without doing some
work and postgres has some delay to close the connections. Hence even
though connections are removed from the pg_stat_activity, they might
still not be closed. If the above assumption is correct, it is unlikely
for it to happen in practice because:
- There is some network latency in clusters, so this leaves some times
for connections to be able to close
- Repartition joins return some data and that also leaves some time for
connections to be fully closed.
As we don't get this error in our local, we currently assume that it is
not a bug. Ideally this wouldn't happen when we get rid of the
task-tracker repartition methods because they don't do any pruning and
might be opening more connections than necessary.
If this still gives us "too many clients" error, we can try to increase
the max_connections in our test suite(which is 100 by default).
Also there are different places where this error is given in postgres,
but adding some backtrace it seems that we get this from
ProcessStartupPacket. The backtraces can be found in this link:
https://circleci.com/gh/citusdata/citus/138702
* Set distributePlan->relationIdList when it is needed
It seems that we were setting the distributedPlan->relationIdList after
JobExecutorType is called, which would choose task-tracker if
replication factor > 1 and there is a repartition query. However, it
uses relationIdList to decide if the query has a repartition query, and
since it was not set yet, it would always think it is not a repartition
query and would choose adaptive executor when it should choose
task-tracker.
* use adaptive executor even with shard_replication_factor > 1
It seems that we were already using adaptive executor when
replication_factor > 1. So this commit removes the check.
* remove multi_resowner.c and deprecate some settings
* remove TaskExecution related leftovers
* change deprecated API error message
* not recursively plan single relatition repartition subquery
* recursively plan single relation repartition subquery
* test depreceated task tracker functions
* fix overlapping shard intervals in range-distributed test
* fix error message for citus_metadata_container
* drop task-tracker deprecated functions
* put the implemantation back to worker_cleanup_job_schema_cachesince citus cloud uses it
* drop some functions, add downgrade script
Some deprecated functions are dropped.
Downgrade script is added.
Some gucs are deprecated.
A new guc for repartition joins bucket size is added.
* order by a test to fix flappiness
As reported on #4011https://github.com/citusdata/citus/pull/4011/files#r453804702 some of the tests were flapping due to an indeterministic order for test outputs.
This PR makes the test output ordered for all tests returning non-zero rows.
Needs to be backported to 9.2, 9.3, 9.4
The reason we should use ActiveReadableNodeList instead of ActiveReadableNonCoordinatorNodeList is that if coordinator is added to cluster as a worker, it should be counted as well. Otherwise if there is only coordinator in the cluster, the count will be 0, hence we get a warning.
In MultiTaskTrackerExecute, we should connect to coordinator if it is
added to the cluster because it will also be assigned tasks.
ActiveReadableWorkerNodeList doesn't include coordinator, however if
coordinator is added as a worker, we should also include that while
planning. The current methods are very easily misusable and this
requires a refactoring to make the distinction between methods that
include coordinator and that don't very explicit as they can introduce
subtle/major bugs pretty easily.
We were using ALL_WORKERS TargetWorkerSet while sending temporary schema
creation and cleanup. We(well mostly I) thought that ALL_WORKERS would also include coordinator when it is added as a worker. It turns out that it was FILTERING OUT the coordinator even if it is added as a worker to the cluster.
So to have some context here, in repartitions, for each jobId we create
(at least we were supposed to) a schema in each worker node in the cluster. Then we partition each shard table into some intermediate files, which is called the PARTITION step. So after this partition step each node has some intermediate files having tuples in those nodes. Then we fetch the partition files to necessary worker nodes, which is called the FETCH step. Then from the files we create intermediate tables in the temporarily created schemas, which is called a MERGE step. Then after evaluating the result, we remove the temporary schemas(one for each job ID in each node) and files.
If node 1 has file1, and node 2 has file2 after PARTITION step, it is
enough to either move file1 from node1 to node2 or vice versa. So we
prune one of them.
In the MERGE step, if the schema for a given jobID doesn't exist, the
node tries to use the `public` schema if it is a superuser, which is
actually added for testing in the past.
So when we were not sending schema creation comands for each job ID to
the coordinator(because we were using ALL_WORKERS flag, and it doesn't
include the coordinator), we would basically not have any schemas for
repartitions in the coordinator. The PARTITION step would be executed on
the coordinator (because the tasks are generated in the planner part)
and it wouldn't give us any error because it doesn't have anything to do
with the temporary schemas(that we didn't create). But later two things
would happen:
- If by chance the fetch is pruned on the coordinator side, we the other
nodes would fetch the partitioned files from the coordinator and execute
the query as expected, because it has all the information.
- If the fetch tasks are not pruned in the coordinator, in the MERGE
step, the coordinator would either error out saying that the necessary
schema doesn't exist, or it would try to create the temporary tables
under public schema ( if it is a superuser). But then if we had the same
task ID with different jobID it would fail saying that the table already
exists, which is an error we were getting.
In the first case, the query would work okay, but it would still not do
the cleanup, hence we would leave the partitioned files from the
PARTITION step there. Hence ensure_no_intermediate_data_leak would fail.
To make things more explicit and prevent such bugs in the future,
ALL_WORKERS is named as ALL_NON_COORD_WORKERS. And a new flag to return
all the active nodes is added as ALL_DATA_NODES. For repartition case,
we don't use the only-reference table nodes but this version makes the
code simpler and there shouldn't be any significant performance issue
with that.
DESCRIPTION: Force aliases in deparsing for queries with anonymous column references
Fixes: #3985
The root cause has todo with discrepancies in the query tree we create. I think in the future we should spend some time on categorising all changes we made to ruleutils and see if we can change the data structure `query` we pass to the deparser to have an actual valid postgres query for the deparser to render.
For now the fix is to keep track, besides changing the names of the entries in the target list, also if we have a reference to an anonymous columns. If there are anonymous columns we set the `printaliases` flag to true which forces the deparser to add the aliases.
Static analysis found an issue where we could dereference `NULL`, because
`CreateDummyPlacement` could return `NULL` when there were no workers. This
PR changes it so that it never returns `NULL`, which was intended by
@marcocitus when doing this change: https://github.com/citusdata/citus/pull/3887/files#r438136433
While adding tests for citus on a single node I also added some more basic
tests and it turns out we error out on repartition joins. This has been
present since `shouldhaveshards` was introduced and is not trivial to fix.
So I created a separate issue for this: https://github.com/citusdata/citus/issues/3996
I recently forgot to add tests to a schedule in two of my PRs. One of
these was caught by review, but the other one was not. This adds a
script to causes CI to ensure that each test in the repo is included in
at least one schedule.
Three tests were found that were currently not part of a schedule. This PR
adds those three tests to a schedule as well and it also fixes some small
issues with these tests.
We keep accumulating more and more scripts to flag issues in CI. This is
good, but we are currently missing consistent documentation for them.
This commit moves all these scripts to the `ci` directory and adds some
documentation for all of them in the README. It also makes sure that the
last line of output of a failed script points to this documentation.
It was possible to get an assertion error, if a DML command was
cancelled that opened a connection and then "ROLLBACK TO SAVEPOINT" was
used to continue the transaction. The reason for this was that canceling
the transaction might leave the `claimedExclusively` flag on for (some
of) it's connections.
This caused an assertion failure because `CanUseExistingConnection`
would return false and a new connection would be opened, and then there
would be two connections doing DML for the same placement. Which is
disallowed. That this situation caused an assertion failure instead of
an error, means that without asserts this could possibly result in some
visibility bugs, similar to the ones described
https://github.com/citusdata/citus/issues/3867
This is so we don't need to calculate it twice in
insert_select_executor.c and multi_explain.c, which can
cause discrepancy if an update in one of them is not
reflected in the other site.
* Not set TaskExecution with adaptive executor
Adaptive executor is using a utility method from task tracker for
repartition joins, however adaptive executor doesn't need taskExecution.
It is only used by task tracker. This causes a problem when explain
analyze is used because what taskExecution is pointing to might be
random.
We solve this by not setting taskExecution from adaptive executor. So it
will stay NULL as set by CreateTask.
* use same memory context as task for taskExecution
Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
We've had two issues with merge conflicts to enterprise in the last week, that
suddenly happened. Because of this CI check this actually blocks all community
PRs from being merged.
This PR tries to improve on the previous script we had, by putting tougher
constraints on when a merge is allowed.
Previously the check would pass in two cases:
1. This PR be merged without conflicts into `enterprise-master`
2. A branch exists with the same name as this PR on enterprise and that can be
merged into `enterprise-master`.
The first case stays the same, but I've changed the second case to require the
following instead:
1. A branch exists on enterprise with the same name as this PR
2. **NEW: This branch contains the the last commit of the community PR branch**
3. This branch can be merged into enterprise-master
This makes sure the enterprise branch is actually up to date and not forgotten about.
If we still get problems with this change, future improvements could be:
1. Check that the PR on enterprise passes CI
2. Check that the PR on enterprise has been approved
3. Require the enterprise PR branch to be merged before merging community.
When we are using hammerdb jobs, the job creates a branch on test
automation, since that branch should be deleted, it would have
`delete_me` prefix, however since the result branch on
release-test-results will have the test automation branch as prefix, it
will also have `delete_me` prefix, which seems a bit confusing.
This PR updates it as citus_github_push
In #3901 the "Data received from worker(s)" sections were added to EXPLAIN
ANALYZE. After merging @pykello posted some review comments. This addresses
those comments as well as fixing a other issues that I found while addressing
them. The things this does:
1. Fix `EXPLAIN ANALYZE EXECUTE p1` to not increase received data on every
execution
2. Fix `EXPLAIN ANALYZE EXECUTE p1(1)` to not return 0 bytes as received data
allways.
3. Move `EXPLAIN ANALYZE` specific logic to `multi_explain.c` from
`adaptive_executor.c`
4. Change naming of new explain sections to `Tuple data received from node(s)`.
Firstly because a task can reference the coordinator too, so "worker(s)" was
incorrect. Secondly to indicate that this is tuple data and not all network
traffic that was performed.
5. Rename `totalReceivedData` in our codebase to `totalReceivedTupleData` to
make it clearer that it's a tuple data counter, not all network traffic.
6. Actually add `binary_protocol` test to `multi_schedule` (woops)
7. Fix a randomly failing test in `local_shard_execution.sql`.
Sometimes isolation tests get stuck in CI and we cannot see why, because
the job is killed by the CI runner. This will instead fail inside make
the testsuite continue, but mark it as a failure like this in the diff
output:
```diff
+isolationtester: canceling step s2-ddl-create-index-concurrently after 5 seconds
step s2-ddl-create-index-concurrently: CREATE INDEX CONCURRENTLY select_append_index ON select_append(id);
+ERROR: CONCURRENTLY-enabled index command failed
```
We should detect blockages very quickly and the queries we run are also
very fast, so 5 seconds should be more than enough to catch any random
slowness. The default from Postgres is 5 minutes, which is waaay to much
for us.
Sadly this does not actually work yet for binary protocol data, because
when doing EXPLAIN ANALYZE we send two commands at the same time. This
means we cannot use `SendRemoteCommandParams`, and thus cannot use the
binary protocol. This can still be useful though when using the text
protocol, to find out that a lot of data is being sent.
Due to the problem described in #3908 we don't cover the tdigest integration (and other extensions) on CI.
Due to this a bug got in the patch due to a change in `EXPLAIN VERBOSE` being merged concurrently with the tdigest integration. This PR fixes the test output that missed the newly added information.
* Insert select with master query
* Use relid to set custom_scan_tlist varno
* Reviews
* Fixes null check
Co-authored-by: Marco Slot <marco.slot@gmail.com>
This can save a lot of data to be sent in some cases, thus improving
performance for which inter query bandwidth is the bottleneck.
There's some issues with enabling this as default, so that's currently not done.
DESCRIPTION: Adds support to partially push down tdigest aggregates
tdigest extensions: https://github.com/tvondra/tdigest
This PR implements the partial pushdown of tdigest calculations when possible. The extension adds a tdigest type which can be combined into the same structure. There are several aggregate functions that can be used to get;
- a quantile
- a list of quantiles
- the quantile of a hypothetical value
- a list of quantiles for a list of hypothetical values
These function can work both on values or tdigest types.
Since we can create tdigest values either by combining them, or based on a group of values we can rewrite the aggregates in such a way that most of the computation gets delegated to the compute on the shards. This both speeds up the percentile calculations because the values don't have to be sorted while at the same time making the transfer size from the shards to the coordinator significantly less.
We still recursively plan some cases, eg:
- INSERTs
- SELECT FOR UPDATE when reference tables in query
- Everything must be same single shard & replication model
We wrap worker tasks in worker_save_query_explain_analyze() so we can fetch
their explain output later by a call worker_last_saved_explain_analyze().
Fixes#3519Fixes#2347Fixes#2613Fixes#621
Implements worker_save_query_explain_analyze and worker_last_saved_explain_analyze.
worker_save_query_explain_analyze executes and returns results of query while
saving its EXPLAIN ANALYZE to be fetched later.
worker_last_saved_explain_analyze returns the saved EXPLAIN ANALYZE result.
Append IF NOT EXISTS to CREATE SERVER commands generated by
pg_get_serverdef_string function when deparsing an existing server
object that a foreign table depends.
With this commit:
You can trigger two types of hammerdb benchmark jobs:
-ch_benchmark (analytical and transactional queries)
-tpcc_benchmark (only transactional queries)
Your branch will be run against `master` branch.
In order to trigger the jobs prepend `ch_benchmark/` or `tpcc_benchmark/` to your branch and push it.
For example if you were running on a feature/improvement branch with name `improve/adaptive_executor`. In order to trigger a tpcc benchmark, you can do the following:
```bash
git checkout improve/adaptive_executor
git checkout -b tpcc_benchmark/improve/adaptive_executor
git push origin tpcc_benchmark/improve/adaptive_executor # the tpcc benchmark job will be triggered.
```
You will see the results in a branch in [https://github.com/citusdata/release-test-results](https://github.com/citusdata/release-test-results).
The branch name will be something like: `delete_me/citusbot_tpcc_benchmark_rg/<date>/<date>`.
The resource groups will be deleted automatically but if the benchmark fails, they won't be deleted(If you don't see the results after a reasonable time, it might mean it failed, you can check the resource usage from portal, if it is almost 0 and you didn't see the results, it means it probably failed). In that case, you will need to delete the resource groups manually from portal, the resource groups are `citusbot_ch_benchmark_rg` and `citusbot_tpcc_benchmark_rg`.
DESCRIPTION: Ignore pruned target list entries in coordinator plan
The postgres planner has the ability to prune target list entries that are proven not used in the output relation. When this happens at the `CitusCustomScan` boundary we need to _not_ return these pruned columns to not upset the rest of the planner.
By using the target list the planner asks us to return we fix issues that lead to Assertion failures, and potentially could be runtime errors when they hit in a production build.
Fixes#3809
* add a job to check if merge to enterprise master would fail
Add a job to check if merge to enterprise master would fail.
The job does the following:
- It checks if there is already a branch with the same name on
enterprise, if so it tries to merge it to enterprise master, if the
merge fails the job fails.
- If the branch doesn't exist on the enterprise, it tries to merge the
current branch to enterprise master, it fails if there is any conflict
while merging.
The motivation is that if a branch on community would create a conflict
on enterprise-master, until we create a PR on enterprise that would
solve this conflict, we won't be able to merge the PR on community. This
way we won't have many conflicts when merging to enterprise master and
the author, who has the most context will be responsible for resolving
the conflict when he has the most context, not after 1 month.
* Improve test suite to be able to easily run locally
* Add documentation on how to resolve conflicts to enterprise master
* Improve enterprise merge script
* Improve merge conflict job README
* Improve merge conflict job README
* Improve merge conflict job README
* Improve merge conflict job README
Co-authored-by: Nils Dijk <nils@citusdata.com>
In the code, we had the assumption that if restriction information
is NULL, it means that we cannot have any disributetd tables in
the subquery.
However, for subqueries in WHERE clause, that is not the case when
the subquery is ANDed with FALSE. In that case, Citus operates
on the originalQuery (which doesn't go through the standard_planner()),
and rely on the restriction information generated by standard_plannner().
As Postgres is smart enough to no generate restriction information for
subqueries ANDed with FALSE, we hit the assertion.
* Not append empty task in ExtractLocalAndRemoteTasks
ExtractLocalAndRemoteTasks extracts the local and remote tasks. If we do
not have a local task the localTaskPlacementList will be NIL, in this
case we should not append anything to local tasks. Previously we would
first check if a task contains a single placement or not, now we first
check if there is any local task before doing anything.
* fix copy of node task
Task node has task query, which might contain a list of strings in its
fields. We were using postgres copyObject for these lists. Postgres
assumes that each element of list will be a node type. If it is not a
node type it will error.
As a solution to that, a new macro is introduced to copy a list of
strings.
Physical planner doesn't support parameters. If the parameters have already
been resolved when the physical planner handling the queries, mark it.
The reason is that the executor is unaware of this, and sends the parameters
along with the worker queries, which fails for composite types.
(See `DissuadePlannerFromUsingPlan()` for the details of paramater resolving)
We currently put the actual error message to the detail part. However,
many drivers don't show detail part.
As connection errors are somehow common, and hard to trace back, can't
we added the detail to the message itself.
In addition to that, we changed "connection error" message, as it
was confusing to the users who think that the error was happening
while connecting to the coordinator. In fact, this error is showing
up when the coordinator fails to connect remote nodes.
* invalidate plan cache in master_update_node
If a plan is cached by postgres but a user uses master_update_node, then
when the plan cache is used for the updated node, they will get the old
nodename/nodepost in the plan. This is because the plan cache doesn't
know about the master_update_node. This could be a problem in prepared
statements or anything that goes into plancache. As a solution the plan
cache is invalidated inside master_update_node.
* add invalidate_inactive_shared_connections test function
We introduce invalidate_inactive_shared_connections udf to be used in
testing. It is possible that a connection count for an inactive node
will be greater than 0 and in that case it will not be removed at the
time of invalidation. However, later we don't have a mechanism to remove
it, which means that it will stay in the hash. For this not to cause a
problem, we use this udf in testing.
* move invalidate_inactive_shared_connections to udfs from test as it will be used in mx
* remove the test udf
* remove the IsInactive check
We initially considered removing entries just before any change to
pg_dist_node. However, that ended-up being very complex and making
MX even more complex.
Instead, we're switching to a simpler solution, where we remove entries
when the counter gets to 0.
With certain workloads, this may have some performance penalty. But, two
notes on that:
- When counter == 0, it implies that the cluster is not busy
- With cached connections, that's not possible
When we call SetTaskQueryString we would set the task type to
TASK_QUERY_TEXT, and some parts of the codebase rely on the fact that if
TASK_QUERY_TEXT is set, the data can be read safely. However if
SetTaskQueryString is called with a NULL taskQueryString this can cause
crashes. In that case taskQueryType will simply be set to
TASK_QUERY_NULL.
DESCRIPTION: Alter role only works for citus managed roles
Alter role was implemented before we implemented good role management that hooks into the object propagation framework. This is a refactor of all alter role commands that have been implemented to
- be on by default
- only work for supported roles
- make the citus extension owner a supported role
Instead of distributing the alter role commands for roles at the beginning of the node activation role it now _only_ executes the alter role commands for all users in all databases and in the current database.
In preparation of full role support small refactors have been done in the deparser.
Earlier tests targeting other roles than the citus extension owner have been either slightly changed or removed to be put back where we have full role support.
Fixes#2549
With this commit, we're introducing a new infrastructure to throttle
connections to the worker nodes. This infrastructure is useful for
multi-shard queries, router queries are have not been affected by this.
The goal is to prevent establishing more than citus.max_shared_pool_size
number of connections per worker node in total, across sessions.
To do that, we've introduced a new connection flag OPTIONAL_CONNECTION.
The idea is that some connections are optional such as the second
(and further connections) for the adaptive executor. A single connection
is enough to finish the distributed execution, the others are useful to
execute the query faster. Thus, they can be consider as optional connections.
When an optional connection is not allowed to the adaptive executor, it
simply skips it and continues the execution with the already established
connections. However, it'll keep retrying to establish optional
connections, in case some slots are open again.
* use local executon when in a transaction block
When we are inside a transaction block, there could be other methods
that need local execution, therefore we will use local execution in a
transaction block.
* update test outputs with transaction block local execution
* add a test to verify we dont leak intermediate schemas
* test that we don't leak intermediate schemas
We have tests to make sure that we don't intermediate any intermediate
files, tables etc but we don't test if we are leaking schemas. It makes
sense to test this as well.
* remove all repartition schemas in case of error
This solution is not an ideal one but it seems to be doing the job.
We should have a more generic solution for the cleanup but it seems that
putting the cleanup in the abort handler is dangerous and it was
crashing.
When the file does not exist, it could mean two different things.
First -- and a lot more common -- case is that a failure happened
in a concurrent backend on the same distributed transaction. And,
one of the backends in that transaction has already been roll
backed, which has already removed the file. If we throw an error
here, the user might see this error instead of the actual error
message. Instead, we prefer to WARN the user and pretend that the
file has no data in it. In the end, the user would see the actual
error message for the failure.
Second, in case of any bugs in intermediate result broadcasts,
we could try to read a non-existing file. That is most likely
to happen during development. Thus, when asserts enabled, we throw
an error instead of WARNING so that the developers cannot miss.
In case we don't care about the tupleStoreState in
ExecuteLocalTaskListExtended, it could be passed as null. In that case
we will get a seg error. This changes it so that a dummy tuple store
will be created when it is null.
Do not use local execution in ExecuteTaskListOutsideTransaction.
As we are going to run the tasks outside transaction, we shouldn't use local execution.
However, there is some problem when using local execution related to
repartition joins, when we solve that problem, we can execute the tasks
coming to this path with local execution.
Also logging the local command is simplified.
normalize job id in worker_hash_partition_table in test outputs.
This is possible whenever we aren't pulling up intermediate rows
We want to do this because this was done in 9.2,
some queries rely on the performance of grouping causing distinct values
This change was introduced when implementing window functions on coordinator
The purpose of null_parameters is to make sure that citus doesn't crash
with null parameters. (The related issue is #3493.) The logs in this
file are not that important and they are flaky. The flakiness is related
to postgres part as well so it is hard to reproduce them. Therefore it
makes sense to decrease the log level.
look at sent commands to simplify complex logic in vacuum test
also normalize connection id as that can differ when we don't have to
choose a specific connection.
It seems that sometimes the pruning is deferred and sometimes not with
this statement. What we care in this test is to see that it doesn't
crash. I think we don't care about the log statement for this line. So
it makes sense to not log this statement, and care about the result.
ExecuteTaskListExtended is the common method for different codepaths,
and instead of writing separate local execution logics in different
codepaths, it makes more sense to have the logic here. We still need to
do some refactoring, this is an initial step.
After this commit, we can run create shard commands locally. There is a
special case with shard creation commands. A create shard command might
have a concatenated query string, however local execution did not know
how to execute a task with multiple query strings. This is also
implemented in this commit. We go over each query in the concatenated
query string and plan/execute them one by one.
A more clean solution to this would be to make sure that each task has a
single query. We currently cannot do that because we need to ensure the
task dependencies. However, it would make sense to do that at some point
and it would simplify the code a lot.
It seems that one of the deadlock detection tests fails way too often in
our CI. The difference is only ordering. Currently it seems that it is a
good idea to disable this test for the sake of development.
In PostgreSQL, user defaults for config parameters can be changed by
ALTER ROLE .. SET statements. We wish to propagate those defaults
accross the Citus cluster so that the behaviour will be similar in
different workers.
The defaults can either be set in a specific database, or the whole
cluster, similarly they can be set for a single role or all roles.
We propagate the ALTER ROLE .. SET if all the conditions below are met:
- The query affects the current database, or all databases
- The user is already created in worker nodes
Some refactoring:
Consolidate expression which decides whether GROUP BY/HAVING are pushed down
Rename early pullUpIntermediateRows to hasNonDistributableAggregates
Create WorkerColumnName to handle formatting WORKER_COLUMN_FORMAT
Ignore NULL StringInfo pointers to SafeToPushdownWindowFunction
Fix bug where SubqueryPushdownMultiNodeTree mutates supplied Query,
SafeToPushdownWindowFunction requires the original query as it relies on rtable
We cache connections between nodes in our connection management code.
This is good for speed. For security this can be a problem though. If
the user changes settings related to TLS encryption they want those to
be applied to future queries. This is especially important when they did
not have TLS enabled before and now they want to enable it. This can
normally be achieved by changing citus.node_conninfo. However, because
connections are not reopened there will still be old connections that
might not be encrypted at all.
This commit changes that by marking all connections to be shutdown at
the end of their current transaction. This way running transactions will
succeed, even if placement requires connections to be reused for this
transaction. But after this transaction completes any future statements
will use a connection created with the new connection options.
If a connection is requested and a connection is found that is marked
for shutdown, then we don't return this connection. Instead a new one is
created. This is needed to make sure that if there are no running
transactions, then the next statement will not use an old cached
connection, since connections are only actually shutdown at the end of a
transaction.
It seems that when logging is enabled we should not run local shard copy
in parallel with other tests. The reason is that it adds coordinator for
reference tables and if the parallel test creates a schema before this
test is run, the schema will be logged. So it is not deterministic.
If two tables have the same distribution column type, we implicitly
colocate them. This is useful since colocation has a big performance
impact in most applications.
When a table is rebalanced, all of the colocated tables are also
rebalanced. If table A and table B are colocated and we want to
rebalance table A, table B will also be rebalanced. We need replica
identity so that logical replication can replicate updates and deletes
during rebalancing. If table B does not have a replica identity we
error out.
A solution to this is to introduce a UDF so that colocation can be
updated. The remaining tables in the colocation group will stay
colocated. For example if table A, B and C are colocated and after
updating table B's colocations, table A and table C stay colocated.
The "updating colocation" step does not move any data around, it only
updated pg_dist_partition and pg_dist_colocation tables. Specifically it
creates a new colocation group for the table and updates the entry in
pg_dist_partition while invalidating any cache.
We're getting a lot of random failures on CI regarding connection errors. This
works around that by not running that create lots of connections in parallel.
We can use local copy in INSERT..SELECT, so the check that disables
local execution is removed.
Also a test for local copy where the data size >
LOCAL_COPY_FLUSH_THRESHOLD is added.
use local execution with insert..select
If current transaction is connected to local group we should not use
local copy, because we might not see some of the changes that are made
over the connection to the local group.
DESCRIPTION: Fix left join shard pruning in pushdown planner
Due to #2481 which moves outer join planning through the pushdown planner we caused a regression on the shard pruning behaviour for outer joins.
In the pushdown planner we make a union of the placement groups for all shards accessed by a query based on the filters we see during planning. Unfortunately implicit filters for left joins are not available during this part. This causes the inner part of an outer join to not prune any shards away. When we take the union of the placement groups it shows the behaviour of not having any shards pruned.
Since the inner part of an outer query will not return any rows if the outer part does not contain any rows we have observed we do not have to add the shard intervals of the inner part of an outer query to the list of shard intervals to query.
Fixes: #3512
* reimplement ExecuteUtilityTaskListWithoutResults for local utility command execution
* introduce new functions for local execution of utility commands
* change ErrorIfTransactionAccessedPlacementsLocally logic for local utility command execution
* enable local execution for TRUNCATE command on distributed & reference tables
* update existing tests for local utility command execution
* enable local execution for DDL commands on distributed & reference tables
* enable local execution for DROP command on distributed & reference tables
* add normalization rules for cascaded commands
* add new tests for local utility command execution
* Add third column to master_evaluation_modify table
It was already added in some tests, but now make it globally
applicable to the test file.
* Add third column to master_evaluation_select table
As we'll use the column in some tests
* Add modify regression tests
For the combinations of: local/remote, router/fast-path:
- Distribution key is a const.
- Contains a function
- A column which is not dist. key is parametrized
* Add select regression tests
For the combinations of: local/remote, router/fast-path:
- Distribution key is a const.
- Contains a function
- A column which is not dist. key is parametrized
* Make some tests consistent to check-base
As we don't have any other executors to run them.
These schedules were added when we had both the adaptive executor and
the real-time/router executors in the code. Since we only have adaptive
executor anymore, we can remove these.
Add failing tests, make changes to avoid crashes at least
Fix HAVING subquery pushdown ignoring reference table only subqueries,
also include HAVING in recursive planning
Given that we have a function IsDistributedTable which includes reference tables,
it seems best to have IsDistributedTableRTE & QueryContainsDistributedTableRTE
reflect that they do not include reference tables in their check
Similarly SublinkList's name should reflect that it only scans WHERE
contain_agg_clause asserts that we don't have SubLinks,
use contain_aggs_of_level as suggested by pg sourcecode
Before this commit, we considered !ContainsRecurringRTE() enough
for NotContainsOnlyRecurringTuples. However, instead, we can check
for existince of any distributed table.
DESCRIPTION: Fixes a bug that causes wrong results with complex outer joins
There are 2 problems with our early exit strategy that this commit fixes:
1- When we decide that a subplan results are sent to all worker nodes,
we used to skip traversing the whole distributed plan, instead of
skipping only the subplan.
2- We used to consider all available nodes in the cluster (secondaries
and inactive nodes as well as active primaries) when deciding on early
exit strategy. This resulted in failures to early exit when there are
secondaries or inactive nodes.
DESCRIPTION: satisfy static analysis tool for a nullptr dereference
During the static analysis project on the codebase this code has been flagged as having the potential for a null pointer dereference. Funnily enough the author had already made a comment of it in the code this was not possible due to us setting the schema name before we pass in the statement. If we want to reuse this code in a later setting this comment might not always apply and we could actually run into null pointer dereference.
This patch changes a bit of the code around to first of all make sure there is no NULL pointer dereference in this code anymore.
Secondly we allow for better deparsing by setting and adhering to the `if_not_exists` flag on the statement.
And finally add support for all syntax described in the documentation of postgres (FROM was missing).
If the generated column does not come at the end of the column list,
columnNameList doesn't line up with the column indexes. Seek past
CREATE TABLE test_table (
test_id int PRIMARY KEY,
gen_n int GENERATED ALWAYS AS (1) STORED,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
SELECT create_distributed_table('test_table', 'test_id');
Would raise ERROR: cannot cast 23 to 1184
Semmle reported quite some places where we use a value that could be NULL. Most of these are not actually a real issue, but better to be on the safe side with these things and make the static analysis happy.
DESCRIPTION: Replace the query planner for the coordinator part with the postgres planner
Closes#2761
Citus had a simple rule based planner for the query executed on the query coordinator. This planner grew over time with the addigion of SQL support till it was getting close to the functionality of the postgres planner. Except the code was brittle and its complexity rose which made it hard to add new SQL support.
Given its resemblance with the postgres planner it was a long outstanding wish to replace our hand crafted planner with the well supported postgres planner. This patch replaces our planner with a call to postgres' planner.
Due to the functionality of the postgres planner we needed to support both projections and filters/quals on the citus custom scan node. When a sort operation is planned above the custom scan it might require fields to be reordered in the custom scan before returning the tuple (projection). The postgres planner assumes every custom scan node implements projections. Because we controlled the plan that was created we prevented reordering in the custom scan and never had implemented it before.
A same optimisation applies to having clauses that could have been where clauses. Instead of applying the filter as a having on the aggregate it will push it down into the plan which could reach a custom scan node.
For both filters and projections we have implemented them when tuples are read from the tuple store. If no projections or filters are required it will directly return the tuple from the tuple store. Otherwise it will loop tuples from the tuple store through the filter and projection until a tuple is found and returned.
Besides filters being pushed down a side effect of having quals that could have been a where clause is that a call to read intermediate result could be called before the first tuple is fetched from the custom scan. This failed because the intermediate result would only be pulled to the coordinator on the first tuple fetch. To overcome this problem we do run the distributed subplans now before we run the postgres executor. This ensures the intermediate result is present on the coordinator in time. We do account for total time instrumentation by removing the instrumentation before handing control to the psotgres executor and update the timings our self.
For future SQL support it is enough to create a valid query structure for the part of the query to be executed on the query coordinating node. As a utility we do serialise and print the query at debug level4 for engineers to inspect what kind of query is being planned on the query coordinator.
- Stop the daemon when citus extension is dropped
- Bail on maintenance daemon startup if myDbData is started with a non-zero pid
- Stop maintenance daemon from spawning itself
- Don't use postgres die, just wrap proc_exit(0)
- Assert(myDbData->workerPid == MyProcPid)
The two issues were that multiple daemons could be running for a database,
or that a daemon would be leftover after DROP EXTENSION citus
When using --allow-group-access option from initdb our keys and
certificates would be created with 0640 permissions. Which is a pretty
serious security issue: This changes that. This would not be exploitable
though, since postgres would not actually enable SSL and would output
the following message in the logs:
```
DETAIL: File must have permissions u=rw (0600) or less if owned by the database user, or permissions u=rw,g=r (0640) or less if owned by root.
```
Since citus still expected the cluster to have SSL enabled handshakes
between workers and coordinator would fail. So instead of a security
issue the cluster would simply be unusable.
Previously a limitation in the shard pruning logic caused multi distribution value queries to always go into all the shards/workers whenever query also used OR conditions in WHERE clause.
Related to https://github.com/citusdata/citus/issues/2593 and https://github.com/citusdata/citus/issues/1537
There was no good workaround for this limitation. The limitation caused quite a bit of overhead with simple queries being sent to all workers/shards (especially with setups having lot of workers/shards).
An example of a previous plan which was inadequately pruned:
```
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
WHERE (o_orderkey IN (1,2)) AND (o_custkey = 11 OR o_custkey = 22);
QUERY PLAN
---------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (cost=13.68..13.69 rows=1 width=8)
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned (cost=0.00..13.68 rows=1 width=0)
Filter: ((o_orderkey = ANY ('{1,2}'::integer[])) AND ((o_custkey = 11) OR (o_custkey = 22)))
(9 rows)
```
After this commit the task count is what one would expect from the query defining multiple distinct values for the distribution column:
```
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
WHERE (o_orderkey IN (1,2)) AND (o_custkey = 11 OR o_custkey = 22);
QUERY PLAN
---------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (cost=13.68..13.69 rows=1 width=8)
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned (cost=0.00..13.68 rows=1 width=0)
Filter: ((o_orderkey = ANY ('{1,2}'::integer[])) AND ((o_custkey = 11) OR (o_custkey = 22)))
(9 rows)
```
"Core" of the pruning logic works as previously where it uses `PrunableInstances` to queue ORable valid constraints for shard pruning.
The difference is that now we build a compact internal representation of the query expression tree with PruningTreeNodes before actual shard pruning is run.
Pruning tree nodes represent boolean operators and the associated constraints of it. This internal format allows us to have compact representation of the query WHERE clauses which allows "core" pruning logic to work with OR-clauses correctly.
For example query having
`WHERE (o_orderkey IN (1,2)) AND (o_custkey=11 OR (o_shippriority > 1 AND o_shippriority < 10))`
gets transformed into:
1. AND(o_orderkey IN (1,2), OR(X, AND(X, X)))
2. AND(o_orderkey IN (1,2), OR(X, X))
3. AND(o_orderkey IN (1,2), X)
Here X is any set of unknown condition(s) for shard pruning.
This allow the final shard pruning to correctly recognize that shard pruning is done with the valid condition of `o_orderkey IN (1,2)`.
Another example with unprunable condition in query
`WHERE (o_orderkey IN (1,2)) OR (o_custkey=11 AND o_custkey=22)`
gets transformed into:
1. OR(o_orderkey IN (1,2), AND(X, X))
2. OR(o_orderkey IN (1,2), X)
Which is recognized as unprunable due to the OR condition between distribution column and unknown constraint -> goes to all shards.
Issue https://github.com/citusdata/citus/issues/1537 originally suggested transforming the query conditions into a full disjunctive normal form (DNF),
but this process of transforming into DNF is quite a heavy operation. It may "blow up" into a really large DNF form with complex queries having non trivial `WHERE` clauses.
I think the logic for shard pruning could be simplified further but I decided to leave the "core" of the shard pruning untouched.
On worker 2 it was waiting for dustbunnies_990001 to be
vacuumed/analyzed. This table doesn't actually exist, so that never
happend. Now it waits for the correct table and throws an error if it
waits more than 10 seconds.
The root of the problem is that, standard_planner() converts the following qual
```
{OPEXPR
:opno 98
:opfuncid 67
:opresulttype 16
:opretset false
:opcollid 0
:inputcollid 100
:args (
{VAR
:varno 1
:varattno 1
:vartype 25
:vartypmod -1
:varcollid 100
:varlevelsup 0
:varnoold 1
:varoattno 1
:location 45
}
{CONST
:consttype 25
:consttypmod -1
:constcollid 100
:constlen -1
:constbyval false
:constisnull true
:location 51
:constvalue <>
}
)
:location 49
}
```
To
```
(
{CONST
:consttype 16
:consttypmod -1
:constcollid 0
:constlen 1
:constbyval true
:constisnull true
:location -1
:constvalue <>
}
)
```
So, Citus doesn't deal with NULL values in real-time or non-fast path router queries.
And, in the FastPathRouter planner, we check constisnull in DistKeyInSimpleOpExpression().
However, in deferred pruning case, we do not check for isnull for const.
Thus, the fix consists of two parts:
- Let PruneShards() not crash when NULL parameter is passed
- For deferred shard pruning in fast-path queries, explicitly check that we have CONST which is not NULL
Mark existing objects that are not included in distributed object infrastructure
in older versions of Citus (but now should be) as distributed, after updating
Citus successfully.
DESCRIPTION: Fix unnecessary repartition on joins with more than 4 tables
In 9.1 we have introduced support for all CH-benCHmark queries by widening our definitions of joins to include joins with expressions in them. This had the undesired side effect of Q5 regressing on its plan by implementing a repartition join.
It turned out this regression was not directly related to widening of the join clause, nor the schema employed by CH-benCHmark. Instead it had to do with 4 or more tables being joined in a chain. A chain meaning:
```sql
SELECT * FROM a,b,c,d WHERE a.part = b.part AND b.part = c.part AND ....
```
Due to how our join order planner was implemented it would only keep track of 1 of the partition columns when comparing if the join could be executed locally. This manifested in a join chain of 4 tables to _always_ be executed as a repartition join. 3 tables joined in a chain would have the middle table shared by the two outer tables causing the local join possibility to be found.
With this patch we keep a unique list (or set) of all partition columns participating in the join. When a candidate table is checked for a possibility to execute a local join it will check if there is any partition column in that set that matches an equality join clause on the partition column of the candidate table.
By taking into account all partition columns in the left relation it will now find the local join path on >= 4 tables joined in a chain.
fixes: #3276
For example, a PARAM might reside inside a function just because
of a casting of a type such as the follows:
```
{FUNCEXPR
:funcid 1740
:funcresulttype 1700
:funcretset false
:funcvariadic false
:funcformat 2
:funccollid 0
:inputcollid 0
:args (
{PARAM
:paramkind 0
:paramid 15
:paramtype 23
:paramtypmod -1
:paramcollid 0
:location 356
}
)
```
We should recursively check the expression before bailing out.
Previously we only prevented AVG from being pushed down, but this is incorrect:
- array_agg, while somewhat non sensical to order by, will potentially be missing values
- combinefunc aggregation will raise errors about cstrings not being comparable (while we also can't know if the aggregate is commutative)
This commit limits approximating LIMIT pushdown when ordering by aggregates to:
min, max, sum, count, bit_and, bit_or, every, any
Which means of those we previously supported, we now exclude:
avg, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, hll_add, hll_union, topn_add, topn_union
Previously, the logic for evaluting the functions and the parameters
were the same. That ended-up evaluting the functions inaccurately
on the coordinator. Instead, split the function evaluation logic
from parameter evalution logic.
Previously, we've identified the usedSubPlans by only looking
to the subPlanId.
With this commit, we're expanding it to also include information
on the location of the subPlan.
This is useful to distinguish the cases where the subPlan is used
either on only HAVING or both HAVING and any other part of the query.
First, diff is updated to not update the files in-place
For some reason diff is being called multiple times,
so $file1.unmodified becomes normalized on second invocation
Secondly, diff-filter updates output to come from the unmodified version
Normalization is serving two purposes:
- avoid diff noise in regressions
- avoid diff noise in commits when expected result is updated
The first purpose only wants to reduce the lines which diff registers,
whereas the second wants those changes to be committed
* Update shardPlacement->nodeId to uint
As the source of the shardPlacement->nodeId is always workerNode->nodeId,
and that is uint32.
We had this hack because of: 0ea4e52df5 (r266421409)
And, that is gone with: 90056f7d3c (diff-c532177d74c72d3f0e7cd10e448ab3c6L1123)
So, we're safe to do it now.
* Relax the restrictions on using the local execution
Previously, whenever any local execution happens, we disabled further
commands to do any remote queries. The basic motivation for doing that
is to prevent any accesses in the same transaction block to access the
same placements over multiple sessions: one is local session the other
is remote session to the same placement.
However, the current implementation does not distinguish local accesses
being to a placement or not. For example, we could have local accesses
that only touches intermediate results. In that case, we should not
implement the same restrictions as they become useless.
So, this is a pre-requisite for executing the intermediate result only
queries locally.
* Update the error messages
As the underlying implementation has changed, reflect it in the error
messages.
* Keep track of connections to local node
With this commit, we're adding infrastructure to track if any connection
to the same local host is done or not.
The main motivation for doing this is that we've previously were more
conservative about not choosing local execution. Simply, we disallowed
local execution if any connection to any remote node is done. However,
if we want to use local execution for intermediate result only queries,
this'd be annoying because we expect all queries to touch remote node
before the final query.
Note that this approach is still limiting in Citus MX case, but for now
we can ignore that.
* Formalize the concept of Local Node
Also some minor refactoring while creating the dummy placement
* Write intermediate results locally when the results are only needed locally
Before this commit, Citus used to always broadcast all the intermediate
results to remote nodes. However, it is possible to skip pushing
the results to remote nodes always.
There are two notable cases for doing that:
(a) When the query consists of only intermediate results
(b) When the query is a zero shard query
In both of the above cases, we don't need to access any data on the shards. So,
it is a valuable optimization to skip pushing the results to remote nodes.
The pattern mentioned in (a) is actually a common patterns that Citus users
use in practice. For example, if you have the following query:
WITH cte_1 AS (...), cte_2 AS (....), ... cte_n (...)
SELECT ... FROM cte_1 JOIN cte_2 .... JOIN cte_n ...;
The final query could be operating only on intermediate results. With this patch,
the intermediate results of the ctes are not unnecessarily pushed to remote
nodes.
* Add specific regression tests
As there are edge cases in Citus MX and with round-robin policy,
use the same queries on those cases as well.
* Fix failure tests
By forcing not to use local execution for intermediate results since
all the tests expects the results to be pushed remotely.
* Fix flaky test
* Apply code-review feedback
Mostly style changes
* Limit the max value of pg_dist_node_seq to reserve for internal use
This can helpful in guiding us where to look when this test fails.
For example, if the result file has repartitioned_results_ prefix,
then we need to look into repartitioned insert/select. Otherwise
it is probably a CTE or a subquery.
When creating a new distributed table. The shards would colocate with shards
with SHARD_STATE_TO_DELETE (shardstate = 4). This means if that state was
because of a shard move the new shard would be created on two nodes and it
would not get deleted since it's shard state would be 1.
Comment from code:
/*
* We had to implement this hack because on Postgres11 and below, the originalQuery
* and the query would have significant differences in terms of CTEs where CTEs
* would not be inlined on the query (as standard_planner() wouldn't inline CTEs
* on PG 11 and below).
*
* Instead, we prefer to pass the inlined query to the distributed planning. We rely
* on the fact that the query includes subqueries, and it'd definitely go through
* query pushdown planning. During query pushdown planning, the only relevant query
* tree is the original query.
*/
Deparsing and parsing a query can be heavy on CPU. When locally executing
the query we don't need to do this in theory most of the time.
This PR is the first step in allowing to skip deparsing and parsing
the query in these cases, by lazily creating the query string and
storing the query in the task. Future commits will make use of this and
not deparse and parse the query anymore, but use the one from the task
directly.
This is purely to enable better performance with prepared statements.
Before this commit, the fast path queries with prepared statements
where the distribution key includes a parameter always went through
distributed planning. After this change, we only go through distributed
planning on the first 5 executions.
DESCRIPTION: Fixes a problem when adding a new node due to tables referenced in a functions body
Fixes#3378
It was reported that `master_add_node` would fail if a distributed function has a table name referenced in its declare section of the body. By default postgres validates the body of a function on creation. This is not a problem in the normal case as tables are replicated to the workers when we distribute functions.
However when a new node is added we first create dependencies on the workers before we try to create any tables, and the original tables get created out of bound when the metadata gets synced to the new node. This causes the function body validator to raise an error the table is not on the worker.
To mitigate this issue we set `check_function_bodies` to `off` right before we are creating the function.
The added test shows this does resolve the issue. (issue can be reproduced on the commit without the fix)
In this commit, we're introducing a way to prevent CTE inlining via a GUC.
The GUC is used in all the tests where PG 11 and PG 12 tests would diverge
otherwise.
Note that, in PG 12, the restriction information for CTEs are generated. It
means that for some queries involving CTEs, Citus planner (router planner/
pushdown planner) may behave differently. So, via the GUC, we prevent
tests to diverge on PG 11 vs PG 12.
When we drop PG 11 support, we should get rid of the GUC, and mark
relevant ctes as MATERIALIZED, which does the same thing.
These set of tests has changed in both PG 11 and PG 12.
The changes are only about CTE inlining kicking in both
versions, and yielding the exact same distributed planning.
With this commit, we're adding the specific tests for CTE inlining.
The test has a different output file for pg 11, because as mentioned
in the previous commits, PG 12 generates more restriction information
for CTEs.
In two places I've made code more straight forward by using ROUTINE in our own codegen
Two changes which may seem extraneous:
AppendFunctionName was updated to not use pg_get_function_identity_arguments.
This is because that function includes ORDER BY when printing an aggregate like my_rank.
While ALTER AGGREGATE my_rank(x "any" ORDER BY y "any") is accepted by postgres,
ALTER ROUTINE my_rank(x "any" ORDER BY y "any") is not.
Tests were updated to use macaddr over integer. Using integer is flaky, our logic
could sometimes end up on tables like users_table. I originally wanted to use money,
but money isn't hashable.
Fixes#3331
In #2389, we've implemented support for partitioned tables with rep > 1.
The implementation is limiting the use of modification queries on the
partitions. In fact, we error out when any partition is modified via
EnsurePartitionTableNotReplicated().
However, we seem to forgot an important case, where the parent table's
partition is marked as INVALID. In that case, at least one of the partition
becomes INVALID. However, we do not mark partitions as INVALID ever.
If the user queries the partition table directly, Citus could happily send
the query to INVALID placements -- which are not marked as INVALID.
This PR fixes it by marking the placements of the partitions as INVALID
as well.
The shard placement repair logic already re-creates all the partitions,
so should be fine in that front.
* WIP
* wip
* add basic logic to run a single job with repartioning joins with adaptive executor
* fix some warnings and return in ExecuteDependedTasks if there is none
* Add the logic to run depended jobs in adaptive executor
The execution of depended tasks logic is changed. With the current
logic:
- All tasks are created from the top level task list.
- At one iteration:
- CurTasks whose dependencies are executed are found.
- CurTasks are executed in parallel with adapter executor main
logic.
- The iteration is repeated until all tasks are completed.
* Separate adaptive executor repartioning logic
* Remove duplicate parts
* cleanup directories and schemas
* add basic repartion tests for adaptive executor
* Use the first placement to fetch data
In task tracker, when there are replicas, we try to fetch from a replica
for which a map task is succeeded. TaskExecution is used for this,
however TaskExecution is not used in adaptive executor. So we cannot use
the same thing as task tracker.
Since adaptive executor fails when a map task fails (There is no retry
logic yet). We know that if we try to execute a fetch task, all of its
map tasks already succeeded, so we can just use the first one to fetch
from.
* fix clean directories logic
* do not change the search path while creating a udf
* Enable repartition joins with adaptive executor with only enable_reparitition_joins guc
* Add comments to adaptive_executor_repartition
* dont run adaptive executor repartition test in paralle with other tests
* execute cleanup only in the top level execution
* do cleanup only in the top level ezecution
* not begin a transaction if repartition query is used
* use new connections for repartititon specific queries
New connections are opened to send repartition specific queries. The
opened connections will be closed at the FinishDistributedExecution.
While sending repartition queries no transaction is begun so that
we can see all changes.
* error if a modification was done prior to repartition execution
* not start a transaction if a repartition query and sql task, and clean temporary files and schemas at each subplan level
* fix cleanup logic
* update tests
* add missing function comments
* add test for transaction with DDL before repartition query
* do not close repartition connections in adaptive executor
* rollback instead of commit in repartition join test
* use close connection instead of shutdown connection
* remove unnecesary connection list, ensure schema owner before removing directory
* rename ExecuteTaskListRepartition
* put fetch query string in planner not executor as we currently support only replication factor = 1 with adaptive executor and repartition query and we know the query string in the planner phase in that case
* split adaptive executor repartition to DAG execution logic and repartition logic
* apply review items
* apply review items
* use an enum for remote transaction state and fix cleanup for repartition
* add outside transaction flag to find connections that are unclaimed instead of always opening a new transaction
* fix style
* wip
* rename removejobdir to partition cleanup
* do not close connections at the end of repartition queries
* do repartition cleanup in pg catch
* apply review items
* decide whether to use transaction or not at execution creation
* rename isOutsideTransaction and add missing comment
* not error in pg catch while doing cleanup
* use replication factor of the creation time, not current time to decide if task tracker should be chosen
* apply review items
* apply review items
* apply review item
Currently in mx isolation tests the setup is the same except the creation of tables. Isolation framework lets us define multiple `setup` stages, therefore I thought that we can put the `mx_setup` to one file and prepend this prior to running tests.
How the structure works:
- cpp is used before running isolation tests to preprocess spec files. This way we can include any file we want to. Currently this is used to include mx common part.
- spec files are put to `/build/specs` for clear separation between generated files and template files
- a symbolic link is created for `/expected` in `build/expected/`.
- when running isolation tests, as the `inputdir`, `build` is passed so it runs the spec files from `build/specs` and checks the expected output from `build/expected`.
`/specs` is renamed as `/spec` because postgres first look at the `specs` file under current directory, so this is renamed to avoid that since we are running the isolation tests from `build/specs` now.
Note: now we use `//` instead of `#` in comments in spec files, because cpp interprets `#` as a directive and it ignores `//`.
Postgres keeps track of recursive CTEs in the queryTree in two ways:
- queryTree->hasRecursive is set to true, whenever a RECURSIVE CTE
is used in the SQL. Citus checks for it
- If the CTE is actually a recursive one (a.k.a., references itself)
Postgres marks CommonTableExpr->cterecursive as true as well
The tests that are changed in the PR doesn't cover (b), and this becomes
an issue with CTE inlining (#3161). In that case, Citus/Postgres can inline
such CTEs, and the queries works with Citus.
However, this tests intend to check if there is any recursive CTE in the queryTree.
So, we're actually making the CTEs recursive CTEs by referring itself.
We'll add cases where a recursive CTE works by inlining in #3161.
Use partition column's collation for range distributed tables
Don't allow non deterministic collations for hash distributed tables
CoPartitionedTables: don't compare unequal types
Test ALTER ROLE doesn't deadlock when coordinator added, or propagate from mx workers
Consolidate wait_until_metadata_sync & verify_metadata to multi_test_helpers
Previously,
- we'd push down ORDER BY, but this doesn't order intermediate results between workers
- we'd keep FILTER on master aggregate, which would raise an error about unexpected cstrings
Support for ARRAY[] expressions is limited to having a consistent shape,
eg ARRAY[(int,text),(int,text)] as opposed to ARRAY[(int,text),(float,text)] or ARRAY[(int,text),(int,text,float)]