From d918556dca7dce13d06e651a8fecbc3a8abd1760 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 4 Jun 2018 17:16:02 +0300 Subject: [PATCH] INSERT .. SELECT pushdown honors multi_shard_modify_mode --- .../distributed/executor/citus_custom_scan.c | 5 +-- .../regress/expected/multi_insert_select.out | 9 +++- .../expected/sequential_modifications.out | 45 +++++++++++++++++++ src/test/regress/sql/multi_insert_select.sql | 10 ++++- .../regress/sql/sequential_modifications.sql | 24 ++++++++++ 5 files changed, 88 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 62379fc3a..25fb34e43 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -213,11 +213,10 @@ RouterCreateScan(CustomScan *scan) Assert(isModificationQuery); if (IsMultiRowInsert(workerJob->jobQuery) || - (IsUpdateOrDelete(distributedPlan) && - MultiShardConnectionType == SEQUENTIAL_CONNECTION)) + MultiShardConnectionType == SEQUENTIAL_CONNECTION) { /* - * Multi shard update deletes while multi_shard_modify_mode equals + * Multi shard modifications while multi_shard_modify_mode equals * to 'sequential' or Multi-row INSERT are executed sequentially * instead of using parallel connections. */ diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 7e1955dcb..af7d0577c 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1717,13 +1717,20 @@ ROLLBACK; -- Altering a reference table and then performing an INSERT ... SELECT which -- joins with the reference table is not allowed, since the INSERT ... SELECT -- would read from the reference table over others connections than the ones --- that performed the DDL. +-- that performed the parallel DDL. 
BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ERROR: cannot establish a new connection for placement 13300025, since DDL has been executed on a connection that is in use ROLLBACK; +-- the same test with sequential DDL should work fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ROLLBACK; -- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index 3c8d5a287..8483abf68 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ b/src/test/regress/expected/sequential_modifications.out @@ -416,6 +416,51 @@ SELECT count(*) FROM multi_shard_modify_test; 0 (1 row) +-- test INSERT ... 
SELECT queries +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + -- now switch to sequential mode to enable a successful INSERT .. 
SELECT + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +COMMIT; +-- see that all the data successfully inserted +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 210 +(1 row) + ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index a05d79318..ecb6d34b5 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1361,13 +1361,21 @@ ROLLBACK; -- Altering a reference table and then performing an INSERT ... SELECT which -- joins with the reference table is not allowed, since the INSERT ... SELECT -- would read from the reference table over others connections than the ones --- that performed the DDL. +-- that performed the parallel DDL. BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ROLLBACK; +-- the same test with sequential DDL should work fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ROLLBACK; + -- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index 4ca637355..3f2d8af12 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -219,6 +219,30 @@ COMMIT; -- see that all the data successfully removed SELECT count(*) FROM multi_shard_modify_test; +-- test INSERT ... 
SELECT queries +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + + -- now switch to sequential mode to enable a successful INSERT .. SELECT + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +COMMIT; + +-- see that all the data successfully inserted +SELECT count(*) FROM multi_shard_modify_test; ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT;