-- -- multi shard update delete -- this file is intended to test multi shard update/delete queries -- SET citus.next_shard_id TO 1440000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000; SET citus.shard_replication_factor to 1; SET citus.multi_shard_modify_mode to 'parallel'; CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); SELECT create_distributed_table('users_test_table', 'user_id'); create_distributed_table -------------------------- (1 row) \COPY users_test_table FROM STDIN DELIMITER AS ','; CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int); SELECT create_distributed_table('events_test_table', 'user_id'); create_distributed_table -------------------------- (1 row) \COPY events_test_table FROM STDIN DELIMITER AS ','; CREATE TABLE events_reference_copy_table (like events_test_table); SELECT create_reference_table('events_reference_copy_table'); create_reference_table ------------------------ (1 row) INSERT INTO events_reference_copy_table SELECT * FROM events_test_table; CREATE TABLE users_reference_copy_table (like users_test_table); SELECT create_reference_table('users_reference_copy_table'); create_reference_table ------------------------ (1 row) INSERT INTO users_reference_copy_table SELECT * FROM users_test_table; -- Run multi shard updates and deletes without transaction on hash distributed tables UPDATE users_test_table SET value_1 = 1; SELECT COUNT(*), SUM(value_1) FROM users_test_table; count | sum -------+----- 15 | 15 (1 row) SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; count | sum -------+----- 4 | 52 (1 row) UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; count | sum -------+----- 4 | 56 (1 row) UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; sum ----- 0 (1 row) SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; count ------- 4 (1 row) DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; count ------- 0 (1 row) -- Run multi shard update delete queries within transactions BEGIN; UPDATE users_test_table SET value_3 = 0; END; SELECT SUM(value_3) FROM users_test_table; sum ----- 0 (1 row) -- Update can also be rollbacked BEGIN; UPDATE users_test_table SET value_3 = 1; ROLLBACK; SELECT SUM(value_3) FROM users_test_table; sum ----- 0 (1 row) -- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential) BEGIN; INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15); INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17); SET citus.multi_shard_modify_mode to sequential; UPDATE users_test_table SET value_3 = 1; END; SELECT COUNT()SUM(value_3) FROM users_test_table; ERROR: syntax error at or near "(" LINE 1: SELECT COUNT()SUM(value_3) FROM users_test_table; ^ SET citus.multi_shard_modify_mode to 'sequential'; -- Run multiple multi shard updates (with sequential executor) BEGIN; UPDATE users_test_table SET value_3 = 5; UPDATE users_test_table SET value_3 = 0; END; SELECT SUM(value_3) FROM users_copy_table; ERROR: relation "users_copy_table" does not exist LINE 1: SELECT SUM(value_3) FROM users_copy_table; ^ -- Run multiple multi shard updates (with parallel executor) SET citus.multi_shard_modify_mode to 'parallel'; UPDATE users_test_table SET value_3 = 5; BEGIN; UPDATE users_test_table SET value_3 = 2; UPDATE users_test_table SET value_3 = 0; END; SELECT SUM(value_3) FROM users_test_table; sum ----- 0 (1 row) -- Check with kind of constraints UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true; SELECT COUNT(*), SUM(value_3) FROM users_test_table; count | sum -------+----- 16 | 16 (1 row) UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false; SELECT COUNT(*), SUM(value_3) FROM users_test_table; count | sum -------+----- 16 | 16 (1 row) -- Run multi shard updates with prepared statements PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2; EXECUTE foo_plan(1,5); EXECUTE foo_plan(3,15); EXECUTE foo_plan(5,25); EXECUTE foo_plan(7,35); EXECUTE foo_plan(9,45); EXECUTE foo_plan(0,0); SELECT SUM(value_1), SUM(value_3) FROM users_test_table; sum | sum -----+----- 0 | 0 (1 row) -- Test on append table (set executor mode to sequential, since with the append -- distributed tables parallel executor may create tons of connections) SET citus.multi_shard_modify_mode to sequential; CREATE TABLE append_stage_table(id int, col_2 int); INSERT INTO append_stage_table VALUES(1,3); INSERT INTO append_stage_table VALUES(3,2); INSERT INTO append_stage_table VALUES(5,4); CREATE TABLE test_append_table(id int, col_2 int); SELECT create_distributed_table('test_append_table','id','append'); create_distributed_table -------------------------- (1 row) SELECT master_create_empty_shard('test_append_table'); master_create_empty_shard --------------------------- 1440010 (1 row) SELECT * FROM master_append_table_to_shard(1440010, 'append_stage_table', 'localhost', :master_port); master_append_table_to_shard ------------------------------ 0.0266667 (1 row) SELECT master_create_empty_shard('test_append_table') AS new_shard_id; new_shard_id -------------- 1440011 (1 row) SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port); master_append_table_to_shard ------------------------------ 0.0266667 (1 row) UPDATE test_append_table SET col_2 = 5; SELECT * FROM test_append_table; id | col_2 ----+------- 1 | 5 3 | 5 5 | 5 1 | 5 3 | 5 5 | 5 (6 rows) DROP TABLE append_stage_table; DROP TABLE test_append_table; -- Update multi shard of partitioned distributed table SET citus.multi_shard_modify_mode to 'parallel'; SET citus.shard_replication_factor to 1; CREATE TABLE tt1(id int, col_2 int) partition by range (col_2); ERROR: syntax error at or near "partition" LINE 1: CREATE TABLE tt1(id int, col_2 int) partition by range (col_... ^ CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10); ERROR: syntax error at or near "partition" LINE 1: CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to... ^ CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20); ERROR: syntax error at or near "partition" LINE 1: CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) ... ^ INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12); ERROR: relation "tt1" does not exist LINE 1: INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17... ^ SELECT create_distributed_table('tt1','id'); ERROR: relation "tt1" does not exist LINE 1: SELECT create_distributed_table('tt1','id'); ^ UPDATE tt1 SET col_2 = 13; ERROR: relation "tt1" does not exist LINE 1: UPDATE tt1 SET col_2 = 13; ^ DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5; ERROR: relation "tt1" does not exist LINE 1: DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5; ^ SELECT * FROM tt1; ERROR: relation "tt1" does not exist LINE 1: SELECT * FROM tt1; ^ -- Partitioned distributed table within transaction INSERT INTO tt1 VALUES(4,6); ERROR: relation "tt1" does not exist LINE 1: INSERT INTO tt1 VALUES(4,6); ^ INSERT INTO tt1 VALUES(7,7); ERROR: relation "tt1" does not exist LINE 1: INSERT INTO tt1 VALUES(7,7); ^ INSERT INTO tt1 VALUES(9,8); ERROR: relation "tt1" does not exist LINE 1: INSERT INTO tt1 VALUES(9,8); ^ BEGIN; -- Update rows from partititon tt1_1120 UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; ERROR: relation "tt1" does not exist LINE 1: UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; ^ -- Update rows from partititon tt1_510 UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; ERROR: current transaction is aborted, commands ignored until end of transaction block COMMIT; SELECT * FROM tt1 ORDER BY id; ERROR: relation "tt1" does not exist LINE 1: SELECT * FROM tt1 ORDER BY id; ^ -- Modify main table and partition table within same transaction BEGIN; UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; ERROR: relation "tt1" does not exist LINE 1: UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; ^ UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; ERROR: current transaction is aborted, commands ignored until end of transaction block DELETE FROM tt1_510; ERROR: current transaction is aborted, commands ignored until end of transaction block DELETE FROM tt1_1120; ERROR: current transaction is aborted, commands ignored until end of transaction block COMMIT; SELECT * FROM tt1 ORDER BY id; ERROR: relation "tt1" does not exist LINE 1: SELECT * FROM tt1 ORDER BY id; ^ DROP TABLE tt1; ERROR: table "tt1" does not exist -- Update and copy in the same transaction CREATE TABLE tt2(id int, col_2 int); SELECT create_distributed_table('tt2','id'); create_distributed_table -------------------------- (1 row) BEGIN; \COPY tt2 FROM STDIN DELIMITER AS ','; UPDATE tt2 SET col_2 = 1; COMMIT; SELECT * FROM tt2 ORDER BY id; id | col_2 ----+------- 1 | 1 2 | 1 3 | 1 7 | 1 9 | 1 (5 rows) -- Test returning with both type of executors UPDATE tt2 SET col_2 = 5 RETURNING id, col_2; id | col_2 ----+------- 1 | 5 3 | 5 7 | 5 9 | 5 2 | 5 (5 rows) SET citus.multi_shard_modify_mode to sequential; UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; id | col_2 ----+------- 1 | 3 3 | 3 7 | 3 9 | 3 2 | 3 (5 rows) DROP TABLE tt2; -- Multiple RTEs are not supported SET citus.multi_shard_modify_mode to DEFAULT; UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); ERROR: subqueries are not supported in modifications across multiple shards DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); ERROR: subqueries are not supported in modifications across multiple shards DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); ERROR: subqueries are not supported in modifications across multiple shards DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); ERROR: subqueries are not supported in modifications across multiple shards DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. -- Cursors are not supported BEGIN; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; FETCH test_cursor; user_id | value_1 | value_2 | value_3 ---------+---------+---------+--------- 8 | 0 | 13 | 0 (1 row) UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; ERROR: cannot run DML queries with cursors ROLLBACK; -- Stable functions are supported CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); SELECT create_distributed_table('test_table_1', 'id'); NOTICE: Copying data from local table... create_distributed_table -------------------------- (1 row) SELECT * FROM test_table_1; id | date_col | col_3 ----+------------------------------+------- 1 | Sat Apr 05 08:32:12 2014 PDT | 5 3 | Wed Jan 12 08:35:19 2011 PST | 9 2 | Sun Feb 01 08:31:16 2015 PST | 7 (3 rows) UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now(); SELECT * FROM test_table_1; id | date_col | col_3 ----+------------------------------+------- 1 | Sat Apr 05 08:32:12 2014 PDT | 3 3 | Wed Jan 12 08:35:19 2011 PST | 3 2 | Sun Feb 01 08:31:16 2015 PST | 3 (3 rows) DELETE FROM test_table_1 WHERE date_col < current_timestamp; SELECT * FROM test_table_1; id | date_col | col_3 ----+----------+------- (0 rows) DROP TABLE test_table_1; -- Volatile functions are not supported CREATE TABLE test_table_2(id int, double_col double precision); INSERT INTO test_table_2 VALUES(1, random()); INSERT INTO test_table_2 VALUES(2, random()); INSERT INTO test_table_2 VALUES(3, random()); SELECT create_distributed_table('test_table_2', 'id'); NOTICE: Copying data from local table... create_distributed_table -------------------------- (1 row) UPDATE test_table_2 SET double_col = random(); ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE DROP TABLE test_table_2; -- Run multi shard updates and deletes without transaction on reference tables SELECT COUNT(*) FROM users_reference_copy_table; count ------- 15 (1 row) UPDATE users_reference_copy_table SET value_1 = 1; SELECT SUM(value_1) FROM users_reference_copy_table; sum ----- 15 (1 row) SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; count | sum -------+----- 4 | 52 (1 row) UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5; SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; count | sum -------+----- 4 | 56 (1 row) UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3; SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3; sum ----- 0 (1 row) DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; count ------- 0 (1 row) -- Do some tests by changing shard replication factor DROP TABLE users_test_table; SET citus.shard_replication_factor to 2; CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); SELECT create_distributed_table('users_test_table', 'user_id'); create_distributed_table -------------------------- (1 row) \COPY users_test_table FROM STDIN DELIMITER AS ','; -- Run multi shard updates and deletes without transaction on hash distributed tables UPDATE users_test_table SET value_1 = 1; SELECT COUNT(*), SUM(value_1) FROM users_test_table; count | sum -------+----- 15 | 15 (1 row) SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; count | sum -------+----- 4 | 52 (1 row) UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; count | sum -------+----- 4 | 56 (1 row) UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; sum ----- 0 (1 row) SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; count ------- 4 (1 row) DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; count ------- 0 (1 row) DROP TABLE users_test_table; DROP TABLE events_test_table; DROP TABLE events_reference_copy_table; DROP TABLE users_reference_copy_table;