With #4338, the executor is smart enough to failover to
local node if there is not enough space in max_connections
for remote connections.
For COPY, the logic is different. With #4034, we made COPY
work with the adaptive connection management slightly
differently. The cause of the difference is that COPY doesn't
know which placements are going to be accessed hence requires
to get connections up-front.
Similarly, COPY decides to use local execution up-front.
With this commit, we change the logic for COPY on local nodes:
Try to reserve a connection to local host. This logic follows
the same logic (e.g., citus.local_shared_pool_size) as the
executor because COPY also relies on TryToIncrementSharedConnectionCounter().
If reservation to local node fails, switch to local execution
Apart from this, if local execution is disabled, we follow the
exact same logic for multi-node Citus. It means that if we are
out of the connection, we'd give an error.
It seems that we were not considering the case where coordinator was
added to the cluster as a worker in the optimization of intermediate
results.
This could lead to errors when coordinator was added as a worker.
pg_get_tableschemadef_string doesn't know how to deparse identity
columns so we cannot reflect those columns when creating table
from scratch. For this reason, we don't allow using alter_table udfs
with tables having any identity cols.
pg_get_tableschemadef_string doesn't know how to deparse identity
columns so we cannot reflect those columns when creating shell
relation.
For this reason, we don't allow adding local tables -having identity cols-
to metadata.
Postgres doesn't allow inserting into columns having GENERATED ALWAYS
AS (...) STORED expressions.
For this reason, when executing undistribute_table or an alter_* udf,
we should skip copying such columns.
This is not bad since Postgres would already generate such columns.
Enables an overall plan to be parallel (e.g. over a partition
hierarchy), even though an individual ColumnarScan is not
parallel-aware.
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
Previously, if columnar.enable_custom_scan was false, parallel paths
could remain, leading to an unexpected error.
Also, ensure that cheapest_parameterized_paths is cleared if a custom
scan is used.
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
When finding columns owning sequences, we shouldn't rely on atthasdef
since it might be true when column has GENERATED ALWAYS AS (...)
STORED expression.
* Fix partition column index issue
We send column names to worker_hash/range_partition_table methods, and
in these methods we check the column name index from tuple descriptor.
Then this index is used to decide the bucket that the current row will
be sent for the repartition.
This becomes a problem when there are the same column names in the
tupleDescriptor. Then we can choose the wrong index. Hence the
partitioned data will be put to wrong workers. Then the result could
miss some data because workers might contain different range of data.
An example:
TupleDescriptor contains "trip_id", "car_id", "car_id" for one table.
It contains only "car_id" for the other table. And assuming that the
tables will be partitioned by car_id, it is not certain what should be
used for deciding the bucket number for the first table. Assuming value
2 goes to bucket 2 and value 3 goes to bucket 3, it is not certain which
bucket "1 2 3" (trip_id, car_id, car_id) row will go to.
As a solution we send the index of partition column in targetList
instead of the column name.
The old API is kept so that if workers upgrade work, it still works
(though it will have the same bug)
* Use the same method so that backporting is easier
Fixing a division by zero in the cost calculations for scanning a columnar table.
Due to how the columns in a columnar table are counted an empty table would result in a division by zero. Instead this patch keeps the column selection ratio on zero when this happens, resulting in an accurate cost of zero pages to scan a columnar table.
fixes#4589
* Make undistribute_table() and citus_create_local_table() work with columnar
* Rename and use LocallyExecuteUtilityTask for UDF check
* Remove 'local' references in ExecuteUtilityCommand
/*
* Creating Citus local tables relies on functions that accesses
* shards locally (e.g., ExecuteAndLogDDLCommand()). As long as
* we don't teach those functions to access shards remotely, we
* cannot relax this check.
*/
The reason behind skipping postgres tables is that we support
foreign keys between postgres tables and reference tables
(without converting postgres tables to citus local tables)
when enable_local_reference_table_foreign_keys is false or
when coordinator is not added to metadata.
When enabled any foreign keys between local tables and reference
tables supported by converting the local table to a citus local
table.
When the coordinator is not in the metadata, the logic is disabled
as foreign keys are not allowed in this configuration.
If relation is not involved in any foreign key relationships,
foreign key graph would not return any relations for given
relationId as expected.
But even if it's the case, we should still undistribute the table
itself.
* Stronger check for triggers on columnar tables (#4493).
Previously, we used a simple ProcessUtility_hook. Change to use an
object_access_hook instead.
* Replace alter_table_set_access_method test on partition with foreign key
Co-authored-by: Jeff Davis <jefdavi@microsoft.com>
Co-authored-by: Marco Slot <marco.slot@gmail.com>
With citus shard helper view, we can easily see:
- where each shard is, which node, which port
- what kind of table it belongs to
- its size
With such a view, we can see shards that have a size bigger than some
value, which could be useful. Also debugging can be easier in production
as well with this view.
Fetch shards in one go per node
The previous implementation was slow because it would do a lot of round
trips, one per shard to be exact. Hence it is improved so that we fetch
all the shard_name, shard-size pairs per node in one go.
Construct shards_names, sizes query on coordinator
* Replace master_add_node with citus_add_node
* Replace master_activate_node with citus_activate_node
* Replace master_add_inactive_node with citus_add_inactive_node
* Use master udfs in old scripts
* Replace master_add_secondary_node with citus_add_secondary_node
* Replace master_disable_node with citus_disable_node
* Replace master_drain_node with citus_drain_node
* Replace master_remove_node with citus_remove_node
* Replace master_set_node_property with citus_set_node_property
* Replace master_unmark_object_distributed with citus_unmark_object_distributed
* Replace master_update_node with citus_update_node
* Replace master_update_shard_statistics with citus_update_shard_statistics
* Replace master_update_table_statistics with citus_update_table_statistics
* Rename master_conninfo_cache_invalidate to citus_conninfo_cache_invalidate
Rename master_dist_local_group_cache_invalidate to citus_dist_local_group_cache_invalidate
* Replace master_copy_shard_placement with citus_copy_shard_placement
* Replace master_move_shard_placement with citus_move_shard_placement
* Rename master_dist_node_cache_invalidate to citus_dist_node_cache_invalidate
* Rename master_dist_object_cache_invalidate to citus_dist_object_cache_invalidate
* Rename master_dist_partition_cache_invalidate to citus_dist_partition_cache_invalidate
* Rename master_dist_placement_cache_invalidate to citus_dist_placement_cache_invalidate
* Rename master_dist_shard_cache_invalidate to citus_dist_shard_cache_invalidate
* Drop master_modify_multiple_shards
* Rename master_drop_all_shards to citus_drop_all_shards
* Drop master_create_distributed_table
* Drop master_create_worker_shards
* Revert old function definitions
* Add missing revoke statement for citus_disable_node
* Rethrow original concurrent index creation failure message
* Alter test outputs for concurrent index creation
* Detect duplicate table failure in concurrent index creation
* Add test for conc. index creation w/out duplicates
* Prevent deadlock for long named partitioned index creation on single node
* Create IsSingleNodeCluster function
* Use both local and sequential execution
On top of our foreign key graph, implement the infrastructure to get
list of relations that are connected to input relation via a foreign key
graph.
We need this to support cascading create_citus_local_table &
undistribute_table operations.
Also add regression tests to see what our foreign key graph is able to
capture currently.
Attribute number in a subquery RTE and relation RTE means different
things. In a relation attribute number will point to the column number
in the table definition including the dropped columns as well however in
subquery, it means the index in the target list. When we convert a
relation RTE to subquery RTE we should either correct all the relevant
attribute numbers or we can just add a dummy column for the dropped
columns. We choose the latter in this commit because it is practically
too vulnerable to update all the vars in a query.
Another thing this commit fixes is that in case a join restriction
clause list contains a false clause, we should just returns a false
clause instead of the whole list, because the whole list will contain
restrictions from other RTEs as well and this breaks the query, which
can be seen from the output changes, now it is much simpler.
Also instead of adding single tests for dropped columns, we choose to
run the whole mixed queries with tables with dropped columns, this
revealed some bugs already, which are fixed in this commit.
Baseinfo also has pushed down filters etc, so it makes more sense to use
BaseRestrictInfo to determine what columns have constant equality
filters.
Also RteIdentity is used for removing conversion candidates instead of
rteIndex.
It seems that most of the updates were broken, we weren't aware of it
because there wasn't any data in the tables. They are broken mostly
because local tables do not have a shard id and some code paths should
be updated with that information, currently when there is an invalid
shard id, it is assumed to be pruned.
Consider local tables in router planner
In case there is a local table, the shard id will not be valid and there
are some checks that rely on shard id, we should skip these in case of
local tables, which is handled with a dummy placement.
Add citus local table dist table join tests
add local-dist table mixed joins tests
AllDataLocallyAccessible and ContainsLocalTableSubqueryJoin are removed.
We can possibly remove ModifiesLocalTableWithRemoteCitusLocalTable as
well. Though this removal has a side effect that now when all the data
is locally available, we could still wrap a relation into a subquery, I
guess that should be resolved in the router planner itself.
Add more tests
When we wrap an RTE to subquery we are updating the variables varno's as
1, however we should also update the varno's of vars in quals.
Also some other small code quality improvements are done.
The previous algorithm was not consistent and it could convert different
RTEs based on the table orders in the query. Now we convert local tables
if there is a distributed table which doesn't have a unique index. So if
there are 4 tables, local1, local2, dist1, dist2_with_pkey then we will
convert local1 and local2 in `auto` mode. Converting a distributed table
is not that logical because as there is a distributed table without a
unique index, we will need to convert the local tables anyway. So
converting the distributed table with pkey is redundant.
We should not recursively plan an already routable plannable query. An
example of this is (SELECT * FROM local JOIN (SELECT * FROM dist) d1
USING(a));
So we let the recursive planner do all of its work and at the end we
convert the final query to to handle unsupported joins. While doing each
conversion, we check if it is router plannable, if so we stop.
Only consider range table entries that are in jointree
If a range table is not in jointree then there is no point in
considering that because we are trying to convert range table entries to
subqueries for join use case.
Check equality in quals
We want to recursively plan distributed tables only if they have an
equality filter on a unique column. So '>' and '<' operators will not
trigger recursive planning of distributed tables in local-distributed
table joins.
Recursively plan distributed table only if the filter is constant
If the filter is not a constant then the join might return multiple rows
and there is a chance that the distributed table will return huge data.
Hence if the filter is not constant we choose to recursively plan the
local table.
When doing local-distributed table joins we convert one of them to
subquery. The current policy is that we convert distributed tables to
subquery if it has a unique index on a column that has unique
index(primary key also has a unique index).
UPDATEs on partitioned tables that affect only row partitions should
succeed, the rest should fail.
Also rename CStoreScan to ColumnarScan to make the error message more
relevant.
When Citus needs to parallelize queries on the local node (e.g., the node
executing the distributed query and the shards are the same), we need to
be mindful about the connection management. The reason is that the client
backends that are running distributed queries are competing with the client
backends that Citus initiates to parallelize the queries in order to get
a slot on the max_connections.
In that regard, we implemented a "failover" mechanism where if the distributed
queries cannot get a connection, the execution failovers the tasks to the local
execution.
The failover logic is follows:
- As the connection manager if it is OK to get a connection
- If yes, we are good.
- If no, we fail the workerPool and the failure triggers
the failover of the tasks to local execution queue
The decision of getting a connection is follows:
/*
* For local nodes, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us
* a preview of the future (e.g., we let the new connections to establish,
* but they are not established yet). The latter gives us the close to
* precise view of the past (e.g., the active number of client backends).
*
* Overall, we want to limit both of the metrics. The former limit typically
* kics in under regular loads, where the load of the database increases in
* a reasonable pace. The latter limit typically kicks in when the database
* is issued lots of concurrent sessions at the same time, such as benchmarks.
*/
When distributing a columnar table, as well as changing options on a distributed columnar table, this patch will forward the settings from the coordinator to the workers.
For propagating options changes on an already distributed table this change is pretty straight forward. Before applying the change in options locally we will create a `DDLJob` that contains a call to `alter_columnar_table_set(...)` for every shard placement with all settings of the current table. This goes both for setting an option as well as resetting. This will reset the values to the defaults configured on the coordinator. Having the effect that the coordinator is authoritative on the settings and makes sure the shards have the same settings set as the table on the coordinator.
When a columnar table is distributed it is using the `TableDDLCommand` infra structure to create a new kind of `TableDDLCommand`. This new type, called a `TableDDLCommandFunction` contains a context and 2 function pointers to execute. One function returns the command as applied on the table, the second function will return the sql command to apply to a shard with a given shard id. The schema name is ignored as it will use the fully qualified name of the shard in the same schema as the base table.
Multi-row execution already uses sequential execution. When shards
are local, using local execution is profitable as it avoids
an extra connection establishment to the local node.
Columnar options were by accident linked to the relfilenode instead of the regclass/relation oid. This PR moves everything related to columnar options to their own catalog table.
Considering the adaptive connection management
improvements that we plan to roll soon, it makes it
very helpful to know the number of active client
backends.
We are doing this addition to simplify yhe adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus parallel queries would compete to get
slots on Postgres' `max_connections` on the same Citus database.
With adaptive connection management, we have the counters for
Citus parallel queries. That helps us to adaptively decide
on the remote executions pool size (e.g., throttle connections
if necessary).
However, we do not have any counters for the total number of
client backends on the database. For single node Citus, we
should consider all the client backends, not only the remote
connections that Citus does.
Of course Postgres internally knows how many client
backends are active. However, to get that number Postgres
iterates over all the backends. For examaple, see [pg_stat_get_db_numbackends](8e90ec5580/src/backend/utils/adt/pgstatfuncs.c (L1240))
where Postgres iterates over all the backends.
For our purpuses, we need this information on every connection
establishment. That's why we cannot affort to do this kind of
iterattion.
* Move local execution after the remote execution
Before this commit, when both local and remote tasks
exist, the executor was starting the execution with
local execution. There is no strict requirements on
this.
Especially considering the adaptive connection management
improvements that we plan to roll soon, moving the local
execution after to the remote execution makes more sense.
The adaptive connection management for single node Citus
would look roughly as follows:
- Try to connect back to the coordinator for running
parallel queries.
- If succeeds, go on and execute tasks in parallel
- If fails, fallback to the local execution
So, we'll use local execution as a fallback mechanism. And,
moving it after to the remote execution allows us to implement
such further scenarios.
Before this commit, we let AdaptiveExecutorPreExecutorRun()
to be effective multiple times on every FETCH on cursors.
That does not affect the correctness of the query results,
but adds significant overhead.
TableAM API doesn't allow us to pass around a state variable along all of the tuple inserts belonging to the same command. We require this in columnar store, since we batch them, and when we have enough rows we flush them as stripes.
To do that, we keep a (relfilenode) -> stack of (subxact id, TableWriteState) global mapping.
**Inserts**
Whenever we want to insert a tuple, we look up for the relation's relfilenode in this mapping. If top of the stack matches current subtransaction, we us the existing TableWriteState. Otherwise, we allocate a new TableWriteState and push it on top of stack.
**(Sub)Transaction Commit/Aborts**
When the subtransaction or transaction is committed, we flush and pop all entries matching current SubTransactionId.
When the subtransaction or transaction is committed, we pop all entries matching current SubTransactionId and discard them without flushing.
**Reads**
Since we might have unwritten rows which needs to be read by a table scan, we flush write states on SELECTs. Since flushing the write state of upper transactions in a subtransaction will cause metadata being written in wrong subtransaction, we ERROR out if any of the upper subtransactions have unflushed rows.
**Table Drops**
We record in which subtransaction the table was dropped. When committing a subtransaction in which table was dropped, we propagate the drop to upper transaction. When aborting a subtransaction in which table was dropped, we mark table as not deleted.
When a relation is used on an OUTER JOIN with FALSE filters,
set_rel_pathlist_hook may not be called for the table.
There might be other cases as well, so do not rely on the hook
for classification of the tables.
* Fix incorrect join related fields
Ruleutils expect to give the original index of join columns hence we
should consider the dropped columns while setting the fields in
SetJoinRelatedFieldsCompat.
* add some more tests for joins
* Move tests to join.sql and create a utility function
Disallow `ON TRUE` outer joins with reference & distributed tables
when reference table is outer relation by fixing the logic bug made
when calling `LeftListIsSubset` function.
Also, be more defensive when removing duplicate join restrictions
when join clause is empty for non-inner joins as they might still
contain useful information for non-inner joins.
It seems like Postgres could call set_rel_pathlist() for
the same relation multiple times. This breaks the logic
where we assume relationCount eqauls to the number of
entries in relationRestrictionList.
In summary, relationRestrictionList may contain duplicate
entries.