**Intro**
This adds support to Citus to change the CPU priority values of
backends. This is created with two main usecases in mind:
1. Users might want to run the logical replication part of the shard moves
or shard splits at a higher speed than they would do by themselves.
This might cause some small loss of DB performance for their regular
queries, but this is often worth it. During high load it's very possible
that the logical replication WAL sender is not able to keep up with the
WAL that is generated. This is especially a big problem when the
machine is close to running out of disk when doing a rebalance.
2. Users might have certain long running queries that they don't impact
their regular workload too much.
**Be very careful!!!**
Using CPU priorities to control scheduling can be helpful in some cases
to control which processes are getting more CPU time than others.
However, due to an issue called "[priority inversion][1]" it's possible that
using CPU priorities together with the many locks that are used within
Postgres cause the exact opposite behavior of what you intended. This
is why this PR only allows the PG superuser to change the CPU priority
of its own processes. Currently it's not recommended to set `citus.cpu_priority`
directly. Currently the only recommended interface for users is the setting
called `citus.cpu_priority_for_logical_replication_senders`. This setting
controls CPU priority for a very limited set of processes (the logical
replication senders). So, the dangers of priority inversion are also limited
with when using it for this usecase.
**Background**
Before reading the rest it's important to understand some basic
background regarding process CPU priorities, because they are a bit
counter intuitive. A lower priority value, means that the process will
be scheduled more and whatever it's doing will thus complete faster. The
default priority for processes is 0. Valid values are from -20 to 19
inclusive. On Linux a larger difference between values of two processes
will result in a bigger difference in percentage of scheduling.
**Handling the usecases**
Usecase 1 can be achieved by setting `citus.cpu_priority_for_logical_replication_senders`
to the priority value that you want it to have. It's necessary to set
this both on the workers and the coordinator. Example:
```
citus.cpu_priority_for_logical_replication_senders = -10
```
Usecase 2 can with this PR be achieved by running the following as
superuser. Note that this is only possible as superuser currently
due to the dangers mentioned in the "Be very carefull!!!" section.
And although this is possible it's **NOT** recommended:
```sql
ALTER USER background_job_user SET citus.cpu_priority = 5;
```
**OS configuration**
To actually make these settings work well it's important to run Postgres
with more a more permissive value for the 'nice' resource limit than
Linux will do by default. By default Linux will not allow a process to
set its priority lower than it currently is, even if it was lower when
the process originally started. This capability is necessary to reset
the CPU priority to its original value after a transaction finishes.
Depending on how you run Postgres this needs to be done in one of two
ways:
If you use systemd to start Postgres all you have to do is add a line
like this to the systemd service file:
```conf
LimitNice=+0 # the + is important, otherwise its interpreted incorrectly as 20
```
If that's not the case you'll have to configure `/etc/security/limits.conf`
like so, assuming that you are running Postgres as the `postgres` OS user:
```
postgres soft nice 0
postgres hard nice 0
```
Finally you'd have add the following line to `/etc/pam.d/common-session`
```
session required pam_limits.so
```
These settings would allow to change the priority back after setting it
to a higher value.
However, to actually allow you to set priorities even lower than the
default priority value you would need to change the values in the
config to something lower than 0. So for example:
```conf
LimitNice=-10
```
or
```
postgres soft nice -10
postgres hard nice -10
```
If you use WSL2 you'll likely have to do another thing. You have to
open a new shell, because when PAM is only used during login, and
WSL2 doesn't actually log you in. You can force a login like this:
```
sudo su $USER --shell /bin/bash
```
Source: https://stackoverflow.com/a/68322992/2570866
[1]: https://en.wikipedia.org/wiki/Priority_inversion
When introducing non-blocking shard split functionality it was based
heavily on the non-blocking shard moves. However, differences between
usage was slightly to big to be able to reuse the existing functions
easily. So, most logical replication code was simply copied to dedicated
shard split functions and modified for that purpose.
This PR tries to create a more generic logical replication
infrastructure that can be used by both shard splits and shard moves.
There's probably more code sharing possible in the future, but I believe
this is at least a good start and addresses the lowest hanging fruit.
This also adds a CreateSimpleHash function that makes creating the
most common type of hashmap common.
DESCRIPTION: Use faster custom copy logic for non-blocking shard moves
Non-blocking shard moves consist of two main phases:
1. Initial data copy
2. Catchup phase
This changes the first of these phases significantly. Previously we used the
copy logic provided by postgres subscriptions. This meant we didn't have
to implement it ourselves, but it came with the downside of little control.
When implementing shard splits we needed more control to even make it
work, so we implemented our own logic for copying data between nodes.
This PR starts using that logic for non-blocking shard moves. Doing so
has four main advantages:
1. It uses COPY in binary format when possible, which is cheaper to encode
and decode. Furthermore it very often results in less data that needs to
be sent over the network.
2. It allows us to create the primary key (or other replica identity) after doing
the initial data copy. This should give some speed up over the total run,
because creating an index is bulk is much faster than incrementally building it.
3. It doesn't require a replication slot per parallel copy. Increasing the maximum
number of replication slots uses resources in postgres, even if they are not used.
So reducing the number of replication slots that shard moves need is nice.
4. Logical replication table_sync workers are slow to start up, so if lots of shards
need to be copied that can make it quite slow. This can happen easily when
combining Postgres partitioning with Citus.
It turns out that create_distributed_table
and citus_move/copy_shard_placement does not
work well concurrently.
To fix that, we need to acquire a lock, which
sounds like a good use of colocation lock.
However, the current usage of colocation lock is
limited to higher level UDFs like rebalance_table_shards
etc. Those usage of lock is still useful, but
we cannot acquire the same lock on citus_move_shard_placement
etc. because the coordinator connects to itself to acquire
the lock. Hence, the high level UDF blocks itself.
To fix that, we use one more colocation lock, with the placements
are the main objects to consider.
Before this commit, we required multiple copies of the
same stringInfo if we needed to append/prepend data to
the stringInfo. Now, we optionally get prefix/postfix.
For large string operations, this can save up to %10
memory.
We used to only check whether the PID is valid
or not. However, Postgres does not necessarily
set the PID of the backend to 0 when it exists.
Instead, we need to be able to check it from procArray.
IsBackendPid() is what pg_stat_activity also relies
on for a similar purpose.
use RecurseObjectDependencies api to find if an object is citus depended
make vanilla tests runnable to see if citus_depended function is working correctly
* Remove if conditions with PG_VERSION_NUM < 13
* Remove server_above_twelve(&eleven) checks from tests
* Fix tests
* Remove pg12 and pg11 alternative test output files
* Remove pg12 specific normalization rules
* Some more if conditions in the code
* Change RemoteCollationIdExpression and some pg12/pg13 comments
* Remove some more normalization rules
When building packages on ubuntu jammy, we started to see some warnings.
autoreconf: warning: autoconf input should be named 'configure.ac', not
'configure.in'
* Blocking split setup
* Add missing type
* Missing API from Metadata Sync
* Shard Split e2e code
* Worker Split Copy DestReceiver skeleton
* Basic destreceiver code
* worker_split_copy UDF
* UDF calling
* Split points are text
* Isolate Tenant and Split Shard Unification
* Fixing executor and misc
* Reindent code
* Fixing UDF definitions
* Hello World Local Copy works
* Remote copy hello world works
* Local and Remote binary test
* Fixing text local copy and adding tests
* Hello World shard split works
* Negative tests
* Blocking Split workflow works
* Refactor
* Bug fix
* Reindent
* Cleaning up and adding comments
* Basic test for shard split workflow
* ReIndent
* Circle CI integration
* Removing include causing circle-ci build failure
* Remove SplitCopyDestReceiver and use PartitionedResultDestReceiver
* Add support for citus.enable_binary_protocol
* Reindent
* Fix build break
* Update Test
* Cleanup on catch
* Addressing open comments
* Update downgrade script and quote schema/table in COPY statement
* Fix metadata sync issue. Update regression test
* Isolation test and bug fix
* Add Isolation test, fix foreign constraint deadlock issue
* Misc code review comments
* Test name needing to be quoted
* Refactor code from review comments
* Explaining shardGroupSplitIntervalListList
* Fix upgrade & downgrade
* Fix broken test
* Test fix Round 2
* Fixing bug and modifying test appropriately
* Fully qualify copy udf name. Run Reindent
* Address PR comments
* Fix null handling when creating AuxiliaryStructures
* Ensure local copy is triggered in tests
* Limit max shards that can be created with split
* Test failure fix
* Remove split_mode and use shard_transfer_mode instead'
* Fix test failure
* Fix test failure
* Fixing permission issue when splitting non-superuser owned tables
* Fix test expected output
* Remove extra space
* Fix test
* attempt to fix test
* Addressing Marco's PR comment
* Only clean shards created by workflow
* Remove from merge
* Update test
Similar to #5897, one more step for running Citus with PG 15.
This PR at least make Citus run with PG 15. I have not tried running the tests with PG 15.
Shmem changes are based on 4f2400cb3f
Compile breaks are mostly due to #6008
* Support upgrade and downgrade and separate columnar as citus_columnar extension
Co-authored-by: Yanwen Jin <yanwjin@microsoft.com>
Co-authored-by: Jeff Davis <jeff@j-davis.com>
This PR makes all of the features open source that were previously only
available in Citus Enterprise.
Features that this adds:
1. Non blocking shard moves/shard rebalancer
(`citus.logical_replication_timeout`)
2. Propagation of CREATE/DROP/ALTER ROLE statements
3. Propagation of GRANT statements
4. Propagation of CLUSTER statements
5. Propagation of ALTER DATABASE ... OWNER TO ...
6. Optimization for COPY when loading JSON to avoid double parsing of
the JSON object (`citus.skip_jsonb_validation_in_copy`)
7. Support for row level security
8. Support for `pg_dist_authinfo`, which allows storing different
authentication options for different users, e.g. you can store
passwords or certificates here.
9. Support for `pg_dist_poolinfo`, which allows using connection poolers
in between coordinator and workers
10. Tracking distributed query execution times using
citus_stat_statements (`citus.stat_statements_max`,
`citus.stat_statements_purge_interval`,
`citus.stat_statements_track`). This is disabled by default.
11. Blocking tenant_isolation
12. Support for `sslkey` and `sslcert` in `citus.node_conninfo`
It is often useful to be able to sync the metadata in parallel
across nodes.
Also citus_finalize_upgrade_to_citus11() uses
start_metadata_sync_to_primary_nodes() after this commit.
Note that this commit does not parallelize all pieces of node
activation or metadata syncing. Instead, it tries to parallelize
potenially large parts of metadata, which is the objects and
distributed tables (in general Citus tables).
In the future, it would be nice to sync the reference tables
in parallel across nodes.
Create ~720 distributed tables / ~23450 shards
```SQL
-- declaratively partitioned table
CREATE TABLE github_events_looooooooooooooong_name (
event_id bigint,
event_type text,
event_public boolean,
repo_id bigint,
payload jsonb,
repo jsonb,
actor jsonb,
org jsonb,
created_at timestamp
) PARTITION BY RANGE (created_at);
SELECT create_time_partitions(
table_name := 'github_events_looooooooooooooong_name',
partition_interval := '1 day',
end_at := now() + '24 months'
);
CREATE INDEX ON github_events_looooooooooooooong_name USING btree (event_id, event_type, event_public, repo_id);
SELECT create_distributed_table('github_events_looooooooooooooong_name', 'repo_id');
SET client_min_messages TO ERROR;
```
across 1 node: almost same as expected
```SQL
SELECT start_metadata_sync_to_primary_nodes();
Time: 15664.418 ms (00:15.664)
select start_metadata_sync_to_node(nodename,nodeport) from pg_dist_node;
Time: 14284.069 ms (00:14.284)
```
across 7 nodes: ~3.5x improvement
```SQL
SELECT start_metadata_sync_to_primary_nodes();
┌──────────────────────────────────────┐
│ start_metadata_sync_to_primary_nodes │
├──────────────────────────────────────┤
│ t │
└──────────────────────────────────────┘
(1 row)
Time: 25711.192 ms (00:25.711)
-- across 7 nodes
select start_metadata_sync_to_node(nodename,nodeport) from pg_dist_node;
Time: 82126.075 ms (01:22.126)
```
* Bug fix for bug #5876. Memset MetadataCacheSystem every time there is an abort
* Created an ObjectAccessHook that saves the transactionlevel of when citus was created and will clear metadatacache if that transaction level is rolled back. Added additional tests to make sure metadatacache is cleared
Columnar: support relation options with ALTER TABLE.
Use ALTER TABLE ... SET/RESET to specify relation options rather than
alter_columnar_table_set() and alter_columnar_table_reset().
Not only is this more ergonomic, but it also allows better integration
because it can be treated like DDL on a regular table. For instance,
citus can use its own ProcessUtility_hook to distribute the new
settings to the shards.
DESCRIPTION: Columnar: support relation options with ALTER TABLE.
Before this commit, we had:
```SQL
SELECT citus_disable_node(nodename, nodeport, force boolean DEFAULT false)
```
Where, we allow forcing to disable first worker node with
`force:=true`. However, it entails the risk for losing
data / diverging placement data etc.
With `force` flag, we control disabling the first worker node,
and with `async` flag we control whether the changes are done
via bg worker or immediately.
```SQL
SELECT citus_disable_node(nodename, nodeport, force boolean DEFAULT false, sync boolean DEFAULT false)
```
Where we can achieve all the following:
| Mode | Data loss possibility | Can run in 2PC | Handle multiple node failures | Immediately effective |
| --- |--- |--- |--- |--- |
| force:false, sync: false | false | true | true | false |
| force:false, sync: true | false | false | false | true |
| force:true, sync: false | true | true | true | false |
| force:true, sync: true | false | false | false | true |
Over time we have added significantly improved the support for objects to be propagated by Citus as to make scaling out the database more seamless. It became evident that there was a lot of code duplication that got into the codebase to implement the propagation.
This PR tries to reduce the amount of repeated code that is at most only slightly different. To make things worse, most of the differences were actually oversights instead of correct.
This Patch introduces 3 reusable sets of pre/post processing steps for respectively
- create
- alter
- drop
With the use of the common functionality we should have more coherent behaviour between different supported object by Citus.
Some steps either omit the Pre or Post processing step if they would not make sense to include.
All tests pass, only 1 test needed changing, foreign servers, as the dropping of foreign servers didn't implement support for dropping multiple foreign servers at once. Given the common approach correctly supports dropping of multiple objects, either distributed or not, the test that assumed it wouldn't work was now obsolete.
We have a mechanism which ensures that newly distributed
objects are recorded during `alter extension citus update`.
However, the logic was lacking "view"s. With this commit, we make
sure that existing views are also marked as distributed during
upgrade.