From 40105bf1fcee73319b056215b3b737d9767f73a3 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 2 Mar 2023 16:04:10 +0300 Subject: [PATCH 1/9] Make single_node.sql re-runnable --- src/test/regress/citus_tests/run_test.py | 1 + src/test/regress/expected/single_node.out | 38 +++++++++++---------- src/test/regress/expected/single_node_0.out | 38 +++++++++++---------- src/test/regress/sql/single_node.sql | 14 ++++---- 4 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 4e4ee6c43..e21d47a45 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -84,6 +84,7 @@ if __name__ == "__main__": ), "create_role_propagation": TestDeps(None, ["multi_cluster_management"]), "single_node_enterprise": TestDeps(None), + "single_node": TestDeps(None), "multi_extension": TestDeps(None, repeatable=False), "multi_test_helpers": TestDeps(None), "multi_insert_select": TestDeps("base_schedule"), diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 52d087b18..7f152decd 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1829,6 +1829,7 @@ SELECT pg_sleep(0.1); -- backend(s) that execute on the shards will be terminated -- so show that there no internal backends SET search_path TO single_node; +SET citus.next_shard_id TO 90730500; SELECT count(*) from should_commit; count --------------------------------------------------------------------- @@ -1882,6 +1883,7 @@ BEGIN; ROLLBACK; \c - - - :master_port SET search_path TO single_node; +SET citus.next_shard_id TO 90830500; -- simulate that even if there is no connection slots -- to connect, Citus can switch to local execution SET citus.force_max_query_parallelization TO false; @@ -2106,10 +2108,10 @@ NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS co SET citus.shard_replication_factor TO 1; CREATE TABLE test_disabling_drop_and_truncate (a int); SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830500, 
'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- @@ -2117,24 +2119,24 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 SET citus.enable_manual_changes_to_shards TO off; -- these should error out -DROP TABLE test_disabling_drop_and_truncate_102040; -ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table +DROP TABLE test_disabling_drop_and_truncate_90830500; +ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly -TRUNCATE TABLE test_disabling_drop_and_truncate_102040; -ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table +TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; +ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly RESET citus.enable_manual_changes_to_shards ; -- these should work as expected -TRUNCATE TABLE test_disabling_drop_and_truncate_102040; -DROP TABLE test_disabling_drop_and_truncate_102040; +TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; +DROP TABLE test_disabling_drop_and_truncate_90830500; DROP TABLE test_disabling_drop_and_truncate; -- test creating distributed or reference tables from shards CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT 
worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- @@ -2142,11 +2144,11 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 -- these should error because shards cannot be used to: -- create distributed table -SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); -ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation -- create reference table -SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); -ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation RESET citus.shard_replication_factor; DROP TABLE test_creating_distributed_relation_table_from_shard; 
-- lets flush the copy often to make sure everyhing is fine diff --git a/src/test/regress/expected/single_node_0.out b/src/test/regress/expected/single_node_0.out index 247b8839d..a21cdd28a 100644 --- a/src/test/regress/expected/single_node_0.out +++ b/src/test/regress/expected/single_node_0.out @@ -1829,6 +1829,7 @@ SELECT pg_sleep(0.1); -- backend(s) that execute on the shards will be terminated -- so show that there no internal backends SET search_path TO single_node; +SET citus.next_shard_id TO 90730500; SELECT count(*) from should_commit; count --------------------------------------------------------------------- @@ -1882,6 +1883,7 @@ BEGIN; ROLLBACK; \c - - - :master_port SET search_path TO single_node; +SET citus.next_shard_id TO 90830500; -- simulate that even if there is no connection slots -- to connect, Citus can switch to local execution SET citus.force_max_query_parallelization TO false; @@ -2106,10 +2108,10 @@ NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS co SET citus.shard_replication_factor TO 1; CREATE TABLE test_disabling_drop_and_truncate (a int); SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT 
worker_apply_shard_ddl_command (90830503, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- @@ -2117,24 +2119,24 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 SET citus.enable_manual_changes_to_shards TO off; -- these should error out -DROP TABLE test_disabling_drop_and_truncate_102040; -ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table +DROP TABLE test_disabling_drop_and_truncate_90830500; +ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly -TRUNCATE TABLE test_disabling_drop_and_truncate_102040; -ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table +TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; +ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly RESET citus.enable_manual_changes_to_shards ; -- these should work as expected -TRUNCATE TABLE test_disabling_drop_and_truncate_102040; -DROP TABLE test_disabling_drop_and_truncate_102040; +TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; +DROP TABLE test_disabling_drop_and_truncate_90830500; DROP TABLE test_disabling_drop_and_truncate; -- test creating distributed or reference tables from shards CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command 
(90830504, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- @@ -2142,11 +2144,11 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 -- these should error because shards cannot be used to: -- create distributed table -SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); -ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation -- create reference table -SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); -ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation RESET citus.shard_replication_factor; DROP TABLE test_creating_distributed_relation_table_from_shard; -- lets flush the copy often to make sure everyhing is fine diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 7bbbda895..3419025af 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -975,6 +975,7 @@ SELECT pg_sleep(0.1); -- backend(s) that execute on the shards will be terminated -- so show that there no internal backends SET search_path TO single_node; +SET citus.next_shard_id TO 90730500; SELECT count(*) from should_commit; SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; SELECT get_all_active_client_backend_count(); @@ -998,6 +999,7 @@ ROLLBACK; \c - - - :master_port SET search_path TO single_node; +SET citus.next_shard_id TO 90830500; -- simulate that even if there is no connection slots -- to connect, Citus can switch to local execution @@ -1069,14 +1071,14 @@ SELECT 
create_distributed_table('test_disabling_drop_and_truncate', 'a'); SET citus.enable_manual_changes_to_shards TO off; -- these should error out -DROP TABLE test_disabling_drop_and_truncate_102040; -TRUNCATE TABLE test_disabling_drop_and_truncate_102040; +DROP TABLE test_disabling_drop_and_truncate_90830500; +TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; RESET citus.enable_manual_changes_to_shards ; -- these should work as expected -TRUNCATE TABLE test_disabling_drop_and_truncate_102040; -DROP TABLE test_disabling_drop_and_truncate_102040; +TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; +DROP TABLE test_disabling_drop_and_truncate_90830500; DROP TABLE test_disabling_drop_and_truncate; @@ -1086,10 +1088,10 @@ SELECT create_distributed_table('test_creating_distributed_relation_table_from_s -- these should error because shards cannot be used to: -- create distributed table -SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a'); -- create reference table -SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504'); RESET citus.shard_replication_factor; DROP TABLE test_creating_distributed_relation_table_from_shard; From a9820e96a3bd7ec08f5d734f81b07bfe835568ef Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 2 Mar 2023 12:42:40 +0300 Subject: [PATCH 2/9] Make single_node_truncate.sql re-runnable First of all, this commit sets next_shard_id for single_node_truncate.sql because shard ids in the test output were changing whenever we modify a prior test file. Then the flaky test detector started complaining about single_node_truncate.sql. We fix that by specifying the correct test dependency for it in run_test.py. 
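The commit messages above take for granted why pinning `citus.next_shard_id` makes a test file re-runnable: Citus appends the shard id to the relation name when it creates shard tables, so fixing the starting id fixes the shard names that appear in the expected output regardless of how many shards earlier test files created. A minimal sketch of the pattern, reusing only statements and ids already present in the diffs above (the four resulting shard names are copied from the NOTICE lines; the shard count itself comes from the test's own settings, not from this snippet):

```sql
-- Pin the shard id range before creating any distributed table in the file.
SET citus.next_shard_id TO 90830500;
CREATE TABLE test_disabling_drop_and_truncate (a int);
SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a');
-- With the shard count used in this test, the shard relations are always
-- test_disabling_drop_and_truncate_90830500 .. _90830503, so the later
-- DROP/TRUNCATE statements and the expected .out file stay stable.
```

The `TestDeps(None)` entries added in run_test.py are the other half: they declare the file's dependencies to the flaky-test runner so the test can be scheduled and repeated on its own, as the commit message describes.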
--- src/test/regress/citus_tests/run_test.py | 1 + .../regress/expected/single_node_truncate.out | 71 ++++++++++--------- src/test/regress/sql/single_node_truncate.sql | 1 + 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index e21d47a45..74c5ed31b 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -85,6 +85,7 @@ if __name__ == "__main__": "create_role_propagation": TestDeps(None, ["multi_cluster_management"]), "single_node_enterprise": TestDeps(None), "single_node": TestDeps(None), + "single_node_truncate": TestDeps(None), "multi_extension": TestDeps(None, repeatable=False), "multi_test_helpers": TestDeps(None), "multi_insert_select": TestDeps("base_schedule"), diff --git a/src/test/regress/expected/single_node_truncate.out b/src/test/regress/expected/single_node_truncate.out index 7d94dc744..18883a4df 100644 --- a/src/test/regress/expected/single_node_truncate.out +++ b/src/test/regress/expected/single_node_truncate.out @@ -1,6 +1,7 @@ CREATE SCHEMA single_node_truncate; SET search_path TO single_node_truncate; SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 91630500; -- helper view that prints out local table names and sizes in the schema CREATE VIEW table_sizes AS SELECT @@ -28,12 +29,12 @@ CREATE TABLE citus_local(id int, ref_id int REFERENCES ref(id)); INSERT INTO citus_local SELECT x,x FROM generate_series(1,10000) x; -- verify that shell tables for citus local tables are empty SELECT * FROM table_sizes; - name | has_data + name | has_data --------------------------------------------------------------------- - citus_local | f - citus_local_102049 | t - ref | t - ref_102048 | t + citus_local | f + citus_local_91630501 | t + ref | t + ref_91630500 | t (4 rows) -- verify that this UDF is noop on Citus local tables @@ -44,12 +45,12 @@ SELECT truncate_local_data_after_distributing_table('citus_local'); (1 row) SELECT * FROM table_sizes; - name | has_data + name | has_data --------------------------------------------------------------------- - citus_local | f - citus_local_102049 | t - ref | t - ref_102048 | t + citus_local | f + citus_local_91630501 | t + ref | t + ref_91630500 | t (4 rows) -- test that we allow cascading truncates to citus local tables @@ -62,12 +63,12 @@ NOTICE: truncate cascades to table "citus_local" (1 row) SELECT * FROM table_sizes; - name | has_data + name | has_data --------------------------------------------------------------------- - citus_local | f - citus_local_102049 | t - ref | f - ref_102048 | t + citus_local | f + citus_local_91630501 | t + ref | f + ref_91630500 | t (4 rows) ROLLBACK; @@ -95,17 +96,17 @@ NOTICE: truncate cascades to table "dist" (1 row) SELECT * FROM table_sizes; - name | has_data + name | has_data --------------------------------------------------------------------- - citus_local | f - citus_local_102049 | t - dist | f - dist_102051 | t - dist_102052 | t - dist_102053 | t - dist_102054 | t - ref | f - ref_102048 | t + citus_local | f + citus_local_91630501 | t + dist | f + dist_91630503 | t + dist_91630504 | t + dist_91630505 | t + dist_91630506 | t + ref | f + ref_91630500 | t (9 rows) ROLLBACK; @@ -118,17 +119,17 @@ SELECT truncate_local_data_after_distributing_table('dist'); (1 row) SELECT * FROM table_sizes; - name | has_data + name | has_data --------------------------------------------------------------------- - citus_local | f - citus_local_102049 | t - dist 
| f - dist_102051 | t - dist_102052 | t - dist_102053 | t - dist_102054 | t - ref | t - ref_102048 | t + citus_local | f + citus_local_91630501 | t + dist | f + dist_91630503 | t + dist_91630504 | t + dist_91630505 | t + dist_91630506 | t + ref | t + ref_91630500 | t (9 rows) ROLLBACK; diff --git a/src/test/regress/sql/single_node_truncate.sql b/src/test/regress/sql/single_node_truncate.sql index 5b555ff91..faaae7858 100644 --- a/src/test/regress/sql/single_node_truncate.sql +++ b/src/test/regress/sql/single_node_truncate.sql @@ -1,6 +1,7 @@ CREATE SCHEMA single_node_truncate; SET search_path TO single_node_truncate; SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 91630500; -- helper view that prints out local table names and sizes in the schema CREATE VIEW table_sizes AS From dc7fa0d5afa552128ce4036f1a060dc0fd26834c Mon Sep 17 00:00:00 2001 From: Gledis Zeneli <43916939+gledis69@users.noreply.github.com> Date: Fri, 3 Mar 2023 21:06:59 +0300 Subject: [PATCH 3/9] Fix multiple output version arbitrary config tests (#6744) With this small change, arbitrary config tests can have multiple acceptable correct outputs. For an arbitrary config tests named `t`, now you can define `expected/t.out`, `expected/t_0.out`, `expected/t_1.out` etc and the test will succeed if the output of `sql/t.sql` is equal to any of the `t.out` or `t_{0, 1, ...}.out` files. --- .../arbitrary_configs/citus_arbitrary_configs.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py b/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py index b7c05838c..6c9863434 100755 --- a/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py +++ b/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py @@ -151,14 +151,24 @@ def copy_test_files_with_names(test_names, sql_dir_path, expected_dir_path, conf continue sql_name = os.path.join("./sql", test_name + ".sql") - output_name = os.path.join("./expected", test_name + ".out") - shutil.copy(sql_name, sql_dir_path) - if os.path.isfile(output_name): + + # for a test named , all files: + # .out, _0.out, _1.out ... + # are considered as valid outputs for the test + # by the testing tool (pg_regress) + # so copy such files to the testing directory + output_name = os.path.join("./expected", test_name + ".out") + alt_output_version_no = 0 + while os.path.isfile(output_name): # it might be the first time we run this test and the expected file # might not be there yet, in that case, we don't want to error out # while copying the file. shutil.copy(output_name, expected_dir_path) + output_name = os.path.join( + "./expected", f"{test_name}_{alt_output_version_no}.out" + ) + alt_output_version_no += 1 def run_tests(configs, sql_schedule_name): From b489d763e169b4b2ed4accc3a2337e97a58fd716 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 6 Mar 2023 10:53:12 +0100 Subject: [PATCH 4/9] Use pg_total_relation_size in citus_shards (#6748) DESCRIPTION: Correctly report shard size in citus_shards view When looking at citus_shards, people are interested in the actual size that all the data related to the shard takes up on disk. `pg_total_relation_size` is the function to use for that purpose. The previously used `pg_relation_size` does not include indexes or TOAST. Especially the missing toast can have enormous impact on the size of the shown data. 
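The size difference this patch surfaces is easy to reproduce in psql. A small illustration, run against one of the shard relations named in the expected output below (the exact numbers depend on the table's indexes and TOAST data, so read the comments as an explanation of the diffs rather than fixed values):

```sql
-- Reports only the main fork of the relation itself.
SELECT pg_relation_size('public.test_table_statistics_hash_981000');

-- Also counts the relation's indexes and its TOAST table, which is why the
-- shard sizes in the expected outputs below move from values like 0 or 8192
-- up to 8192, 16384, or 24576 bytes after switching functions.
SELECT pg_total_relation_size('public.test_table_statistics_hash_981000');
```

Since citus_shards is meant to answer how much disk a shard actually occupies, pg_total_relation_size is the more accurate choice, which is exactly what the diff to metadata_utility.c below switches to.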
--- .../distributed/metadata/metadata_utility.c | 2 +- src/test/regress/citus_tests/run_test.py | 9 + .../citus_update_table_statistics.out | 8 +- .../expected/multi_mx_create_table.out | 428 +++++++++--------- 4 files changed, 228 insertions(+), 219 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index dba509681..98ffe1b7d 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -985,7 +985,7 @@ AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval) appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId); appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName); - appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName); + appendStringInfo(selectQuery, PG_TOTAL_RELATION_SIZE_FUNCTION, quotedShardName); } diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 74c5ed31b..a4b303e90 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -89,6 +89,15 @@ if __name__ == "__main__": "multi_extension": TestDeps(None, repeatable=False), "multi_test_helpers": TestDeps(None), "multi_insert_select": TestDeps("base_schedule"), + "multi_mx_create_table": TestDeps( + None, + [ + "multi_test_helpers_superuser", + "multi_mx_node_metadata", + "multi_cluster_management", + "multi_mx_function_table_reference", + ], + ), } if not (test_file_name or test_file_path): diff --git a/src/test/regress/expected/citus_update_table_statistics.out b/src/test/regress/expected/citus_update_table_statistics.out index 69676c1bf..031104c53 100644 --- a/src/test/regress/expected/citus_update_table_statistics.out +++ b/src/test/regress/expected/citus_update_table_statistics.out @@ -64,11 +64,11 @@ SET citus.multi_shard_modify_mode TO sequential; SELECT citus_update_table_statistics('test_table_statistics_hash'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981000 
AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL 
SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -152,11 +152,11 @@ SET citus.multi_shard_modify_mode TO sequential; SELECT citus_update_table_statistics('test_table_statistics_append'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index ffbaa738e..6bdef048a 100644 --- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -497,22 +497,22 @@ ORDER BY table_name::text; SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text; shard_name | 
table_name | citus_table_type | shard_size --------------------------------------------------------------------- - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220097 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0 - app_analytics_events_mx_1220099 | app_analytics_events_mx | distributed | 0 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220097 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192 + app_analytics_events_mx_1220099 | app_analytics_events_mx | distributed | 8192 articles_hash_mx_1220104 | articles_hash_mx | distributed | 0 articles_hash_mx_1220104 | articles_hash_mx | distributed | 0 articles_hash_mx_1220104 | articles_hash_mx | distributed | 0 @@ -608,22 +608,22 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR citus_mx_test_schema.nation_hash_collation_search_path_1220046 | citus_mx_test_schema.nation_hash_collation_search_path | distributed | 0 citus_mx_test_schema.nation_hash_collation_search_path_1220046 | citus_mx_test_schema.nation_hash_collation_search_path | distributed | 0 citus_mx_test_schema.nation_hash_collation_search_path_1220047 | citus_mx_test_schema.nation_hash_collation_search_path | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | 
citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220049 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0 - citus_mx_test_schema.nation_hash_composite_types_1220051 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220049 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 + citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220050 | 
citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192 + citus_mx_test_schema.nation_hash_composite_types_1220051 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384 citus_mx_test_schema_join_1.nation_hash_1220032 | citus_mx_test_schema_join_1.nation_hash | distributed | 0 citus_mx_test_schema_join_1.nation_hash_1220032 | citus_mx_test_schema_join_1.nation_hash | distributed | 0 citus_mx_test_schema_join_1.nation_hash_1220032 | citus_mx_test_schema_join_1.nation_hash | distributed | 0 @@ -696,109 +696,109 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR customer_mx_1220084 | customer_mx | reference | 0 customer_mx_1220084 | customer_mx | reference | 0 customer_mx_1220084 | customer_mx | reference | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - labs_mx_1220102 | labs_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220092 | limit_orders_mx | distributed | 0 - limit_orders_mx_1220093 | limit_orders_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220052 | lineitem_mx | distributed | 0 - lineitem_mx_1220053 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220054 | lineitem_mx | distributed | 0 - lineitem_mx_1220055 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220056 | lineitem_mx | distributed | 0 - lineitem_mx_1220057 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - lineitem_mx_1220058 | lineitem_mx | distributed | 0 - 
lineitem_mx_1220059 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220060 | lineitem_mx | distributed | 0 - lineitem_mx_1220061 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220062 | lineitem_mx | distributed | 0 - lineitem_mx_1220063 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220064 | lineitem_mx | distributed | 0 - lineitem_mx_1220065 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220066 | lineitem_mx | distributed | 0 - lineitem_mx_1220067 | lineitem_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0 - multiple_hash_mx_1220095 | multiple_hash_mx | distributed | 0 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220089 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192 - mx_ddl_table_1220091 | mx_ddl_table | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | 
labs_mx | distributed | 8192 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 + limit_orders_mx_1220093 | limit_orders_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220052 | lineitem_mx | distributed | 16384 + lineitem_mx_1220053 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220054 | lineitem_mx | distributed | 16384 + lineitem_mx_1220055 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220056 | lineitem_mx | distributed | 16384 + lineitem_mx_1220057 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220058 | lineitem_mx | distributed | 16384 + lineitem_mx_1220059 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220060 | lineitem_mx | distributed | 16384 + lineitem_mx_1220061 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220062 | lineitem_mx | distributed | 16384 + lineitem_mx_1220063 | lineitem_mx | distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | 
distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | distributed | 16384 + lineitem_mx_1220064 | lineitem_mx | distributed | 16384 + lineitem_mx_1220065 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220066 | lineitem_mx | distributed | 16384 + lineitem_mx_1220067 | lineitem_mx | distributed | 16384 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192 + multiple_hash_mx_1220095 | multiple_hash_mx | distributed | 8192 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220089 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576 + mx_ddl_table_1220091 | mx_ddl_table | distributed | 24576 nation_hash_1220000 | nation_hash | distributed | 0 nation_hash_1220000 | nation_hash | distributed | 0 nation_hash_1220000 | nation_hash | distributed | 0 @@ -871,77 +871,77 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR nation_mx_1220085 | nation_mx | reference | 0 nation_mx_1220085 | nation_mx | reference | 0 nation_mx_1220085 | nation_mx | reference | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - objects_mx_1220103 | objects_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220068 | orders_mx | distributed | 0 - orders_mx_1220069 | orders_mx | distributed | 0 - orders_mx_1220070 | orders_mx | distributed | 0 - orders_mx_1220070 | orders_mx | distributed | 0 - orders_mx_1220070 | orders_mx | distributed | 0 - orders_mx_1220070 | orders_mx 
| distributed | 0 - orders_mx_1220070 | orders_mx | distributed | 0 - orders_mx_1220070 | orders_mx | distributed | 0 - orders_mx_1220070 | orders_mx | distributed | 0 - orders_mx_1220071 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220072 | orders_mx | distributed | 0 - orders_mx_1220073 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220074 | orders_mx | distributed | 0 - orders_mx_1220075 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220076 | orders_mx | distributed | 0 - orders_mx_1220077 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220078 | orders_mx | distributed | 0 - orders_mx_1220079 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220080 | orders_mx | distributed | 0 - orders_mx_1220081 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220082 | orders_mx | distributed | 0 - orders_mx_1220083 | orders_mx | distributed | 0 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220068 | orders_mx | distributed | 8192 + orders_mx_1220069 | orders_mx | distributed | 8192 + orders_mx_1220070 | orders_mx | distributed | 8192 + orders_mx_1220070 | orders_mx | distributed | 8192 + 
orders_mx_1220070 | orders_mx | distributed | 8192 + orders_mx_1220070 | orders_mx | distributed | 8192 + orders_mx_1220070 | orders_mx | distributed | 8192 + orders_mx_1220070 | orders_mx | distributed | 8192 + orders_mx_1220070 | orders_mx | distributed | 8192 + orders_mx_1220071 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220072 | orders_mx | distributed | 8192 + orders_mx_1220073 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220074 | orders_mx | distributed | 8192 + orders_mx_1220075 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220076 | orders_mx | distributed | 8192 + orders_mx_1220077 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220078 | orders_mx | distributed | 8192 + orders_mx_1220079 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220080 | orders_mx | distributed | 8192 + orders_mx_1220081 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220082 | orders_mx | distributed | 8192 + orders_mx_1220083 | orders_mx | distributed | 8192 part_mx_1220086 | part_mx | reference | 0 part_mx_1220086 | part_mx | reference | 0 part_mx_1220086 | part_mx | reference | 0 @@ -950,14 +950,14 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR part_mx_1220086 | part_mx | reference | 0 part_mx_1220086 | part_mx | reference | 0 part_mx_1220086 | part_mx | reference | 0 - researchers_mx_1220100 | researchers_mx | distributed | 0 - researchers_mx_1220100 | researchers_mx | distributed | 0 - researchers_mx_1220100 | researchers_mx | distributed | 0 - researchers_mx_1220100 | researchers_mx | distributed | 0 - researchers_mx_1220100 | researchers_mx | distributed | 0 - researchers_mx_1220100 | 
researchers_mx | distributed | 0 - researchers_mx_1220100 | researchers_mx | distributed | 0 - researchers_mx_1220101 | researchers_mx | distributed | 0 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220100 | researchers_mx | distributed | 8192 + researchers_mx_1220101 | researchers_mx | distributed | 8192 supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 From ed7cc8f460e91931d082af7785b8c228457b813e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emel=20=C5=9Eim=C5=9Fek?= Date: Mon, 6 Mar 2023 13:59:45 +0300 Subject: [PATCH 5/9] Remove unused lock functions (#6747) Code cleanup. This change removes two unused functions seemingly left over after a previous refactoring of shard move code. --- src/backend/distributed/utils/resource_lock.c | 39 ------------------- src/include/distributed/resource_lock.h | 5 +-- 2 files changed, 1 insertion(+), 43 deletions(-) diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index cc4cb0d88..7b8edf758 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -503,45 +503,6 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag) } -/* - * LockPlacementCleanup takes an exclusive lock to ensure that only one process - * can cleanup placements at the same time. - */ -void -LockPlacementCleanup(void) -{ - LOCKTAG tag; - const bool sessionLock = false; - const bool dontWait = false; - - /* Moves acquire lock with a constant operation id CITUS_SHARD_MOVE. - * This will change as we add support for parallel moves. - */ - SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE); - (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); -} - - -/* - * TryLockPlacementCleanup takes an exclusive lock to ensure that only one - * process can cleanup placements at the same time. - */ -bool -TryLockPlacementCleanup(void) -{ - LOCKTAG tag; - const bool sessionLock = false; - const bool dontWait = true; - - /* Moves acquire lock with a constant operation id CITUS_SHARD_MOVE. - * This will change as we add support for parallel moves. 
- */ - SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE); - bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); - return lockAcquired; -} - - /* * LockReferencedReferenceShardDistributionMetadata acquires shard distribution * metadata locks with the given lock mode on the reference tables which has a diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 9e143e467..9efa1b767 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -53,8 +53,7 @@ typedef enum CitusOperations CITUS_NONBLOCKING_SPLIT = 1, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2, CITUS_CREATE_COLOCATION_DEFAULT = 3, - CITUS_SHARD_MOVE = 4, - CITUS_BACKGROUND_TASK_MONITOR = 5 + CITUS_BACKGROUND_TASK_MONITOR = 4 } CitusOperations; /* reuse advisory lock, but with different, unused field 4 (4)*/ @@ -165,8 +164,6 @@ enum DistLockConfigs /* Lock shard/relation metadata for safe modifications */ extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); -extern void LockPlacementCleanup(void); -extern bool TryLockPlacementCleanup(void); extern void EnsureShardOwner(uint64 shardId, bool missingOk); extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList); extern void BlockWritesToShardList(List *shardList); From 03f1bb70b734391f4274ccc6ef9ea4bbb4b0b04d Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Mon, 6 Mar 2023 14:14:27 +0300 Subject: [PATCH 6/9] Rebalance shard groups with placement count less than worker count (#6739) DESCRIPTION: Adds logic to distribute unbalanced shards If the number of shard placements (for a colocation group) is less than the number of workers, some of the workers will remain empty. With this PR, we treat such shard groups as a single colocation group so that they are distributed across the cluster as evenly as possible. Example: ```sql create table t1 (a int primary key); create table t2 (a int primary key); create table t3 (a int primary key); set citus.shard_count =1; select create_distributed_table('t1','a'); select create_distributed_table('t2','a',colocate_with=>'t1'); select create_distributed_table('t3','a',colocate_with=>'t2'); create table tb1 (a bigint); create table tb2 (a bigint); select create_distributed_table('tb1','a'); select create_distributed_table('tb2','a',colocate_with=>'tb1'); select citus_add_node('localhost',9702); select rebalance_table_shards(); ``` Here we have two colocation groups, each with one shard group, and both shard groups are placed on the first worker node. Previously, when we added a new worker node and tried to rebalance the table shards, the rebalance planner considered the cluster well balanced and did nothing. With this PR, the rebalancer distributes these shard groups across the cluster as evenly as possible; in this example, it moves one of the two shard groups to the second worker node.
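To make the expected outcome of the example above easy to check, the sketch below queries the placement metadata after the rebalance. It mirrors the placement checks used in this patch's regression tests; the table names come from the example above, and the exact node names and ports shown by the query will depend on the cluster, so treat it as an illustrative verification query rather than part of the patch.
```sql
-- Sketch: verify where each colocation group landed after rebalance_table_shards().
-- Expect one colocation group to report the newly added node's port and the
-- other to stay on the first worker.
SELECT sh.logicalrelid, pl.nodename, pl.nodeport
FROM pg_dist_shard sh
JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('t1', 't2', 't3', 'tb1', 'tb2')
ORDER BY sh.logicalrelid;
```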
fixes: #6715 --- .../distributed/operations/shard_rebalancer.c | 26 +++++- .../distributed/test/shard_rebalancer.c | 20 +++++ .../regress/expected/shard_rebalancer.out | 88 +++++++++++++++++++ .../expected/shard_rebalancer_unit.out | 72 +++++++++++++++ src/test/regress/sql/shard_rebalancer.sql | 35 ++++++++ .../regress/sql/shard_rebalancer_unit.sql | 54 ++++++++++++ 6 files changed, 293 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d24936925..baed8b0d5 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -475,6 +475,7 @@ GetRebalanceSteps(RebalanceOptions *options) /* sort the lists to make the function more deterministic */ List *activeWorkerList = SortedActiveWorkers(); List *activeShardPlacementListList = NIL; + List *unbalancedShards = NIL; Oid relationId = InvalidOid; foreach_oid(relationId, options->relationIdList) @@ -490,8 +491,29 @@ GetRebalanceSteps(RebalanceOptions *options) shardPlacementList, options->workerNode); } - activeShardPlacementListList = - lappend(activeShardPlacementListList, activeShardPlacementListForRelation); + if (list_length(activeShardPlacementListForRelation) >= list_length( + activeWorkerList)) + { + activeShardPlacementListList = lappend(activeShardPlacementListList, + activeShardPlacementListForRelation); + } + else + { + /* + * If the number of shard groups are less than the number of worker nodes, + * at least one of the worker nodes will remain empty. For such cases, + * we consider those shard groups as a colocation group and try to + * distribute them across the cluster. + */ + unbalancedShards = list_concat(unbalancedShards, + activeShardPlacementListForRelation); + } + } + + if (list_length(unbalancedShards) > 0) + { + activeShardPlacementListList = lappend(activeShardPlacementListList, + unbalancedShards); } if (options->threshold < options->rebalanceStrategy->minimumThreshold) diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index 60603f091..56a063982 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -147,6 +147,26 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS) shardPlacementList = SortList(shardPlacementList, CompareShardPlacements); shardPlacementListList = lappend(shardPlacementListList, shardPlacementList); + List *unbalancedShards = NIL; + ListCell *shardPlacementListCell = NULL; + foreach(shardPlacementListCell, shardPlacementListList) + { + List *placementList = (List *) lfirst(shardPlacementListCell); + + if (list_length(placementList) < list_length(workerNodeList)) + { + unbalancedShards = list_concat(unbalancedShards, + placementList); + shardPlacementListList = foreach_delete_current(shardPlacementListList, + shardPlacementListCell); + } + } + + if (list_length(unbalancedShards) > 0) + { + shardPlacementListList = lappend(shardPlacementListList, unbalancedShards); + } + rebalancePlanFunctions.context = &context; /* sort the lists to make the function more deterministic */ diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 9eec2cee3..2146d67f1 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -2626,6 +2626,94 @@ RESET citus.shard_count; DROP VIEW table_placements_per_node; DELETE FROM 
pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; +-- add colocation groups with shard group count < worker count +-- the rebalancer should balance those "unbalanced shards" evenly as much as possible +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +create table single_shard_colocation_1a (a int primary key); +create table single_shard_colocation_1b (a int primary key); +create table single_shard_colocation_1c (a int primary key); +SET citus.shard_replication_factor = 1; +select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create table single_shard_colocation_2a (a bigint); +create table single_shard_colocation_2b (a bigint); +select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- all shards are placed on the first worker node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY sh.logicalrelid; + logicalrelid | nodeport +--------------------------------------------------------------------- + single_shard_colocation_1a | 57637 + single_shard_colocation_1b | 57637 + single_shard_colocation_1c | 57637 + single_shard_colocation_2a | 57637 + single_shard_colocation_2b | 57637 +(5 rows) + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +select rebalance_table_shards(); +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
+ rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +-- verify some shards are moved to the new node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY sh.logicalrelid; + logicalrelid | nodeport +--------------------------------------------------------------------- + single_shard_colocation_1a | 57638 + single_shard_colocation_1b | 57638 + single_shard_colocation_1c | 57638 + single_shard_colocation_2a | 57637 + single_shard_colocation_2b | 57637 +(5 rows) + +DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/expected/shard_rebalancer_unit.out b/src/test/regress/expected/shard_rebalancer_unit.out index 156edfc6b..9ebd6f942 100644 --- a/src/test/regress/expected/shard_rebalancer_unit.out +++ b/src/test/regress/expected/shard_rebalancer_unit.out @@ -742,3 +742,75 @@ HINT: If you do want these moves to happen, try changing improvement_threshold {"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} (2 rows) +-- Test single shard colocation groups +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} +(2 rows) + +-- Test colocation groups with shard count < worker count +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} +(3 rows) + +-- Test colocation groups with shard count < worker count +-- mixed with a colocation group shard_count > worker count +SELECT 
unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}', + '{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}', + '{"shardid":8, "cost":50, "nodename":"b"}', + '{"shardid":9, "cost":50, "nodename":"b"}', + '{"shardid":10, "cost":50, "nodename":"b"}', + '{"shardid":11, "cost":50, "nodename":"b"}', + '{"shardid":12, "cost":50, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":7,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} + {"updatetype":1,"shardid":8,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":9,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} + {"updatetype":1,"shardid":10,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} +(7 rows) + diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index 02a6df666..dbbc94732 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -1462,6 +1462,41 @@ DROP VIEW table_placements_per_node; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; +-- add colocation groups with shard group count < worker count +-- the rebalancer should balance those "unbalanced shards" evenly as much as possible +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); +create table single_shard_colocation_1a (a int primary key); +create table single_shard_colocation_1b (a int primary key); +create table single_shard_colocation_1c (a int primary key); +SET citus.shard_replication_factor = 1; +select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1); +select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a'); +select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b'); + +create table single_shard_colocation_2a (a bigint); +create table single_shard_colocation_2b (a bigint); +select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1); +select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a'); + +-- all shards are placed on the first worker node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY 
sh.logicalrelid; + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); +select rebalance_table_shards(); + +-- verify some shards are moved to the new node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY sh.logicalrelid; + +DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; + \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/sql/shard_rebalancer_unit.sql b/src/test/regress/sql/shard_rebalancer_unit.sql index 51293a227..607be4710 100644 --- a/src/test/regress/sql/shard_rebalancer_unit.sql +++ b/src/test/regress/sql/shard_rebalancer_unit.sql @@ -530,3 +530,57 @@ SELECT unnest(shard_placement_rebalance_array( ]::json[], improvement_threshold := 0.6 )); + + +-- Test single shard colocation groups +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}' + ]::json[], + improvement_threshold := 0.1 +)); + + +-- Test colocation groups with shard count < worker count +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}' + ]::json[], + improvement_threshold := 0.1 +)); + + +-- Test colocation groups with shard count < worker count +-- mixed with a colocation group shard_count > worker count +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}', + '{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}', + '{"shardid":8, "cost":50, "nodename":"b"}', + '{"shardid":9, "cost":50, "nodename":"b"}', + '{"shardid":10, "cost":50, "nodename":"b"}', + '{"shardid":11, "cost":50, "nodename":"b"}', + '{"shardid":12, "cost":50, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.1 +)); From 4043abd5aaa93bc9b51a87686f3961ede05145ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emel=20=C5=9Eim=C5=9Fek?= Date: Tue, 7 Mar 2023 18:15:50 +0300 Subject: [PATCH 7/9] 
Exclude-Generated-Columns-In-Copy (#6721) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DESCRIPTION: Fixes a bug in shard copy operations. For copying shards in both shard move and shard split operations, Citus uses the COPY statement. A COPY statement of the form `COPY target_shard FROM STDIN;` throws an error when the shard table has a GENERATED column. In order to fix this issue, we need to exclude the GENERATED columns from the COPY and the matching SELECT statements. Hence, this fix converts the COPY and SELECT statements to the following form: ``` COPY target_shard (col1, col2, ..., coln) FROM STDIN; SELECT col1, col2, ..., coln FROM source_shard; ``` where the column list col1, col2, ..., coln does not include any GENERATED columns. GENERATED column values are computed in the target_shard as the rows are inserted. Fixes #6705. --------- Co-authored-by: Teja Mupparti Co-authored-by: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com> Co-authored-by: Jelte Fennema Co-authored-by: Gürkan İndibay --- .../worker_copy_table_to_node_udf.c | 8 +- .../operations/worker_shard_copy.c | 70 ++++++++++++++-- .../operations/worker_split_copy_udf.c | 7 +- src/include/distributed/worker_shard_copy.h | 5 ++ .../citus_non_blocking_split_shards.out | 5 +- .../citus_split_shard_by_split_points.out | 5 +- src/test/regress/expected/multi_move_mx.out | 32 ++++ .../expected/worker_split_copy_test.out | 82 +++++++++++++++++++ .../sql/citus_non_blocking_split_shards.sql | 6 +- .../sql/citus_split_shard_by_split_points.sql | 6 +- src/test/regress/sql/multi_move_mx.sql | 26 ++++++ .../regress/sql/worker_split_copy_test.sql | 58 +++++++++++++ 12 files changed, 295 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c index 7af80ef55..f0f83744d 100644 --- a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -53,8 +53,14 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS) targetNodeId); StringInfo selectShardQueryForCopy = makeStringInfo(); + + /* + * Even though we COPY all the columns via COPY (SELECT ...), we cannot simply use SELECT * because generated columns must not be copied.
+ */ + const char *columnList = CopyableColumnNamesFromRelationName(relationSchemaName, + relationName); appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", relationQualifiedName); + "SELECT %s FROM %s;", columnList, relationQualifiedName); ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 9239caffb..e9c2af512 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -73,7 +73,7 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint32_t destinationNodeId); static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat); + useBinaryFormat, TupleDesc tupleDesc); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState @@ -105,7 +105,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary); + copyDest->copyOutState->binary, + copyDest->tupleDescriptor); if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) { @@ -344,21 +345,80 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) } +/* + * CopyableColumnNamesFromTupleDesc creates and returns a comma-separated column name string to be used in the COPY + * and SELECT statements when copying a table. The COPY and SELECT statements must filter out GENERATED columns, since the + * COPY statement fails to handle them. While iterating over the table's attributes, we also skip dropped columns. + */ +const char * +CopyableColumnNamesFromTupleDesc(TupleDesc tupDesc) +{ + StringInfo columnList = makeStringInfo(); + bool firstInList = true; + + for (int i = 0; i < tupDesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, i); + if (att->attgenerated || att->attisdropped) + { + continue; + } + if (!firstInList) + { + appendStringInfo(columnList, ","); + } + + firstInList = false; + + appendStringInfo(columnList, "%s", quote_identifier(NameStr(att->attname))); + } + + return columnList->data; +} + + +/* + * CopyableColumnNamesFromRelationName is a wrapper around CopyableColumnNamesFromTupleDesc.
+ */ +const char * +CopyableColumnNamesFromRelationName(const char *schemaName, const char *relationName) +{ + Oid namespaceOid = get_namespace_oid(schemaName, true); + + Oid relationId = get_relname_relid(relationName, namespaceOid); + + Relation relation = relation_open(relationId, AccessShareLock); + + TupleDesc tupleDesc = RelationGetDescr(relation); + + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + + relation_close(relation, NoLock); + + return columnList; +} + + /* * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table */ static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat) + useBinaryFormat, + TupleDesc tupleDesc) { char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); + StringInfo command = makeStringInfo(); - appendStringInfo(command, "COPY %s.%s FROM STDIN", + + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + + appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", quote_identifier(destinationShardSchemaName), quote_identifier( - destinationShardRelationName)); + destinationShardRelationName), columnList); if (useBinaryFormat) { diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index b96475992..c154ac040 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -110,8 +110,13 @@ worker_split_copy(PG_FUNCTION_ARGS) splitCopyInfoList)))); StringInfo selectShardQueryForCopy = makeStringInfo(); + const char *columnList = CopyableColumnNamesFromRelationName( + sourceShardToCopySchemaName, + sourceShardToCopyName); + appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", sourceShardToCopyQualifiedName); + "SELECT %s FROM %s;", columnList, + sourceShardToCopyQualifiedName); ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index 2ab2775f9..77f57c761 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -19,4 +19,9 @@ extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState, List *destinationShardFullyQualifiedName, uint32_t destinationNodeId); +extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const + char *relationName); + +extern const char * CopyableColumnNamesFromTupleDesc(TupleDesc tupdesc); + #endif /* WORKER_SHARD_COPY_H_ */ diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out index d6dde8b7a..fe3cade55 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shards.out +++ b/src/test/regress/expected/citus_non_blocking_split_shards.out @@ -60,7 +60,7 @@ SELECT create_reference_table('reference_table'); (1 row) -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); 
create_distributed_table @@ -84,8 +84,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- END : Create Foreign key constraints. -- BEGIN : Load data into tables. INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; SELECT COUNT(*) FROM sensors; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 87f50da31..13f3b7a36 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -56,7 +56,7 @@ SELECT create_reference_table('reference_table'); (1 row) -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); create_distributed_table @@ -80,8 +80,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- END : Create Foreign key constraints. -- BEGIN : Load data into tables. INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; SELECT COUNT(*) FROM sensors; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_move_mx.out b/src/test/regress/expected/multi_move_mx.out index 833c9f7df..b6cc5d0d7 100644 --- a/src/test/regress/expected/multi_move_mx.out +++ b/src/test/regress/expected/multi_move_mx.out @@ -238,8 +238,40 @@ ORDER BY LIMIT 1 OFFSET 1; ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. +-- Check that shards of a table with GENERATED columns can be moved. +\c - - - :master_port +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int); +SELECT create_distributed_table('mx_table_with_generated_column', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Check that dropped columns are handled properly in a move. 
+ALTER TABLE mx_table_with_generated_column DROP COLUMN c; +-- Move a shard from worker 1 to worker 2 +SELECT + citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_table_with_generated_column'::regclass + AND nodeport = :worker_1_port +ORDER BY + shardid +LIMIT 1; + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + -- Cleanup \c - - - :master_port +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +DROP TABLE mx_table_with_generated_column; DROP TABLE mx_table_1; DROP TABLE mx_table_2; DROP TABLE mx_table_3; diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 67d515198..f4fae57e0 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -142,8 +142,90 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 (1 row) -- END: List updated row count for local targets shard. +-- Check that GENERATED columns are handled properly in a shard split operation. +\c - - - :master_port +SET search_path TO worker_split_copy_test; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 81080000; +-- BEGIN: Create distributed table and insert data. +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int); +SELECT create_distributed_table('dist_table_with_generated_col', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Check that dropped columns are filtered out in COPY command. +ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop; +INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id)); +-- END: Create distributed table and insert data. +-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy. +\c - - - :worker_1_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); +\c - - - :worker_2_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); +-- BEGIN: List row count for source shard and targets shard in Worker1. +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000; + count +--------------------------------------------------------------------- + 510 +(1 row) + +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- BEGIN: List row count for target shard in Worker2. 
+\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_1_port +SELECT * from worker_split_copy( + 81080000, -- source shard id to copy + 'id', + ARRAY[ + -- split copy info for split children 1 + ROW(81080015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::pg_catalog.split_copy_info, + -- split copy info for split children 2 + ROW(81080016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_2_node)::pg_catalog.split_copy_info + ] + ); + worker_split_copy +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + count +--------------------------------------------------------------------- + 247 +(1 row) + +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + count +--------------------------------------------------------------------- + 263 +(1 row) + -- BEGIN: CLEANUP. \c - - - :master_port SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); DROP SCHEMA worker_split_copy_test CASCADE; -- END: CLEANUP. diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql index 11275a342..909beac02 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shards.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql @@ -53,7 +53,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); CREATE TABLE reference_table (measureid integer PRIMARY KEY); SELECT create_reference_table('reference_table'); -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); @@ -70,9 +70,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- BEGIN : Load data into tables. 
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; + SELECT COUNT(*) FROM sensors; SELECT COUNT(*) FROM reference_table; SELECT COUNT(*) FROM colocated_dist_table; diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index f5e7f005a..47b28b9d7 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -49,7 +49,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); CREATE TABLE reference_table (measureid integer PRIMARY KEY); SELECT create_reference_table('reference_table'); -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); @@ -66,9 +66,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- BEGIN : Load data into tables. INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; + SELECT COUNT(*) FROM sensors; SELECT COUNT(*) FROM reference_table; SELECT COUNT(*) FROM colocated_dist_table; diff --git a/src/test/regress/sql/multi_move_mx.sql b/src/test/regress/sql/multi_move_mx.sql index 166069a6e..9cfa8a3db 100644 --- a/src/test/regress/sql/multi_move_mx.sql +++ b/src/test/regress/sql/multi_move_mx.sql @@ -151,8 +151,34 @@ ORDER BY shardid LIMIT 1 OFFSET 1; +-- Check that shards of a table with GENERATED columns can be moved. +\c - - - :master_port +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int); +SELECT create_distributed_table('mx_table_with_generated_column', 'a'); + +-- Check that dropped columns are handled properly in a move. 
+ALTER TABLE mx_table_with_generated_column DROP COLUMN c; + +-- Move a shard from worker 1 to worker 2 +SELECT + citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_table_with_generated_column'::regclass + AND nodeport = :worker_1_port +ORDER BY + shardid +LIMIT 1; + -- Cleanup \c - - - :master_port +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +DROP TABLE mx_table_with_generated_column; DROP TABLE mx_table_1; DROP TABLE mx_table_2; DROP TABLE mx_table_3; diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index 2fac91c69..e2f4f9a23 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -110,8 +110,66 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016"; -- END: List updated row count for local targets shard. +-- Check that GENERATED columns are handled properly in a shard split operation. +\c - - - :master_port +SET search_path TO worker_split_copy_test; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 81080000; + +-- BEGIN: Create distributed table and insert data. +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int); +SELECT create_distributed_table('dist_table_with_generated_col', 'id'); + +-- Check that dropped columns are filtered out in COPY command. +ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop; + +INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id)); + +-- END: Create distributed table and insert data. + +-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy. +\c - - - :worker_1_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); +\c - - - :worker_2_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); + +-- BEGIN: List row count for source shard and targets shard in Worker1. +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000; +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + +-- BEGIN: List row count for target shard in Worker2. 
+\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + +\c - - - :worker_1_port +SELECT * from worker_split_copy( + 81080000, -- source shard id to copy + 'id', + ARRAY[ + -- split copy info for split children 1 + ROW(81080015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::pg_catalog.split_copy_info, + -- split copy info for split children 2 + ROW(81080016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_2_node)::pg_catalog.split_copy_info + ] + ); + +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + -- BEGIN: CLEANUP. \c - - - :master_port SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); DROP SCHEMA worker_split_copy_test CASCADE; -- END: CLEANUP. From d82c11f7931636551f6d97b98cdfcf6d4b20a797 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 8 Mar 2023 13:38:51 +0300 Subject: [PATCH 8/9] Refactor CreateDistributedTable() (#6742) Split the main logic that allows creating a Citus table into the internal function CreateCitusTable(). The old CreateDistributedTable() function assumed that it was creating a reference table whenever the distribution method was DISTRIBUTE_BY_NONE. However, this will soon no longer hold once support for creating single-shard distributed tables is added, because their distribution method will be the same. The new internal method CreateCitusTable() doesn't make any assumptions about the table's replication model or similar properties; instead, it expects callers to set all such metadata bits properly. Moreover, some of the parameters that the old CreateDistributedTable() takes --such as the shard count-- were not meaningful for a reference table, and the same would hold for the new table type.
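From a user's perspective the refactor is behavior-preserving: both user-facing entry points end up in the shared CreateCitusTable() path shown in the diff below. The following is a minimal SQL sketch of that, not part of the patch; the table names are illustrative and it assumes a database with Citus loaded.
```sql
-- Sketch: the user-facing UDFs are unchanged by this refactor.
-- create_reference_table() now goes through the CreateReferenceTable() wrapper,
-- create_distributed_table() through the CreateDistributedTable() wrapper,
-- and both converge on the internal CreateCitusTable() helper.
CREATE TABLE ref_demo (a int PRIMARY KEY);
SELECT create_reference_table('ref_demo');

CREATE TABLE dist_demo (a int PRIMARY KEY);
SELECT create_distributed_table('dist_demo', 'a');
```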
--- .../distributed/commands/alter_table.c | 3 +- .../commands/create_distributed_table.c | 95 +++++++++++++------ src/include/distributed/metadata_utility.h | 1 + 3 files changed, 70 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 0592cb762..f51b62535 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -1348,8 +1348,7 @@ CreateCitusTableLike(TableConversionState *con) } else if (IsCitusTableType(con->relationId, REFERENCE_TABLE)) { - CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false, - NULL); + CreateReferenceTable(con->newRelationId); } else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE)) { diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 86133322d..101d866f0 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -106,11 +106,17 @@ static void CreateDistributedTableConcurrently(Oid relationId, char *colocateWithTableName, int shardCount, bool shardCountIsStrict); -static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName); +static char DecideDistTableReplicationModel(char distributionMethod, + char *colocateWithTableName); static List * HashSplitPointsForShardList(List *shardList); static List * HashSplitPointsForShardCount(int shardCount); static List * WorkerNodesForShardList(List *shardList); static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength); +static void CreateCitusTable(Oid relationId, char *distributionColumnName, + char distributionMethod, + int shardCount, bool shardCountIsStrict, + char *colocateWithTableName, + char replicationModel); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, @@ -377,8 +383,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, EnsureForeignKeysForDistributedTableConcurrently(relationId); - char replicationModel = DecideReplicationModel(distributionMethod, - colocateWithTableName); + char replicationModel = DecideDistTableReplicationModel(distributionMethod, + colocateWithTableName); /* * we fail transaction before local table conversion if the table could not be colocated with @@ -622,8 +628,8 @@ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod, char *distributionColumnName, char *colocateWithTableName) { - char replicationModel = DecideReplicationModel(distributionMethod, - colocateWithTableName); + char replicationModel = DecideDistTableReplicationModel(distributionMethod, + colocateWithTableName); /* * we fail transaction before local table conversion if the table could not be colocated with @@ -860,9 +866,6 @@ create_reference_table(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); - char *colocateWithTableName = NULL; - char *distributionColumnName = NULL; - EnsureCitusTableCanBeCreated(relationId); /* enable create_reference_table on an empty node */ @@ -895,8 +898,7 @@ create_reference_table(PG_FUNCTION_ARGS) errdetail("There are no active worker nodes."))); } - CreateDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE, - ShardCount, false, 
colocateWithTableName);
+	CreateReferenceTable(relationId);
 
 	PG_RETURN_VOID();
 }
@@ -951,17 +953,61 @@ EnsureRelationExists(Oid relationId)
 
 
 /*
- * CreateDistributedTable creates distributed table in the given configuration.
+ * CreateDistributedTable is a wrapper around CreateCitusTable that creates a
+ * distributed table.
+ */
+void
+CreateDistributedTable(Oid relationId, char *distributionColumnName,
+					   char distributionMethod,
+					   int shardCount, bool shardCountIsStrict,
+					   char *colocateWithTableName)
+{
+	Assert(distributionMethod != DISTRIBUTE_BY_NONE);
+
+	char replicationModel = DecideDistTableReplicationModel(distributionMethod,
+															colocateWithTableName);
+	CreateCitusTable(relationId, distributionColumnName,
+					 distributionMethod, shardCount,
+					 shardCountIsStrict, colocateWithTableName,
+					 replicationModel);
+}
+
+
+/*
+ * CreateReferenceTable is a wrapper around CreateCitusTable that creates a
+ * reference table.
+ */
+void
+CreateReferenceTable(Oid relationId)
+{
+	char *distributionColumnName = NULL;
+	char distributionMethod = DISTRIBUTE_BY_NONE;
+	int shardCount = 1;
+	bool shardCountIsStrict = true;
+	char *colocateWithTableName = NULL;
+	char replicationModel = REPLICATION_MODEL_2PC;
+	CreateCitusTable(relationId, distributionColumnName,
+					 distributionMethod, shardCount,
+					 shardCountIsStrict, colocateWithTableName,
+					 replicationModel);
+}
+
+
+/*
+ * CreateCitusTable is the internal method that creates a Citus table in
+ * the given configuration.
+ *
  * This functions contains all necessary logic to create distributed tables. It
  * performs necessary checks to ensure distributing the table is safe. If it is
  * safe to distribute the table, this function creates distributed table metadata,
  * creates shards and copies local data to shards. This function also handles
  * partitioned tables by distributing its partitions as well.
  */
-void
-CreateDistributedTable(Oid relationId, char *distributionColumnName,
-					   char distributionMethod, int shardCount,
-					   bool shardCountIsStrict, char *colocateWithTableName)
+static void
+CreateCitusTable(Oid relationId, char *distributionColumnName,
+				 char distributionMethod, int shardCount,
+				 bool shardCountIsStrict, char *colocateWithTableName,
+				 char replicationModel)
 {
 	/*
 	 * EnsureTableNotDistributed errors out when relation is a citus table but
@@ -1022,9 +1068,6 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
 
 	PropagatePrerequisiteObjectsForDistributedTable(relationId);
 
-	char replicationModel = DecideReplicationModel(distributionMethod,
-												   colocateWithTableName);
-
 	Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
 																  distributionColumnName,
 																  NoLock);
@@ -1420,18 +1463,16 @@ DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
 
 
 /*
- * DecideReplicationModel function decides which replication model should be
- * used depending on given distribution configuration.
+ * DecideDistTableReplicationModel function decides which replication model should be
+ * used for a distributed table depending on the given distribution configuration.
*/ static char -DecideReplicationModel(char distributionMethod, char *colocateWithTableName) +DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName) { - if (distributionMethod == DISTRIBUTE_BY_NONE) - { - return REPLICATION_MODEL_2PC; - } - else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && - !IsColocateWithNone(colocateWithTableName)) + Assert(distributionMethod != DISTRIBUTE_BY_NONE); + + if (!IsColocateWithDefault(colocateWithTableName) && + !IsColocateWithNone(colocateWithTableName)) { text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index ceea51678..acb4ae5da 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -325,6 +325,7 @@ extern void DeleteShardPlacementRow(uint64 placementId); extern void CreateDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, int shardCount, bool shardCountIsStrict, char *colocateWithTableName); +extern void CreateReferenceTable(Oid relationId); extern void CreateTruncateTrigger(Oid relationId); extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); From e3cf7ace7c1b43c380348e3a5ebd5f65d0f27a76 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 8 Mar 2023 15:25:36 +0300 Subject: [PATCH 9/9] Stabilize single_node.sql and others that report illegal node removal (#6751) See https://app.circleci.com/pipelines/github/citusdata/citus/30859/workflows/223d61db-8c1d-4909-9aea-d8e470f0368b/jobs/1009243. --- .../distributed/metadata/node_metadata.c | 4 +++ .../planner/multi_physical_planner.c | 32 +++++++++++++++++-- .../distributed/multi_physical_planner.h | 1 + 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index f6639f8d2..72103b9e1 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1918,6 +1918,10 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode) { int32 groupId = workerNode->groupId; List *shardPlacements = AllShardPlacementsOnNodeGroup(groupId); + + /* sort the list to prevent regression tests getting flaky */ + shardPlacements = SortList(shardPlacements, CompareGroupShardPlacements); + GroupShardPlacement *placement = NULL; foreach_ptr(placement, shardPlacements) { diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 901e9de17..03206ea9b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -5343,8 +5343,7 @@ ActiveShardPlacementLists(List *taskList) /* - * CompareShardPlacements compares two shard placements by their tuple oid; this - * oid reflects the tuple's insertion order into pg_dist_placement. + * CompareShardPlacements compares two shard placements by placement id. */ int CompareShardPlacements(const void *leftElement, const void *rightElement) @@ -5370,6 +5369,35 @@ CompareShardPlacements(const void *leftElement, const void *rightElement) } +/* + * CompareGroupShardPlacements compares two group shard placements by placement id. 
+ */ +int +CompareGroupShardPlacements(const void *leftElement, const void *rightElement) +{ + const GroupShardPlacement *leftPlacement = + *((const GroupShardPlacement **) leftElement); + const GroupShardPlacement *rightPlacement = + *((const GroupShardPlacement **) rightElement); + + uint64 leftPlacementId = leftPlacement->placementId; + uint64 rightPlacementId = rightPlacement->placementId; + + if (leftPlacementId < rightPlacementId) + { + return -1; + } + else if (leftPlacementId > rightPlacementId) + { + return 1; + } + else + { + return 0; + } +} + + /* * LeftRotateList returns a copy of the given list that has been cyclically * shifted to the left by the given rotation count. For this, the function diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 920541e97..d6ad4c248 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -553,6 +553,7 @@ extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOpe /* helper functions */ extern Var * MakeInt4Column(void); extern int CompareShardPlacements(const void *leftElement, const void *rightElement); +extern int CompareGroupShardPlacements(const void *leftElement, const void *rightElement); extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); extern bool ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,