It seems that there are only a few cases where that check is useful, and
for now we prefer not having it. This means that we might
perform some unnecessary checks, but that should be rare and not
performance critical.
Instead of sending NULLs over the network, we now convert the subqueries
into the form:
SELECT t.a, NULL, NULL FROM (SELECT a FROM table)t;
and we recursively plan the inner part so that we don't send the NULLs
over the network. We still need the NULLs in the outer subquery because we
currently don't have an easy way of updating all the necessary places in
the query.
Add some documentation for how the conversion is done
The base restriction info also has the pushed-down filters etc., so it
makes more sense to use BaseRestrictInfo to determine which columns have
constant equality filters.
Also, RteIdentity is used instead of rteIndex for removing conversion
candidates.
It seems that most of the updates were broken; we weren't aware of it
because there wasn't any data in the tables. They are broken mostly
because local tables do not have a shard id, and some code paths should
be updated with that information. Currently, when there is an invalid
shard id, it is assumed to be pruned.
Consider local tables in router planner
In case there is a local table, the shard id will not be valid, and there
are some checks that rely on the shard id. We should skip these in the
case of local tables, which is handled with a dummy placement.
Add citus local table dist table join tests
Add local-dist table mixed join tests
AllDataLocallyAccessible and ContainsLocalTableSubqueryJoin are removed.
We can possibly remove ModifiesLocalTableWithRemoteCitusLocalTable as
well. This removal has a side effect, though: now, when all the data
is locally available, we could still wrap a relation into a subquery; I
guess that should be resolved in the router planner itself.
Add more tests
We should not recursively plan an already router-plannable query. An
example of this is (SELECT * FROM local JOIN (SELECT * FROM dist) d1
USING(a));
So we let the recursive planner do all of its work, and at the end we
convert the final query to handle unsupported joins. While doing each
conversion, we check if it is router plannable; if so, we stop.
Only consider range table entries that are in jointree
If a range table is not in the jointree then there is no point in
considering it, because we are trying to convert range table entries to
subqueries for the join use case.
Check equality in quals
We want to recursively plan distributed tables only if they have an
equality filter on a unique column. So '>' and '<' operators will not
trigger recursive planning of distributed tables in local-distributed
table joins.
Recursively plan distributed table only if the filter is constant
If the filter is not a constant then the join might return multiple rows,
and there is a chance that the distributed table will return a huge
amount of data. Hence, if the filter is not constant, we choose to
recursively plan the local table.
When doing local-distributed table joins we convert one of them to a
subquery. The current policy is that we convert the distributed table to
a subquery if it has a constant equality filter on a column with a unique
index (a primary key also has a unique index).
The logical planner cannot handle joins between local and distributed tables.
Instead, we can recursively plan one side of the join and let the logical
planner handle the rest.
Our algorithm is a little smart: it tries not to recursively plan distributed
tables and favors recursively planning local tables instead.
A utility function is added so that each caller can implement a handler
for each index on a given table. This means that the caller doesn't need
to worry about how to access each index; the only thing it needs to do is
implement a function to which each index on the table is passed
iteratively.
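For illustration, a minimal sketch of such a callback-based iteration using only standard Postgres calls; the names ForEachIndexOnTable and IndexHandler are illustrative, not the actual Citus API:
```
#include "postgres.h"

#include "access/genam.h"
#include "access/table.h"
#include "nodes/pg_list.h"
#include "utils/relcache.h"

/* handler invoked for every index on the table */
typedef void (*IndexHandler)(Relation indexRelation, void *context);

static void
ForEachIndexOnTable(Oid relationId, IndexHandler handler, void *context)
{
	Relation relation = table_open(relationId, AccessShareLock);
	List *indexIdList = RelationGetIndexList(relation);

	ListCell *indexIdCell = NULL;
	foreach(indexIdCell, indexIdList)
	{
		Oid indexId = lfirst_oid(indexIdCell);
		Relation indexRelation = index_open(indexId, AccessShareLock);

		handler(indexRelation, context);

		index_close(indexRelation, NoLock);
	}

	table_close(relation, NoLock);
}
```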
When Citus needs to parallelize queries on the local node (e.g., the node
executing the distributed query and the shards are the same), we need to
be mindful about connection management. The reason is that the client
backends that are running distributed queries are competing with the client
backends that Citus initiates to parallelize the queries, in order to get
a slot within max_connections.
In that regard, we implemented a "failover" mechanism where, if the distributed
queries cannot get a connection, the execution fails the tasks over to local
execution.
The failover logic is as follows:
- Ask the connection manager if it is OK to get a connection
- If yes, we are good.
- If no, we fail the workerPool, and the failure triggers
the failover of the tasks to the local execution queue
The decision on getting a connection is as follows:
/*
* For local nodes, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us
* a preview of the future (e.g., we let the new connections establish,
* but they are not established yet). The latter gives us a close to
* precise view of the past (e.g., the active number of client backends).
*
* Overall, we want to limit both of the metrics. The former limit typically
* kicks in under regular loads, where the load of the database increases at
* a reasonable pace. The latter limit typically kicks in when the database
* is issued lots of concurrent sessions at the same time, such as benchmarks.
*/
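To make the decision concrete, a rough sketch of the check is below; GetSharedPoolSizePermission(), GetActiveClientBackendCount(), and the 90% threshold are hypothetical placeholders, not the actual Citus implementation:
```
#include "postgres.h"
#include "miscadmin.h"		/* MaxConnections */

extern bool GetSharedPoolSizePermission(void);		/* hypothetical helper */
extern uint32 GetActiveClientBackendCount(void);	/* hypothetical helper */

static bool
CanUseExtraConnectionOnLocalNode(void)
{
	/* the "future" metric: citus.max_shared_pool_size bookkeeping */
	if (!GetSharedPoolSizePermission())
	{
		return false;
	}

	/* the "past" metric: active client backends vs. max_connections */
	if (GetActiveClientBackendCount() >= (uint32) (MaxConnections * 0.9))
	{
		return false;
	}

	return true;
}
```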
When distributing a columnar table, as well as when changing options on a distributed columnar table, this patch will forward the settings from the coordinator to the workers.
For propagating option changes on an already distributed table, this change is pretty straightforward. Before applying the change in options locally, we will create a `DDLJob` that contains a call to `alter_columnar_table_set(...)` for every shard placement, with all settings of the current table. This goes both for setting an option as well as resetting one; resetting will reset the values to the defaults configured on the coordinator. This has the effect that the coordinator is authoritative on the settings and makes sure the shards have the same settings set as the table on the coordinator.
When a columnar table is distributed, it uses the `TableDDLCommand` infrastructure to create a new kind of `TableDDLCommand`. This new type, called a `TableDDLCommandFunction`, contains a context and 2 function pointers to execute. One function returns the command as applied on the table; the second function returns the SQL command to apply to a shard with a given shard id. The schema name is ignored, as it will use the fully qualified name of the shard in the same schema as the base table.
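Roughly, the new type could be pictured like this; the typedef and field names are illustrative and may differ from the Citus source:
```
#include "postgres.h"

typedef char *(*TableDDLFunction)(void *context);
typedef char *(*TableDDLShardedFunction)(uint64 shardId, void *context);

typedef struct TableDDLCommandFunction
{
	void *context;								/* handed to both callbacks */
	TableDDLFunction function;					/* command as applied on the table */
	TableDDLShardedFunction shardedFunction;	/* command for a given shard id */
} TableDDLCommandFunction;
```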
Multi-row execution already uses sequential execution. When shards
are local, using local execution is profitable as it avoids
an extra connection establishment to the local node.
If MemoryContextAlloc errors out (e.g., during an OOM), ConnectionHashEntry->connections
stays NULL.
With this commit, we add an isValid flag to ConnectionHashEntry that should be set to true
right after we allocate & initialize the ConnectionHashEntry->connections list properly, and we
check it before accessing ConnectionHashEntry->connections.
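For illustration, a minimal sketch of the intended pattern; the struct layout is simplified and only shows the fields relevant here:
```
#include "postgres.h"
#include "lib/ilist.h"

typedef struct ConnectionHashEntrySketch
{
	dlist_head *connections;
	bool isValid;			/* true only after connections is initialized */
} ConnectionHashEntrySketch;

static void
InitializeConnectionList(ConnectionHashEntrySketch *entry, MemoryContext context)
{
	entry->isValid = false;	/* stays false if the allocation below throws */
	entry->connections = MemoryContextAlloc(context, sizeof(dlist_head));
	dlist_init(entry->connections);
	entry->isValid = true;	/* connections can now be accessed safely */
}
```
Readers then check `entry->isValid` before touching `entry->connections` and recreate the entry if it is false.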
Refactor internals on how Citus creates the SQL commands it sends to recreate shards.
Before, Citus collected the DDL commands to recreate a table solely as `char *`'s. If they were used to create a shard, they were wrapped with `worker_apply_shard_ddl_command` and sent to the workers. On the workers, the UDF wrapping the DDL command would rewrite the parsetree to replace table names with their shard name equivalents.
This worked well, but poses an issue when adding columnar. Due to limitations in Postgres on creating custom options on table access methods, we need to fall back on a UDF to set columnar-specific options. Now, to recreate the table, we can no longer rely on having solely DDL statements.
A prototype was made to run this UDF wrapped in `worker_apply_shard_ddl_command`. This became pretty messy, hard to understand, and subsequently hard to maintain.
This PR proposes a refactor of the internal representation of table DDL commands into a `TableDDLCommand` structure. The current implementation only supports a `char *` as its contents. Based on the use of the DDL statement (e.g. creating the table -mx- or creating a shard), one of two different functions can be called to get the statement to send to the worker:
- `GetTableDDLCommand(TableDDLCommand *command)`: This function returns the DDL command to create the table. In this implementation it will just return the `char *`. This has the same functionality as getting the old list and not wrapping it.
- `GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId, char *schemaName)`: This function returns the DDL command wrapped in `worker_apply_shard_ddl_command` with the `shardId` as an argument. Due to backwards compatibility it also accepts a `schemaName`. The exact purpose is not directly clear. Ideally, new implementations would work with fully qualified statements and ignore the `schemaName`.
A future implementation could accept 2 function pointers and a `void *` for context to let the two pointers work on. This gives greater flexibility in controlling what commands get sent in which situations. Also, in the future, we could implement the intermediate step of creating the `parsetree` data structure of statements based on the contents in the catalog, with a corresponding deparser. For sharded queries, a mutator could be run over the parsetree to rewrite the table names to the names with the shard identifier. This would completely remove the requirement for `worker_apply_shard_ddl_command`.
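For illustration, a sketch of the two accessors over the `char *`-based implementation; the wrapping SQL shape and the struct layout are simplified:
```
#include "postgres.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"		/* quote_literal_cstr */

typedef struct TableDDLCommand
{
	char *commandStr;	/* plain DDL statement that recreates the table */
} TableDDLCommand;

/* returns the DDL command that creates the base table */
static char *
GetTableDDLCommand(TableDDLCommand *command)
{
	return command->commandStr;
}

/* returns the DDL command wrapped in worker_apply_shard_ddl_command */
static char *
GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId,
						  char *schemaName)
{
	StringInfo wrappedCommand = makeStringInfo();

	/* schemaName is accepted only for backwards compatibility */
	(void) schemaName;

	appendStringInfo(wrappedCommand,
					 "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)",
					 shardId,
					 quote_literal_cstr(command->commandStr));

	return wrappedCommand->data;
}
```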
Considering the adaptive connection management
improvements that we plan to roll out soon, it is
very helpful to know the number of active client
backends.
We are doing this addition to simplify the adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus parallel queries would compete to get
slots on Postgres' `max_connections` on the same Citus database.
With adaptive connection management, we have the counters for
Citus parallel queries. That helps us to adaptively decide
on the remote executions pool size (e.g., throttle connections
if necessary).
However, we do not have any counters for the total number of
client backends on the database. For single node Citus, we
should consider all the client backends, not only the remote
connections that Citus does.
Of course, Postgres internally knows how many client
backends are active. However, to get that number Postgres
iterates over all the backends. For example, see [pg_stat_get_db_numbackends](8e90ec5580/src/backend/utils/adt/pgstatfuncs.c (L1240))
where Postgres iterates over all the backends.
For our purposes, we need this information on every connection
establishment. That's why we cannot afford to do this kind of
iteration.
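A constant-time alternative can be sketched as a counter kept in shared memory; the names below are illustrative, not the actual Citus implementation:
```
#include "postgres.h"
#include "port/atomics.h"

/* lives in shared memory, initialized at extension startup */
static pg_atomic_uint32 *ActiveClientBackendCounter = NULL;

/* called when a client backend starts */
void
IncrementClientBackendCounter(void)
{
	pg_atomic_add_fetch_u32(ActiveClientBackendCounter, 1);
}

/* called when a client backend exits */
void
DecrementClientBackendCounter(void)
{
	pg_atomic_sub_fetch_u32(ActiveClientBackendCounter, 1);
}

/* O(1) read, cheap enough to call on every connection establishment */
uint32
GetClientBackendCounter(void)
{
	return pg_atomic_read_u32(ActiveClientBackendCounter);
}
```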
Before this commit, we let AdaptiveExecutorPreExecutorRun()
be effective multiple times on every FETCH on cursors.
That does not affect the correctness of the query results,
but adds significant overhead.
If one wishes to iterate through a List and insert list elements in
PG13, it is not safe to use for_each_ptr, as the List representation
in PostgreSQL is no longer a linked list but an array, and it is possible
that the whole array is repalloc'ed if there is not sufficient space
available.
See postgres commit 1cff1b95ab6ddae32faa3efe0d95a820dbfdc164 for more
information
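For illustration, a sketch of a safe alternative under that constraint; the function and the list contents are made up:
```
#include "postgres.h"
#include "nodes/pg_list.h"

static List *
AppendWhileIterating(List *columnNameList)
{
	/*
	 * Appending inside foreach()/for_each_ptr() may repalloc the
	 * underlying array and invalidate the iterator in PG13, so iterate
	 * over a copy and append to the original list instead.
	 */
	List *copyOfList = list_copy(columnNameList);
	ListCell *cell = NULL;

	foreach(cell, copyOfList)
	{
		char *columnName = (char *) lfirst(cell);
		columnNameList = lappend(columnNameList, pstrdup(columnName));
	}

	list_free(copyOfList);
	return columnNameList;
}
```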
When a relation is used on an OUTER JOIN with FALSE filters,
set_rel_pathlist_hook may not be called for the table.
There might be other cases as well, so do not rely on the hook
for classification of the tables.
RemoveDuplicateJoinRestrictions() function was introduced with the aim of decreasing the overall planning times by eliminating the duplicate JOIN restriction entries (#1989). However, it turns out that the function itself is so CPU intensive, with a very high algorithmic complexity, that it hurts a lot more than it helps. The function is a clear example of premature optimization.
The table below shows the difference clearly:

| Test | Distributed query planning time (master) | RemoveDuplicateJoinRestrictions() execution time (master) | Planning time with RemoveDuplicateJoinRestrictions() removed (this PR) |
| --- | --- | --- | --- |
| 5 table INNER JOIN | 9 msec | 2 msec | 7 msec |
| 10 table INNER JOIN | 227 msec | 194 msec | 29 msec |
| 20 table INNER JOIN | 1 sec 235 msec | 1 sec 139 msec | 90 msec |
| 50 table INNER JOIN | 24 seconds | 21 seconds | 1.5 seconds |
| 100 table INNER JOIN | 2 minutes 16 seconds | 1 minute 53 seconds | 23 seconds |
| 250 table INNER JOIN | Bottleneck on JoinClauseList | 18 minutes 52 seconds | Bottleneck on JoinClauseList |
| 5 table INNER JOIN in subquery | 9 msec | 0 msec | 6 msec |
| 10 table INNER JOIN subquery | 33 msec | 10 msec | 32 msec |
| 20 table INNER JOIN subquery | 132 msec | 67 msec | 123 msec |
| 50 table INNER JOIN subquery | 1.2 seconds | 900 msec | 500 msec |
| 100 table INNER JOIN subquery | 6 seconds | 5 seconds | 2 seconds |
| 250 table INNER JOIN subquery | 54 seconds | 37 seconds | 20 seconds |
| 5 table LEFT JOIN | 5 msec | 0 msec | 5 msec |
| 10 table LEFT JOIN | 11 msec | 0 msec | 13 msec |
| 20 table LEFT JOIN | 26 msec | 2 msec | 30 msec |
| 50 table LEFT JOIN | 150 msec | 15 msec | 193 msec |
| 100 table LEFT JOIN | 757 msec | 71 msec | 722 msec |
| 250 table LEFT JOIN | 8 seconds | 600 msec | 8 seconds |
| 5 JOINs among 2 table JOINs | 37 msec | 11 msec | 25 msec |
| 10 JOINs among 2 table JOINs | 536 msec | 306 msec | 352 msec |
| 20 JOINs among 2 table JOINs | 794 msec | 181 msec | 640 msec |
| 50 JOINs among 2 table JOINs | 25 seconds | 2 seconds | 22 seconds |
| 100 JOINs among 2 table JOINs | Bottleneck on JoinClauseList | 9 seconds | Bottleneck on JoinClauseList |
| 150 JOINs among 2 table JOINs | Bottleneck on JoinClauseList | 46 seconds | Bottleneck on JoinClauseList |
On top of the performance penalty, the function had a critical bug (#4255), and with #4254 we hit one more important bug. It should be fixed by adding the following check to ContextCoversJoinRestriction():
```
static bool
JoinRelIdsSame(JoinRestriction *leftRestriction, JoinRestriction *rightRestriction)
{
Relids leftInnerRelIds = leftRestriction->innerrel->relids;
Relids rightInnerRelIds = rightRestriction->innerrel->relids;
if (!bms_equal(leftInnerRelIds, rightInnerRelIds))
{
return false;
}
Relids leftOuterRelIds = leftRestriction->outerrel->relids;
Relids rightOuterRelIds = rightRestriction->outerrel->relids;
if (!bms_equal(leftOuterRelIds, rightOuterRelIds))
{
return false;
}
return true;
}
```
However, adding this eliminates all the benefits that RemoveDuplicateJoinRestrictions() brings.
I've used the commands here to generate the JOINs mentioned in the PR: https://gist.github.com/onderkalaci/fe8654f9df5916c7af4c7c5eb892561e#file-gistfile1-txt
Inner and outer JOINs behave roughly the same; to simplify the table, only INNER JOINs were added.
* Fix incorrect join related fields
Ruleutils expects to be given the original index of join columns, hence we
should consider the dropped columns while setting the fields in
SetJoinRelatedFieldsCompat.
* Add some more tests for joins
* Move tests to join.sql and create a utility function
Disallow `ON TRUE` outer joins with reference & distributed tables
when the reference table is the outer relation, by fixing the logic bug
made when calling the `LeftListIsSubset` function.
Also, be more defensive when removing duplicate join restrictions
when join clause is empty for non-inner joins as they might still
contain useful information for non-inner joins.
It seems like Postgres could call set_rel_pathlist() for
the same relation multiple times. This breaks the logic
where we assume relationCount equals the number of
entries in relationRestrictionList.
In summary, relationRestrictionList may contain duplicate
entries.
With this commit, we make sure that local execution adds the
intermediate result size just as distributed execution does. Plus,
it enforces the citus.max_intermediate_result_size value.
We should not access CurrentLocalExecutionStatus directly, because that
would mean that we could also set it directly, which we shouldn't: we
have checks to see if the new state is possible, and otherwise we error
out.
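A minimal sketch of the accessor pattern; the enum values and the transition check are simplified placeholders rather than the actual Citus rules:
```
#include "postgres.h"

typedef enum LocalExecutionStatus
{
	LOCAL_EXECUTION_REQUIRED,	/* illustrative states */
	LOCAL_EXECUTION_OPTIONAL,
	LOCAL_EXECUTION_DISABLED
} LocalExecutionStatus;

static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL;

LocalExecutionStatus
GetCurrentLocalExecutionStatus(void)
{
	return CurrentLocalExecutionStatus;
}

void
SetLocalExecutionStatus(LocalExecutionStatus newStatus)
{
	/* example check: once local execution happened, it cannot be disabled */
	if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED &&
		newStatus == LOCAL_EXECUTION_DISABLED)
	{
		ereport(ERROR, (errmsg("cannot switch from local to remote execution")));
	}

	CurrentLocalExecutionStatus = newStatus;
}
```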
Citus has the logic to truncate the long shard names to prevent
various issues, including self-deadlocks. However, for partitioned
tables, when an index is created on the parent table, the index names
on the partitions are auto-generated by Postgres. We use the same
Postgres function to generate the index names on the shards of the
partitions. If the length exceeds the limit, we switch to sequential
execution mode.
After the connection timeout, we fail the session/pool. However, the
underlying connection can still be trying to connect. That is dangerous
because the new placement executions have already been put in place. The
executor cannot handle the situation where multiple
EXECUTION_ORDER_ANY task executions succeed.
Adding a regression test doesn't seem easily doable. To reproduce the issue
- Add 2 worker nodes
- create a reference table
- set citus.node_connection_timeout to 1ms (requires code change)
- Continuously execute `SELECT count(*) FROM ref_table`
- Some time later, you hit an out-of-bounds array access in
`ScheduleNextPlacementExecution()`, hence crashing.
- The reason for that is that sometimes the first connection is
successfully established while the executor is already
trying to execute the query on the second node.
Add sort method parameter for regression tests
Fix check-style
Change sorting method parameters to enum
Polish
Add task fields to OutTask
Add test into multi_explain
Fix isolation test
* Do not take ShareUpdateExclusiveLock on pg_dist_transaction
We were taking ShareUpdateExclusiveLock on pg_dist_transaction during
recovery to prevent multiple recoveries from happening concurrently.
VACUUM (not FULL) also takes ShareUpdateExclusiveLock, and they can
conflict. It seems that VACUUM will skip the table if there is a
conflicting lock already taken, unless it is doing the vacuum to prevent
id wraparound, in which case there can be a deadlock. I guess the
deadlock happens if:
- VACUUM takes a lock on pg_dist_transaction and is done for the id
wraparound problem
- The transaction in the maintenance daemon tries to take a lock but
cannot, as that conflicts with the lock acquired by VACUUM
- The transaction in the maintenance daemon has a very old xid, hence
VACUUM cannot proceed.
If we take a row exclusive lock in transaction recovery then it won't
conflict with VACUUM, hence VACUUM can proceed and the deadlock is
resolved. To prevent concurrent transaction recoveries from happening, an
advisory lock is taken with ShareUpdateExclusiveLock as before.
* Use CITUS_OPERATIONS tag
* Do not allow removing a single node with ref tables
We should not allow removing a node if it is the only node in the
cluster and there is data on it. We have this check for distributed
tables but we didn't have it for reference tables.
* Update src/test/regress/expected/single_node.out
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
* Update src/test/regress/sql/single_node.sql
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
This commit brings the following features:
* Foreign key support from citus local tables to reference tables
* Foreign key support from reference tables to citus local tables
(only with RESTRICT & NO ACTION behavior)
* ALTER TABLE ENABLE/DISABLE trigger command support
* CREATE/DROP/ALTER trigger command support
and disallows:
* ALTER TABLE ATTACH/DETACH PARTITION commands
* CREATE TABLE <postgres table> ATTACH PARTITION <citus local table>
commands
* Foreign keys from postgres tables to citus local tables
(the other way was already disallowed)
for citus local tables.
Introduce table entry utility functions
Citus table cache entry utilities are introduced so that we can easily
extend existing functionality with minimum changes, specifically changes
to these functions. For example IsNonDistributedTableCacheEntry can be
extended for citus local tables without the need to scan the whole
codebase and update each relevant part.
* Introduce utility functions to find the type of tables
A table type can be a reference table, a hash/range/append distributed
table. Utility methods are created so that we don't have to worry about
how a table is considered as a reference table etc. This also makes it
easy to extend the table types.
* Add IsCitusTableType utilities
* Rename IsCacheEntryCitusTableType -> IsCitusTableTypeCacheEntry
* Change citus table types in some checks
create_distributed_function(function_name,
distribution_arg_name,
colocate_with text)
This UDF did not allow the colocate_with parameter when there was no
distribution_arg_name supplied. This commit changes the behaviour to
allow a missing distribution_arg_name parameter when the function should
be colocated with a reference table.
FindNodeCheck is not clear about what the function is doing. These
functions are renamed to FindNodeMatchingCheckFunctionXXX. Also, for
choosing elements in these functions, the CheckNodeFunc type is
introduced, as sketched below.
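Roughly, the shape is the following; this is a sketch of the usual walker pattern, not a copy of the Citus code:
```
#include "postgres.h"
#include "nodes/nodeFuncs.h"

typedef bool (*CheckNodeFunc)(Node *node);

/* returns true if any node in the tree satisfies the given check */
static bool
FindNodeMatchingCheckFunction(Node *node, CheckNodeFunc check)
{
	if (node == NULL)
	{
		return false;
	}

	if (check(node))
	{
		return true;
	}

	return expression_tree_walker(node, FindNodeMatchingCheckFunction,
								  (void *) check);
}
```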
It seems that currently we process even postgres tables in explain
commands. This is because we register a hook for explain and we don't
have any check to see if the query has any citus table.
With this commit, we now send the buffer usage as well to the relevant
API. There is some duplication in the code, but it is because of the
existing structure; we can refactor this separately.
The codebase is updated to use varattnosyn and varnosyn, and we defined
the macros for older versions. This way we can just remove the macros
when we drop an older version.
CMDTAG_SELECT exists in PG12 hence defining a MACRO such as
CMDTAG_SELECT -> "SELECT" is not possible. I chose CMDTAG_SELECT_COMPAT
because with the COMPAT suffix it is explicit that it maps to different
things in different versions and also has less chance of mapping to
something irrelevant. For example, if we used SELECT as a macro, then it
would map every SELECT to whatever it is mapping to, which might have
unexpected/undesired behaviour.
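For reference, a sketch of how such a compat macro can be defined, using the usual PG_VERSION_NUM guard:
```
#if PG_VERSION_NUM >= 130000
#define CMDTAG_SELECT_COMPAT CMDTAG_SELECT
#else
#define CMDTAG_SELECT_COMPAT "SELECT"
#endif
```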
The error message when index has opclassopts is improved and the commit
from postgres side is also included for future reference.
Also some minor style related changes are applied.
This commit mostly adds pg_get_triggerdef_command to our ruleutils_13.
This doesn't add anything extra for ruleutils 13 so it is basically a copy
of the change on ruleutils_12
Postgres introduced QueryCompletion struct. Hence a compat utility is
added to finish query completion for older versions and pg >= 13.
The commit on Postgres side:
2f9661311b83dc481fc19f6e3bda015392010a40
addRangeTableEntryXXX methods return a ParseNamespaceItem with pg >= 13.
RangeTableEntryFromNSItem macro is added so that we return the range
table entry from the ParseNamespaceItem in pg>=13 and for pg < 13 rte
would already be returned with addRangeTableEntryXXX methods.
Commit on Postgres side:
5815696bc66b3092f6361f53e0394909647042c8
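A sketch of the macro, assuming ParseNamespaceItem's p_rte field in PG13; on older versions the addRangeTableEntryXXX result is already the RTE:
```
#if PG_VERSION_NUM >= 130000
#define RangeTableEntryFromNSItem(nsItem) ((nsItem)->p_rte)
#else
#define RangeTableEntryFromNSItem(rangeTableEntry) (rangeTableEntry)
#endif
```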
Since PG13 changed the List implementation, a ListCell no longer contains
a `data` member. Therefore the Set_ptr_value macro is created, so that
depending on the version it will either use cell->data.ptr_value or
cell->ptr_value.
Commit on Postgres side:
1cff1b95ab6ddae32faa3efe0d95a820dbfdc164
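A sketch of the macro described above:
```
#if PG_VERSION_NUM >= 130000
#define Set_ptr_value(cell, pointerValue) ((cell)->ptr_value = (pointerValue))
#else
#define Set_ptr_value(cell, pointerValue) ((cell)->data.ptr_value = (pointerValue))
#endif
```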
With PG13 varoattno and varnoold fields were renamed as varattnosyn and
varnosyn. A macro is defined for these.
Commit on Postgres side:
9ce77d75c5ab094637cc4a446296dc3be6e3c221
Command on Postgres side:
git log --all --grep="varoattno"
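A sketch of the compat definitions for versions before PG13:
```
#if PG_VERSION_NUM < 130000
#define varattnosyn varoattno
#define varnosyn varnoold
#endif
```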
Since ExplainOnePlan expects BufferUsage as well with PG >= 13,
ExplainOnePlanCompat is added.
Commit on Postgres side:
ed7a5095716ee498ecc406e1b8d5ab92c7662d10
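A sketch of the compat macro, assuming the PG13 signature simply gains the extra BufferUsage argument while older versions drop it:
```
#if PG_VERSION_NUM >= 130000
#define ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv, \
							 planduration, bufusage) \
	ExplainOnePlan(plan, into, es, queryString, params, queryEnv, \
				   planduration, bufusage)
#else
#define ExplainOnePlanCompat(plan, into, es, queryString, params, queryEnv, \
							 planduration, bufusage) \
	ExplainOnePlan(plan, into, es, queryString, params, queryEnv, planduration)
#endif
```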
PortalDefineQuery doesn't accept a char* for the command tag anymore with
PG >= 13. We are currently only using it with SELECT, therefore a
PortalDefineQuery compat for SELECT is created.
Commit on PG side:
2f9661311b83dc481fc19f6e3bda015392010a40
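A sketch of such a SELECT-only compat wrapper (the macro name is illustrative); PG13 takes the CommandTag enum while older versions take the tag as a string:
```
#if PG_VERSION_NUM >= 130000
#define PortalDefineQuerySelectCompat(portal, prepStmtName, sourceText, \
									  stmts, cplan) \
	PortalDefineQuery(portal, prepStmtName, sourceText, CMDTAG_SELECT, \
					  stmts, cplan)
#else
#define PortalDefineQuerySelectCompat(portal, prepStmtName, sourceText, \
									  stmts, cplan) \
	PortalDefineQuery(portal, prepStmtName, sourceText, "SELECT", \
					  stmts, cplan)
#endif
```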
As the new planner and pg_plan_query_compat methods expect the query
string as well, macros are defined to be compatible in different
versions of postgres.
Relevant commit on Postgres:
6aba63ef3e606db71beb596210dd95fa73c44ce2
Command on Postgres:
git log --all --grep="pg_plan_query"
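A sketch of the macro; PG13's pg_plan_query takes the query string, while on older versions the extra argument is simply dropped:
```
#if PG_VERSION_NUM >= 130000
#define pg_plan_query_compat(query, queryString, cursorOptions, boundParams) \
	pg_plan_query(query, queryString, cursorOptions, boundParams)
#else
#define pg_plan_query_compat(query, queryString, cursorOptions, boundParams) \
	pg_plan_query(query, cursorOptions, boundParams)
#endif
```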
With PG13 heap_* (heap_open, heap_close etc) are replaced with table_*
(table_open, table_close etc).
It is better to use the new table access methods in the codebase and
define the macros for the previous versions, so that we can easily remove
the macros without having to change the codebase when we drop support for
the old version.
Commits that introduced this change on Postgres:
f25968c49697db673f6cd2a07b3f7626779f1827
e0c4ec07284db817e1f8d9adfb3fffc952252db0
4b21acf522d751ba5b6679df391d5121b6c4a35f
Command to see relevant commits on Postgres side:
git log --all --grep="heap_open"
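A sketch of the backward-compatibility definitions, assuming table_open and friends became available in PG12: use table_* everywhere and map it back to heap_* on older versions.
```
#if PG_VERSION_NUM < 120000
#define table_open(relationId, lockMode) heap_open(relationId, lockMode)
#define table_openrv(relation, lockMode) heap_openrv(relation, lockMode)
#define table_close(relation, lockMode) heap_close(relation, lockMode)
#endif
```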
Pass the list to lnext API
lnext API now expects the list as well.
The commit on Postgres that introduced the change: 1cff1b95ab6ddae32faa3efe0d95a820dbfdc164
lnext_compat and list_delete_cell_compat macros are introduced so that
we can use these macros in the codebase without having to use #if
directives in the codebase.
Related commit on postgres:
1cff1b95ab6ddae32faa3efe0d95a820dbfdc164
Command to search in postgres:
git log --all --grep="list_delete_cell"
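A sketch of the two compat macros described above:
```
#if PG_VERSION_NUM >= 130000
#define lnext_compat(list, cell) lnext(list, cell)
#define list_delete_cell_compat(list, cell, prev) list_delete_cell(list, cell)
#else
#define lnext_compat(list, cell) lnext(cell)
#define list_delete_cell_compat(list, cell, prev) list_delete_cell(list, cell, prev)
#endif
```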
add ListCellAndListWrapper
When iterating a list in separate function calls, we need both the list
and the current cell starting from PG13, therefore
ListCellAndListWrapper is added to store both as a wrapper.
Use ListCellAndListWrapper in foreign key test UDFs
As we iterate a list in these UDFs using a functionContext, we need to
use the wrapper to be able to access both the list and the current cell.
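A sketch of the wrapper; the field names are illustrative. A set-returning function's user_fctx can point at one of these so that each call can advance the cell while still having the list at hand for lnext():
```
#include "postgres.h"
#include "nodes/pg_list.h"

typedef struct ListCellAndListWrapper
{
	List *list;			/* the list being iterated */
	ListCell *listCell;	/* the current position within the list */
} ListCellAndListWrapper;
```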