diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 1bc11a295..4a4d7de93 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -40,6 +40,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_copy.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_utility.h"
@@ -475,9 +476,34 @@ CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId,
 	 */
 	EnsureSchemaExistsOnAllNodes(relationId);
 
+	/*
+	 * Decide whether to use exclusive connections per placement or not. Note that
+	 * if the local table is not empty, we cannot use sequential mode since the COPY
+	 * operation that'd load the data into shards currently requires exclusive
+	 * connections.
+	 */
 	if (RegularTable(relationId))
 	{
-		useExclusiveConnection = IsTransactionBlock() || !localTableEmpty;
+		if (!localTableEmpty && MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+		{
+			char *relationName = get_rel_name(relationId);
+
+			ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
+								   "because it is not empty", relationName),
+							errhint("If you have manually set "
+									"citus.multi_shard_modify_mode to 'sequential', "
+									"try with 'parallel' option. If that is not the "
+									"case, try distributing local tables when they "
+									"are empty.")));
+		}
+		else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+		{
+			useExclusiveConnection = false;
+		}
+		else if (!localTableEmpty || IsTransactionBlock())
+		{
+			useExclusiveConnection = true;
+		}
 	}
 
 	if (colocatedTableId != InvalidOid)
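Note on the change above: for relations that pass the RegularTable() check, the connection policy now branches on citus.multi_shard_modify_mode. In sequential mode an empty table is distributed over the regular per-worker connections, while a non-empty table is rejected up front, because the COPY that would load the existing rows into the new shards still requires exclusive placement connections. A minimal sketch of the user-visible behavior (table names are illustrative, not part of the patch):

    -- empty table: distributing in sequential mode succeeds
    SET citus.multi_shard_modify_mode TO 'sequential';
    CREATE TABLE sketch_empty (id int);
    SELECT create_distributed_table('sketch_empty', 'id');

    -- non-empty table: rejected with the new error, so load the data in parallel mode instead
    CREATE TABLE sketch_loaded (id int);
    INSERT INTO sketch_loaded VALUES (1);
    SELECT create_distributed_table('sketch_loaded', 'id');
    -- ERROR: cannot distribute "sketch_loaded" in sequential mode because it is not empty
    SET citus.multi_shard_modify_mode TO 'parallel';
    SELECT create_distributed_table('sketch_loaded', 'id');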
diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out
index f72ea7684..77f4ee0d3 100644
--- a/src/test/regress/expected/sequential_modifications.out
+++ b/src/test/regress/expected/sequential_modifications.out
@@ -6,6 +6,8 @@
 --
 CREATE SCHEMA test_seq_ddl;
 SET search_path TO 'test_seq_ddl';
+SET citus.next_shard_id TO 1600;
+SET citus.next_placement_id TO 1600;
 -- this function simply checks the equality of the number of transactions in the
 -- pg_dist_transaction and number of primary worker nodes
 -- The function is useful to ensure that a single connection is opened per worker
@@ -326,15 +328,14 @@ SELECT distributed_2PCs_are_equal_to_worker_count();
 -- truncate with rep > 1 should work both in parallel and seq. modes
 CREATE TABLE test_seq_truncate_rep_2 (a int);
-INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
 SET citus.shard_replication_factor TO 2;
 SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
-NOTICE: Copying data from local table...
  create_distributed_table
 --------------------------
 
 (1 row)
 
+INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
 SET citus.multi_shard_modify_mode TO 'sequential';
 SELECT recover_prepared_transactions();
  recover_prepared_transactions
 -------------------------------
@@ -491,9 +492,159 @@ SELECT pg_reload_conf();
  t
 (1 row)
 
+-- The following tests are added to test if create_distributed_table honors sequential mode
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+                             0
+(1 row)
+
+-- Check if multi_shard_update works properly after create_distributed_table in sequential mode
+CREATE TABLE test_seq_multi_shard_update(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_multi_shard_update', 'a');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+    INSERT INTO test_seq_multi_shard_update VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
+    SELECT master_modify_multiple_shards('DELETE FROM test_seq_multi_shard_update WHERE b < 2');
+ master_modify_multiple_shards
+-------------------------------
+                             5
+(1 row)
+
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+DROP TABLE test_seq_multi_shard_update;
+-- Check if truncate works properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+                             0
+(1 row)
+
+CREATE TABLE test_seq_truncate_after_create(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_truncate_after_create', 'a');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+    INSERT INTO test_seq_truncate_after_create VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
+    TRUNCATE test_seq_truncate_after_create;
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+DROP TABLE test_seq_truncate_after_create;
+-- Check if drop table works properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+                             0
+(1 row)
+
+CREATE TABLE test_seq_drop_table(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_drop_table', 'a');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+    DROP TABLE test_seq_drop_table;
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+-- Check if copy errors out properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+                             0
+(1 row)
+
+CREATE TABLE test_seq_copy(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_copy', 'a');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+    \COPY test_seq_copy FROM STDIN DELIMITER AS ',';
+ERROR: cannot establish a new connection for placement 1673, since DDL has been executed on a connection that is in use
+CONTEXT: COPY test_seq_copy, line 2: "2,2"
+ROLLBACK;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ f
+(1 row)
+
+DROP TABLE test_seq_copy;
+-- Check if DDL + CREATE INDEX works properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+                             0
+(1 row)
+
+CREATE TABLE test_seq_ddl_index(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_ddl_index', 'a');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+    INSERT INTO test_seq_ddl_index VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
+    ALTER TABLE test_seq_ddl_index ADD COLUMN c int;
+    CREATE INDEX idx ON test_seq_ddl_index(c);
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+DROP TABLE test_seq_ddl_index;
+-- create_distributed_table should fail on relations with data in sequential mode, both inside and outside a transaction block
+CREATE TABLE test_create_seq_table (a int);
+INSERT INTO test_create_seq_table VALUES (1);
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT create_distributed_table('test_create_seq_table' ,'a');
+ERROR: cannot distribute "test_create_seq_table" in sequential mode because it is not empty
+HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty.
+RESET citus.multi_shard_modify_mode;
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    select create_distributed_table('test_create_seq_table' ,'a');
+ERROR: cannot distribute "test_create_seq_table" in sequential mode because it is not empty
+HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty.
+ROLLBACK;
 SET search_path TO 'public';
 DROP SCHEMA test_seq_ddl CASCADE;
-NOTICE: drop cascades to 10 other objects
+NOTICE: drop cascades to 11 other objects
 DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
 drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
 drop cascades to function test_seq_ddl.no_distributed_2pcs()
@@ -504,3 +655,4 @@ drop cascades to table test_seq_ddl.test_table_rep_2
 drop cascades to table test_seq_ddl.test_seq_truncate
 drop cascades to table test_seq_ddl.test_seq_truncate_rep_2
 drop cascades to table test_seq_ddl.multi_shard_modify_test
+drop cascades to table test_seq_ddl.test_create_seq_table
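For context on the assertions in the expected output above: distributed_2PCs_are_equal_to_worker_count() is the helper defined near the top of sequential_modifications.sql; per its comment it compares the number of pg_dist_transaction records against the number of primary worker nodes, so it returns true only when exactly one 2PC (one connection) was used per worker. A rough, hand-written approximation of that check (not the exact definition used by the test) might look like:

    SELECT (SELECT count(*) FROM pg_dist_transaction) =
           (SELECT count(*) FROM pg_dist_node
            WHERE isactive AND noderole = 'primary') AS sequential_2pc_used;

In the \COPY case above the transaction is rolled back, so no 2PC records are left behind and the check accordingly returns f.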
diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql
index e95feb6cf..0f3856477 100644
--- a/src/test/regress/sql/sequential_modifications.sql
+++ b/src/test/regress/sql/sequential_modifications.sql
@@ -6,6 +6,8 @@
 --
 CREATE SCHEMA test_seq_ddl;
 SET search_path TO 'test_seq_ddl';
+SET citus.next_shard_id TO 1600;
+SET citus.next_placement_id TO 1600;
 
 -- this function simply checks the equality of the number of transactions in the
 -- pg_dist_transaction and number of primary worker nodes
@@ -182,9 +184,9 @@ SELECT distributed_2PCs_are_equal_to_worker_count();
 
 -- truncate with rep > 1 should work both in parallel and seq. modes
 CREATE TABLE test_seq_truncate_rep_2 (a int);
-INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
 SET citus.shard_replication_factor TO 2;
 SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
+INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
 
 SET citus.multi_shard_modify_mode TO 'sequential';
 SELECT recover_prepared_transactions();
@@ -256,5 +258,87 @@ ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
 SET citus.shard_replication_factor TO DEFAULT;
 SELECT pg_reload_conf();
 
+-- The following tests are added to test if create_distributed_table honors sequential mode
+SELECT recover_prepared_transactions();
+
+-- Check if multi_shard_update works properly after create_distributed_table in sequential mode
+CREATE TABLE test_seq_multi_shard_update(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_multi_shard_update', 'a');
+    INSERT INTO test_seq_multi_shard_update VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
+    SELECT master_modify_multiple_shards('DELETE FROM test_seq_multi_shard_update WHERE b < 2');
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+DROP TABLE test_seq_multi_shard_update;
+
+
+-- Check if truncate works properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+CREATE TABLE test_seq_truncate_after_create(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_truncate_after_create', 'a');
+    INSERT INTO test_seq_truncate_after_create VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
+    TRUNCATE test_seq_truncate_after_create;
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+DROP TABLE test_seq_truncate_after_create;
+
+
+-- Check if drop table works properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+CREATE TABLE test_seq_drop_table(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_drop_table', 'a');
+    DROP TABLE test_seq_drop_table;
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+
+-- Check if copy errors out properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+CREATE TABLE test_seq_copy(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_copy', 'a');
+    \COPY test_seq_copy FROM STDIN DELIMITER AS ',';
+1,1
+2,2
+3,3
+\.
+ROLLBACK;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+DROP TABLE test_seq_copy;
+
+
+-- Check if DDL + CREATE INDEX works properly after create_distributed_table in sequential mode
+SELECT recover_prepared_transactions();
+CREATE TABLE test_seq_ddl_index(a int, b int);
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    SELECT create_distributed_table('test_seq_ddl_index', 'a');
+    INSERT INTO test_seq_ddl_index VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
+    ALTER TABLE test_seq_ddl_index ADD COLUMN c int;
+    CREATE INDEX idx ON test_seq_ddl_index(c);
+COMMIT;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+DROP TABLE test_seq_ddl_index;
+
+-- create_distributed_table should fail on relations with data in sequential mode, both inside and outside a transaction block
+CREATE TABLE test_create_seq_table (a int);
+INSERT INTO test_create_seq_table VALUES (1);
+
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT create_distributed_table('test_create_seq_table' ,'a');
+
+RESET citus.multi_shard_modify_mode;
+
+BEGIN;
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    select create_distributed_table('test_create_seq_table' ,'a');
+ROLLBACK;
+
 SET search_path TO 'public';
 DROP SCHEMA test_seq_ddl CASCADE;
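Taken together, the new tests pin down the intended workflow: distribute an empty table under SET LOCAL citus.multi_shard_modify_mode TO 'sequential' and run DDL, multi-shard modifications, TRUNCATE, or DROP against it in the same transaction, while COPY into the freshly distributed table has to wait until after commit (or use parallel mode). A hypothetical application-side transaction following that pattern (table, column, and file names are illustrative, not taken from this patch):

    CREATE TABLE sketch_events (id int, payload text);
    BEGIN;
        SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
        SELECT create_distributed_table('sketch_events', 'id');
        ALTER TABLE sketch_events ADD COLUMN created_at timestamptz;
        CREATE INDEX sketch_events_created_at_idx ON sketch_events (created_at);
    COMMIT;
    -- per the expected output above, COPY into the fresh table would fail inside
    -- the same sequential transaction, so load data only after the commit
    \COPY sketch_events FROM 'events.csv' WITH (FORMAT csv)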