The new shard copy code that was created for shard splits has some
advantages over the old shard copy code. The old code was using
worker_append_table_to_shard, which wrote to disk twice. And it also
didn't use binary copy when that was possible. Both of these issues
were fixed in the new copy code. This PR starts using this new copy
logic also for shard moves, not just for shard splits.
On my local machine I created a single shard table like this.
```sql
set citus.shard_count = 1;
create table t(id bigint, a bigint);
select create_distributed_table('t', 'id');
INSERT into t(id, a) SELECT i, i from generate_series(1, 100000000) i;
```
I then turned `fsync` off to make sure I wasn't bottlenecked by disk.
Finally I moved this shard between nodes with `citus_move_shard_placement`
with `block_writes`.
Before this PR a move took ~127s, after this PR it took only ~38s. So for this
small test this resulted in spending ~70% less time.
And I also tried the same test for a table that contained large strings:
```sql
set citus.shard_count = 1;
create table t(id bigint, a bigint, content text);
select create_distributed_table('t', 'id');
INSERT into t(id, a, content) SELECT i, i, 'aunethautnehoautnheaotnuhetnohueoutnehotnuhetncouhaeohuaeochgrhgd.athbetndairgexdbuhaobulrhdbaetoausnetohuracehousncaoehuesousnaceohuenacouhancoexdaseohusnaetobuetnoduhasneouhaceohusnaoetcuhmsnaetohuacoeuhebtokteaoshetouhsanetouhaoug.lcuahesonuthaseauhcoerhuaoecuh.lg;rcydabsnetabuesabhenth' from generate_series(1, 20000000) i;
```
Before this commit, we required multiple copies of the
same stringInfo if we needed to append/prepend data to
the stringInfo. Now, we optionally get prefix/postfix.
For large string operations, this can save up to %10
memory.
Previously, CreateFixPartitionShardIndexNames() created all
the relevant query strings for all the shards, and executed
the large query string. And, in terms of the memory consumption,
this huge command (and its ExprContext generated while running
the command) is the main bottleneck/
With this change, we are reducing the total amount of memory
usage to almost 1/shard_count.
On my local machine, a distributed partitioned table with 120 partitions,
each 32 shards, the total memory consumption reduced from ~3GB
to ~0.1GB. And, the total execution time increased from ~28 seconds
to ~30 seconds. This seems like a good trade-off.
use RecurseObjectDependencies api to find if an object is citus depended
make vanilla tests runnable to see if citus_depended function is working correctly
* Remove if conditions with PG_VERSION_NUM < 13
* Remove server_above_twelve(&eleven) checks from tests
* Fix tests
* Remove pg12 and pg11 alternative test output files
* Remove pg12 specific normalization rules
* Some more if conditions in the code
* Change RemoteCollationIdExpression and some pg12/pg13 comments
* Remove some more normalization rules
* Blocking split setup
* Add missing type
* Missing API from Metadata Sync
* Shard Split e2e code
* Worker Split Copy DestReceiver skeleton
* Basic destreceiver code
* worker_split_copy UDF
* UDF calling
* Split points are text
* Isolate Tenant and Split Shard Unification
* Fixing executor and misc
* Reindent code
* Fixing UDF definitions
* Hello World Local Copy works
* Remote copy hello world works
* Local and Remote binary test
* Fixing text local copy and adding tests
* Hello World shard split works
* Negative tests
* Blocking Split workflow works
* Refactor
* Bug fix
* Reindent
* Cleaning up and adding comments
* Basic test for shard split workflow
* ReIndent
* Circle CI integration
* Removing include causing circle-ci build failure
* Remove SplitCopyDestReceiver and use PartitionedResultDestReceiver
* Add support for citus.enable_binary_protocol
* Reindent
* Fix build break
* Update Test
* Cleanup on catch
* Addressing open comments
* Update downgrade script and quote schema/table in COPY statement
* Fix metadata sync issue. Update regression test
* Isolation test and bug fix
* Add Isolation test, fix foreign constraint deadlock issue
* Misc code review comments
* Test name needing to be quoted
* Refactor code from review comments
* Explaining shardGroupSplitIntervalListList
* Fix upgrade & downgrade
* Fix broken test
* Test fix Round 2
* Fixing bug and modifying test appropriately
* Fully qualify copy udf name. Run Reindent
* Address PR comments
* Fix null handling when creating AuxiliaryStructures
* Ensure local copy is triggered in tests
* Limit max shards that can be created with split
* Test failure fix
* Remove split_mode and use shard_transfer_mode instead'
* Fix test failure
* Fix test failure
* Fixing permission issue when splitting non-superuser owned tables
* Fix test expected output
* Remove extra space
* Fix test
* attempt to fix test
* Addressing Marco's PR comment
* Only clean shards created by workflow
* Remove from merge
* Update test
Similar to #5897, one more step for running Citus with PG 15.
This PR at least make Citus run with PG 15. I have not tried running the tests with PG 15.
Shmem changes are based on 4f2400cb3f
Compile breaks are mostly due to #6008
This PR makes all of the features open source that were previously only
available in Citus Enterprise.
Features that this adds:
1. Non blocking shard moves/shard rebalancer
(`citus.logical_replication_timeout`)
2. Propagation of CREATE/DROP/ALTER ROLE statements
3. Propagation of GRANT statements
4. Propagation of CLUSTER statements
5. Propagation of ALTER DATABASE ... OWNER TO ...
6. Optimization for COPY when loading JSON to avoid double parsing of
the JSON object (`citus.skip_jsonb_validation_in_copy`)
7. Support for row level security
8. Support for `pg_dist_authinfo`, which allows storing different
authentication options for different users, e.g. you can store
passwords or certificates here.
9. Support for `pg_dist_poolinfo`, which allows using connection poolers
in between coordinator and workers
10. Tracking distributed query execution times using
citus_stat_statements (`citus.stat_statements_max`,
`citus.stat_statements_purge_interval`,
`citus.stat_statements_track`). This is disabled by default.
11. Blocking tenant_isolation
12. Support for `sslkey` and `sslcert` in `citus.node_conninfo`
Do not obtain AccessShareLock before acquiring the distributed locks.
Acquiring an AccessShareLock ensures that the relations which we are trying to get a distributed lock on will not be dropped in the time between when the LOCK command is issued and the LOCK commands are send to the worker. However, this also leads to distributed deadlocks in such scenarios:
```sql
-- for dist lock acquiring order coor, w1, w2
-- on w2
LOCK t1 IN ACCESS EXLUSIVE MODE;
-- acquire AccessShareLock locally on t1 to ensure it is not dropped while we get ready to distribute the lock
-- concurrently on w1
LOCK t1 IN ACCESS EXLUSIVE MODE;
-- acquire AccessShareLock locally on t1 to ensure it is not dropped while we get ready to distribute the lock
-- acquire dist lock on coor, w1, gets blocked on local AccessShareLock on w2
-- on w2 continuation of the execution above
-- starts to acquire dist locks and gets blocked on the coor by the lock acquired by w1
-- distributed deadlock
```
We opt for avoiding such deadlocks with the cost of the possibility of running into errors when the relations on which we are trying to acquire locks on get dropped.
It is often useful to be able to sync the metadata in parallel
across nodes.
Also citus_finalize_upgrade_to_citus11() uses
start_metadata_sync_to_primary_nodes() after this commit.
Note that this commit does not parallelize all pieces of node
activation or metadata syncing. Instead, it tries to parallelize
potenially large parts of metadata, which is the objects and
distributed tables (in general Citus tables).
In the future, it would be nice to sync the reference tables
in parallel across nodes.
Create ~720 distributed tables / ~23450 shards
```SQL
-- declaratively partitioned table
CREATE TABLE github_events_looooooooooooooong_name (
event_id bigint,
event_type text,
event_public boolean,
repo_id bigint,
payload jsonb,
repo jsonb,
actor jsonb,
org jsonb,
created_at timestamp
) PARTITION BY RANGE (created_at);
SELECT create_time_partitions(
table_name := 'github_events_looooooooooooooong_name',
partition_interval := '1 day',
end_at := now() + '24 months'
);
CREATE INDEX ON github_events_looooooooooooooong_name USING btree (event_id, event_type, event_public, repo_id);
SELECT create_distributed_table('github_events_looooooooooooooong_name', 'repo_id');
SET client_min_messages TO ERROR;
```
across 1 node: almost same as expected
```SQL
SELECT start_metadata_sync_to_primary_nodes();
Time: 15664.418 ms (00:15.664)
select start_metadata_sync_to_node(nodename,nodeport) from pg_dist_node;
Time: 14284.069 ms (00:14.284)
```
across 7 nodes: ~3.5x improvement
```SQL
SELECT start_metadata_sync_to_primary_nodes();
┌──────────────────────────────────────┐
│ start_metadata_sync_to_primary_nodes │
├──────────────────────────────────────┤
│ t │
└──────────────────────────────────────┘
(1 row)
Time: 25711.192 ms (00:25.711)
-- across 7 nodes
select start_metadata_sync_to_node(nodename,nodeport) from pg_dist_node;
Time: 82126.075 ms (01:22.126)
```
Breaking down #5899 into smaller PR-s
This particular PR changes the way TRUNCATE acquires distributed locks on the relations it is truncating to use the LOCK command instead of lock_relation_if_exists. This has the benefit of using pg's recursive locking logic it implements for the LOCK command instead of us having to resolve relation dependencies and lock them explicitly. While this does not directly affect truncate, it will allow us to generalize this locking logic to then log different relations where the pg recursive locking will become useful (e.g. locking views).
This implementation is a bit more complex that it needs to be due to pg not supporting locking foreign tables. We can however, still lock foreign tables with lock_relation_if_exists. So for a command:
TRUNCATE dist_table_1, dist_table_2, foreign_table_1, foreign_table_2, dist_table_3;
We generate and send the following command to all the workers in metadata:
```sql
SEL citus.enable_ddl_propagation TO FALSE;
LOCK dist_table_1, dist_table_2 IN ACCESS EXCLUSIVE MODE;
SELECT lock_relation_if_exists('foreign_table_1', 'ACCESS EXCLUSIVE');
SELECT lock_relation_if_exists('foreign_table_2', 'ACCESS EXCLUSIVE');
LOCK dist_table_3 IN ACCESS EXCLUSIVE MODE;
SEL citus.enable_ddl_propagation TO TRUE;
```
Note that we need to alternate between the lock command and lock_table_if_exists in order to preserve the TRUNCATE order of relations.
When pg supports locking foreign tables, we will be able to massive simplify this logic and send a single LOCK command.
We've had custom versions of Postgres its `foreach` macro which with a
hidden ListCell for quite some time now. People like these custom
macros, because they are easier to use and require less boilerplate.
This adds similar custom versions of Postgres its `forboth` macro. Now
you don't need ListCells anymore when looping over two lists at the same
time.
If a worker node is being added, a command is sent to get the server_id of the worker from the pg_dist_node_metadata table. If the worker's id is the same as the node executing the code, we will know the node is trying to add itself. If the node tries to add itself without specifying `groupid:=0` the operation will result in an error.
With this commit we've started to propagate sequences and shell
tables within the object dependency resolution. So, ensuring any
dependencies for any object will consider shell tables and sequences
as well. Separate logics for both shell tables and sequences have
been removed.
Since both shell tables and sequences logic were implemented as a
part of the metadata handling before that logic, we were propagating
them while syncing table metadata. With this commit we've divided
metadata (which means anything except shards thereafter) syncing
logic into multiple parts and implemented it either as a part of
ActivateNode. You can check the functions called in ActivateNode
to check definition of different metadata.
Definitions of start_metadata_sync_to_node and citus_activate_node
have also been updated. citus_activate_node will basically create
an active node with all metadata and reference table shards.
start_metadata_sync_to_node will be same with citus_activate_node
except replicating reference tables. stop_metadata_sync_to_node
will remove all the metadata. All of those UDFs need to be called
by superuser.
* Require superuser while activating a node
With this change, we require ActiveNode() (hence citus_add_node(),
citus_activate_node()) explicitly require for a superuser.
Before this commit, these functions were designed to work with
non-superuser roles with the relevent GRANTs given.
However, that is not a widely used way for calling the functions
above.
Due to possibility of non-super user calling the UDFs, they were
designed in a way that some commands were using some additional
short-lived superuser connections. That is:
(a) breaking transactional behavior (e.g., ROLLBACK
wouldn't fully rollback the whole transaction)
(b) Making it very complicated to reason about which
parts of the node activation goes over which connections,
and becoming vulnerable to deadlocks / visibility issues.
We prefer the background daemon to only sync node metadata. That's
why we move placement metadata changes from disable node to
activate node. With that, we can make sure that disable node
only changes node metadata, whereas activate node syncs all
the metadata changes. In essence, we already expect all
nodes to be up when a node is activated. So, this does not change
the behavior much.
With this commit, fix_partition_shard_index_names()
works significantly faster.
For example,
32 shards, 365 partitions, 5 indexes drop from ~120 seconds to ~44 seconds
32 shards, 1095 partitions, 5 indexes drop from ~600 seconds to ~265 seconds
`queryStringList` can be really long, because it may contain #partitions * #indexes entries.
Before this change, we were actually going through the executor where each command
in the query string triggers 1 round trip per entry in queryStringList.
The aim of this commit is to avoid the round-trips by creating a single query string.
I first simply tried sending `q1;q2;..;qn` . However, the executor is designed to
handle `q1;q2;..;qn` type of query executions via the infrastructure mentioned
above (e.g., by tracking the query indexes in the list and doing 1 statement
per round trip).
One another option could have been to change the executor such that only track
the query index when `queryStringList` is provided not with queryString
including multiple `;`s . That is (a) more work (b) could cause weird edge
cases with failure handling (c) felt like coding a special case in to the executor
As of master branch, Citus does all the modifications to replicated tables
(e.g., reference tables and distributed tables with replication factor > 1),
via 2PC and avoids any shardstate=3. As a side-effect of those changes,
handling node failures for replicated tables change.
With this PR, when one (or multiple) node failures happen, the users would
see query errors on modifications. If the problem is intermitant, that's OK,
once the node failure(s) recover by themselves, the modification queries would
succeed. If the node failure(s) are permenant, the users should call
`SELECT citus_disable_node(...)` to disable the node. As soon as the node is
disabled, modification would start to succeed. However, now the old node gets
behind. It means that, when the node is up again, the placements should be
re-created on the node. First, use `SELECT citus_activate_node()`. Then, use
`SELECT replicate_table_shards(...)` to replicate the missing placements on
the re-activated node.
Before this commit, we acquire the metadata locks on the reference
tables while removing/disabling a node on all the MX nodes.
Although it has some marginal benefits, such as a concurrent
modification during remove/disable node blocks, instead of erroring
out, the drawbacks seems worse. Both citus_remove_node and citus_disable_node
are not tolerant to multiple node failures.
With this commit, we relax the locks. The implication is that while
a node is removed/disabled, users might see query errors. On the
other hand, this change becomes removing/disabling nodes more
tolerant to multiple node failures.
- [x] Add some more regression test coverage
- [x] Make sure returning works fine in case of
local execution + remote execution
(task->partiallyLocalOrRemote works as expected, already added tests)
- [x] Implement locking properly (and add isolation tests)
- [x] We do #shardcount round-trips on `SerializeNonCommutativeWrites`.
We made it a single round-trip.
- [x] Acquire locks for subselects on the workers & add isolation tests
- [x] Add a GUC to prevent modification from the workers, hence increase the
coordinator-only throughput
- The performance slightly drops (~%15), unless
`citus.allow_modifications_from_workers_to_replicated_tables`
is set to false
Before this commit, we required the user to be owner of the shard/table
in order to call lock_shard_resources.
However, that is too restrictive. We can have users with GRANTS
to the table who are not owners of the tables/shards.
With this commit, we allow such patterns.
In the past, we allowed users to manually switch to 1PC
(e.g., one phase commit). However, with this commit, we
don't. All multi-shard modifications are done via 2PC.
Add/fix tests
Fix creating partitions
Add test for mx - partition creating case
Enable cascading to partitioned tables
Fix mx partition adding test
Fix cascading through fkeys
Style
Disable converting with non-inherited fkeys
Fix detach bug
Early return in case of cascade & Add tests
Style
Fix undistribute_table bug & Fix test outputs
Remove RemovePartitionRelationIds
Test with undistribute_table
Add test for mx+convert+undistribute
Remove redundant usage of CreatePartitionedCitusLocalTable
Add some comments
Introduce bulk functions for generating attach/detach partition commands
Fix: Convert partitioned tables after adding fkey
Change the error message for partitions
Introduce function ErrorIfPartitionTableAddedToMetadata
Polish attach/detach command generation functions
Use time_partitions for testing
Move mx tests to citus_local_tables_mx
Add new partitioned table to cascade test
Add test with time series management UDFs
Fix test output
Fix: Assertion fail on relation access tracking
Style
Refactor creating partitioned citus local tables
Remove CreatePartitionedCitusLocalTable
Style
Error out if converting multi-level table
Revert some old tests
Error out adding partitioned partition
Polish
Polish/address
Fix create table partition of case
Use CascadeOperationForRelationIdList if no cascade needed
Fix create partition bug
Revert / Add new tests to mx
Style
Fix dropping fkey bug
Add test with IF NOT EXISTS
Convert to CLT when doing ATTACH PARTITION
Add comments
Add more tests with time series management
Edit the error message for converting the child
Use OR instead of AND in ErrorIfUnsupportedAlterTableStmt
Edit/improve tests
Disable ddl prop when dropping default column definitions
Disable/enable ddl prop just before/after the command
Add comment
Add sequence test
Add trigger test
Remove NeedCascadeViaForeignKeys
Add one more insert to sequence test
Add comment
Style
Fix test output shard ids
Update comments
Disable creating fkey on partitions
Move partition check to CreateCitusLocalTable
Add comment
Add check for attachingmulti-level partition
Add test for pg_constraint
Check pg_dist_partition in tests
Add test inserting on the worker
* Add udf to include shardId in broken partition shard index names
* Address reviews: rename index such that operations can be done on it
* More comprehensive index tests
* Final touches and formatting