From b9b419ef167586e93f6660c8ad93876a5b3acfac Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 25 Jan 2022 11:27:40 +0100 Subject: [PATCH] Allow creating distributed tables in sequential mode With https://github.com/citusdata/citus/pull/2780, we allow COPY to use any number of connections that the executor used in a tx block. This means that create_distributed_table can now run in sequential mode even while COPYing data to the shards. --- .../commands/create_distributed_table.c | 14 +---- .../foreign_key_restriction_enforcement.out | 40 ++++++++------ src/test/regress/expected/multi_truncate.out | 21 ++++---- .../expected/sequential_modifications.out | 53 +++++++++++++++++-- .../foreign_key_restriction_enforcement.sql | 8 +-- src/test/regress/sql/multi_truncate.sql | 6 +-- .../regress/sql/sequential_modifications.sql | 14 ++++- 7 files changed, 102 insertions(+), 54 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 041415686..870c78621 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1539,19 +1539,7 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty) bool shouldRunSequential = MultiShardConnectionType == SEQUENTIAL_CONNECTION || hasForeignKeyToReferenceTable; - if (!localTableEmpty && shouldRunSequential) - { - char *relationName = get_rel_name(relationId); - - ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode " - "because it is not empty", relationName), - errhint("If you have manually set " - "citus.multi_shard_modify_mode to 'sequential', " - "try with 'parallel' option. 
If that is not the " - "case, try distributing local tables when they " - "are empty."))); - } - else if (shouldRunSequential && ParallelQueryExecutedInTransaction()) + if (shouldRunSequential && ParallelQueryExecutedInTransaction()) { /* * We decided to use sequential execution. It's either because relation diff --git a/src/test/regress/expected/foreign_key_restriction_enforcement.out b/src/test/regress/expected/foreign_key_restriction_enforcement.out index 240f0bc5a..15facd198 100644 --- a/src/test/regress/expected/foreign_key_restriction_enforcement.out +++ b/src/test/regress/expected/foreign_key_restriction_enforcement.out @@ -722,7 +722,7 @@ BEGIN; UPDATE reference_table SET id = 101 WHERE id = 99; ERROR: cannot modify table "reference_table" because there was a parallel operation on a distributed table -DETAIL: When there is a foreign key to a reference table or to a local table, Citus needs to perform all operations over a single connection per node to ensure consistency. +DETAIL: When there is a foreign key to a reference table or to a local table, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; BEGIN; @@ -734,7 +734,7 @@ BEGIN; UPDATE transitive_reference_table SET id = 101 WHERE id = 99; ERROR: cannot modify table "transitive_reference_table" because there was a parallel operation on a distributed table -DETAIL: When there is a foreign key to a reference table or to a local table, Citus needs to perform all operations over a single connection per node to ensure consistency. +DETAIL: When there is a foreign key to a reference table or to a local table, Citus needs to perform all operations over a single connection per node to ensure consistency. 
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; -- case 4.3: SELECT to a dist table is follwed by an unrelated DDL to a reference table @@ -1074,7 +1074,7 @@ BEGIN; UPDATE unrelated_dist_table SET value_1 = 15; UPDATE reference_table SET id = 101 WHERE id = 99; ERROR: cannot modify table "reference_table" because there was a parallel operation on a distributed table -DETAIL: When there is a foreign key to a reference table or to a local table, Citus needs to perform all operations over a single connection per node to ensure consistency. +DETAIL: When there is a foreign key to a reference table or to a local table, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11; ERROR: current transaction is aborted, commands ignored until end of transaction block @@ -1282,8 +1282,8 @@ ERROR: current transaction is aborted, commands ignored until end of transactio DROP TABLE test_table_1 CASCADE; ERROR: current transaction is aborted, commands ignored until end of transaction block ROLLBACK; --- make sure that we cannot create hash distributed tables with --- foreign keys to reference tables when they have data in it +-- make sure that we can create hash distributed tables with +-- even when foreign keys to reference tables and they have data in it BEGIN; CREATE TABLE test_table_1(id int PRIMARY KEY); INSERT INTO test_table_1 SELECT i FROM generate_series(0,100) i; @@ -1300,16 +1300,21 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) SELECT create_distributed_table('test_table_2', 'id'); -ERROR: cannot distribute "test_table_2" in sequential mode because it is not empty -HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. 
If that is not the case, try distributing local tables when they are empty. +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_fkey_to_ref_in_tx.test_table_2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + -- make sure that the output isn't too verbose SET LOCAL client_min_messages TO ERROR; -ERROR: current transaction is aborted, commands ignored until end of transaction block DROP TABLE test_table_2, test_table_1; -ERROR: current transaction is aborted, commands ignored until end of transaction block COMMIT; --- the same test with above in sequential mode would still not work --- since COPY cannot be executed in sequential mode +-- the same test with above in sequential mode would just work +-- as COPY can be executed in sequential mode BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; CREATE TABLE test_table_1(id int PRIMARY KEY); @@ -1327,13 +1332,18 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) SELECT create_distributed_table('test_table_2', 'id'); -ERROR: cannot distribute "test_table_2" in sequential mode because it is not empty -HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty. +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_fkey_to_ref_in_tx.test_table_2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + -- make sure that the output isn't too verbose SET LOCAL client_min_messages TO ERROR; -ERROR: current transaction is aborted, commands ignored until end of transaction block DROP TABLE test_table_2, test_table_1; -ERROR: current transaction is aborted, commands ignored until end of transaction block COMMIT; -- we should be able to execute and DML/DDL/SELECT after we've -- switched to sequential via create_distributed_table diff --git a/src/test/regress/expected/multi_truncate.out b/src/test/regress/expected/multi_truncate.out index 7ac388661..bf661ccb8 100644 --- a/src/test/regress/expected/multi_truncate.out +++ b/src/test/regress/expected/multi_truncate.out @@ -501,24 +501,23 @@ INSERT INTO dist SELECT x,x FROM generate_series(1,10000) x; SELECT truncate_local_data_after_distributing_table('ref'); ERROR: cannot truncate a table referenced in a foreign key constraint by a local table DETAIL: Table "dist" references "ref" --- test that we do not allow distributing tables that have foreign keys to reference tables +-- test that we allow distributing tables that have foreign keys to reference tables SELECT create_distributed_table('dist','id'); -ERROR: cannot distribute "dist" in sequential mode because it is not empty -HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty. +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$multi_truncate.dist$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + SHOW citus.multi_shard_modify_mode; citus.multi_shard_modify_mode --------------------------------------------------------------------- parallel (1 row) --- distribute the table after a truncate -TRUNCATE dist; -SELECT create_distributed_table('dist','id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -- the following should truncate ref and dist BEGIN; SELECT truncate_local_data_after_distributing_table('ref'); diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index 80bbba25a..e5092ae56 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ b/src/test/regress/expected/sequential_modifications.out @@ -610,20 +610,63 @@ SELECT distributed_2PCs_are_equal_to_worker_count(); (1 row) DROP TABLE test_seq_ddl_index; --- create_distributed_table should fail on relations with data in sequential mode in and out transaction block +-- create_distributed_table should works on relations with data in sequential mode in and out transaction block CREATE TABLE test_create_seq_table (a int); INSERT INTO test_create_seq_table VALUES (1); SET citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_create_seq_table' ,'a'); -ERROR: cannot distribute "test_create_seq_table" in sequential mode because it is not empty -HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty. +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.test_create_seq_table$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('test_create_seq_table'); +NOTICE: creating a new table for test_seq_ddl.test_create_seq_table +NOTICE: moving the data of test_seq_ddl.test_create_seq_table +NOTICE: dropping the old test_seq_ddl.test_create_seq_table +NOTICE: renaming the new table to test_seq_ddl.test_create_seq_table + undistribute_table +--------------------------------------------------------------------- + +(1 row) + RESET citus.multi_shard_modify_mode; BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; select create_distributed_table('test_create_seq_table' ,'a'); -ERROR: cannot distribute "test_create_seq_table" in sequential mode because it is not empty -HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty. +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.test_create_seq_table$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + ROLLBACK; +-- trigger switch-over when using single connection per worker +BEGIN; +SET citus.next_shard_id TO 16900; +SET LOCAL citus.shard_count TO 4; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int); +INSERT INTO trigger_switchover + SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,250000) s; +SELECT create_distributed_table('trigger_switchover','a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.trigger_switchover$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ABORT; SET search_path TO 'public'; DROP SCHEMA test_seq_ddl CASCADE; NOTICE: drop cascades to 11 other objects diff --git a/src/test/regress/sql/foreign_key_restriction_enforcement.sql b/src/test/regress/sql/foreign_key_restriction_enforcement.sql index 7900e7a19..cdef1c798 100644 --- a/src/test/regress/sql/foreign_key_restriction_enforcement.sql +++ b/src/test/regress/sql/foreign_key_restriction_enforcement.sql @@ -803,8 +803,8 @@ BEGIN; DROP TABLE test_table_1 CASCADE; ROLLBACK; --- make sure that we cannot create hash distributed tables with --- foreign keys to reference tables when they have data in it +-- make sure that we can create hash distributed tables with +-- even when foreign keys to reference tables and they have data in it BEGIN; CREATE TABLE test_table_1(id int PRIMARY KEY); @@ -822,8 +822,8 @@ BEGIN; COMMIT; --- the 
same test with above in sequential mode would still not work --- since COPY cannot be executed in sequential mode +-- the same test with above in sequential mode would just work +-- as COPY can be executed in sequential mode BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; diff --git a/src/test/regress/sql/multi_truncate.sql b/src/test/regress/sql/multi_truncate.sql index 83b4da202..252d02ab1 100644 --- a/src/test/regress/sql/multi_truncate.sql +++ b/src/test/regress/sql/multi_truncate.sql @@ -295,14 +295,10 @@ INSERT INTO dist SELECT x,x FROM generate_series(1,10000) x; -- test that we do not cascade truncates to local referencing tables SELECT truncate_local_data_after_distributing_table('ref'); --- test that we do not allow distributing tables that have foreign keys to reference tables +-- test that we allow distributing tables that have foreign keys to reference tables SELECT create_distributed_table('dist','id'); SHOW citus.multi_shard_modify_mode; --- distribute the table after a truncate -TRUNCATE dist; -SELECT create_distributed_table('dist','id'); - -- the following should truncate ref and dist BEGIN; SELECT truncate_local_data_after_distributing_table('ref'); diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index d3293c5af..79e0a1211 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -326,12 +326,13 @@ COMMIT; SELECT distributed_2PCs_are_equal_to_worker_count(); DROP TABLE test_seq_ddl_index; --- create_distributed_table should fail on relations with data in sequential mode in and out transaction block +-- create_distributed_table should works on relations with data in sequential mode in and out transaction block CREATE TABLE test_create_seq_table (a int); INSERT INTO test_create_seq_table VALUES (1); SET citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_create_seq_table' 
,'a'); +SELECT undistribute_table('test_create_seq_table'); RESET citus.multi_shard_modify_mode; @@ -340,5 +341,16 @@ BEGIN; select create_distributed_table('test_create_seq_table' ,'a'); ROLLBACK; +-- trigger switch-over when using single connection per worker +BEGIN; +SET citus.next_shard_id TO 16900; +SET LOCAL citus.shard_count TO 4; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int); +INSERT INTO trigger_switchover + SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,250000) s; +SELECT create_distributed_table('trigger_switchover','a'); +ABORT; + SET search_path TO 'public'; DROP SCHEMA test_seq_ddl CASCADE;