INSERT .. SELECT pushdown honors multi_shard_modification_mode

pull/2198/head
Onder Kalaci 2018-06-04 17:16:02 +03:00
parent 336044f2a8
commit d918556dca
5 changed files with 88 additions and 5 deletions

View File

@ -213,11 +213,10 @@ RouterCreateScan(CustomScan *scan)
Assert(isModificationQuery);
if (IsMultiRowInsert(workerJob->jobQuery) ||
(IsUpdateOrDelete(distributedPlan) &&
MultiShardConnectionType == SEQUENTIAL_CONNECTION))
MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
/*
* Multi shard update deletes while multi_shard_modify_mode equals
* Multi shard modifications while multi_shard_modify_mode equals
* to 'sequential' or Multi-row INSERT are executed sequentially
* instead of using parallel connections.
*/

View File

@ -1717,13 +1717,20 @@ ROLLBACK;
-- Altering a reference table and then performing an INSERT ... SELECT which
-- joins with the reference table is not allowed, since the INSERT ... SELECT
-- would read from the reference table over others connections than the ones
-- that performed the DDL.
-- that performed the parallel DDL.
BEGIN;
ALTER TABLE reference_table ADD COLUMN z int;
INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ERROR: cannot establish a new connection for placement 13300025, since DDL has been executed on a connection that is in use
ROLLBACK;
-- the same test with sequential DDL should work fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE reference_table ADD COLUMN z int;
INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ROLLBACK;
-- Insert after copy is allowed
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';

View File

@ -416,6 +416,51 @@ SELECT count(*) FROM multi_shard_modify_test;
0
(1 row)
-- test INSERT ... SELECT queries
-- with sequential modification mode, we should see #primary worker records
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
SELECT distributed_2PCs_are_equal_to_placement_count();
distributed_2pcs_are_equal_to_placement_count
-----------------------------------------------
t
(1 row)
-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx
INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
BEGIN;
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
-- now switch to sequential mode to enable a successful INSERT .. SELECT
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
COMMIT;
-- see that all the data successfully inserted
SELECT count(*) FROM multi_shard_modify_test;
count
-------
210
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;
SELECT pg_reload_conf();

View File

@ -1361,13 +1361,21 @@ ROLLBACK;
-- Altering a reference table and then performing an INSERT ... SELECT which
-- joins with the reference table is not allowed, since the INSERT ... SELECT
-- would read from the reference table over others connections than the ones
-- that performed the DDL.
-- that performed the parallel DDL.
BEGIN;
ALTER TABLE reference_table ADD COLUMN z int;
INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ROLLBACK;
-- the same test with sequential DDL should work fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE reference_table ADD COLUMN z int;
INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ROLLBACK;
-- Insert after copy is allowed
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';

View File

@ -219,6 +219,30 @@ COMMIT;
-- see that all the data successfully removed
SELECT count(*) FROM multi_shard_modify_test;
-- test INSERT ... SELECT queries
-- with sequential modification mode, we should see #primary worker records
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
SELECT distributed_2PCs_are_equal_to_worker_count();
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
SELECT distributed_2PCs_are_equal_to_placement_count();
-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx
INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
BEGIN;
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
-- now switch to sequential mode to enable a successful INSERT .. SELECT
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
COMMIT;
-- see that all the data successfully inserted
SELECT count(*) FROM multi_shard_modify_test;
ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;