mirror of https://github.com/citusdata/citus.git
904 lines
27 KiB
Plaintext
904 lines
27 KiB
Plaintext
--
|
|
-- multi shard update delete
|
|
-- this file is intended to test multi shard update/delete queries
|
|
--
|
|
SET citus.next_shard_id TO 1440000;
|
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000;
|
|
SET citus.shard_replication_factor to 1;
|
|
SET citus.multi_shard_modify_mode to 'parallel';
|
|
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
|
|
SELECT create_distributed_table('users_test_table', 'user_id');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
\COPY users_test_table FROM STDIN DELIMITER AS ',';
|
|
CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int);
|
|
SELECT create_distributed_table('events_test_table', 'user_id');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
\COPY events_test_table FROM STDIN DELIMITER AS ',';
|
|
CREATE TABLE events_reference_copy_table (like events_test_table);
|
|
SELECT create_reference_table('events_reference_copy_table');
|
|
create_reference_table
|
|
------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO events_reference_copy_table SELECT * FROM events_test_table;
|
|
CREATE TABLE users_reference_copy_table (like users_test_table);
|
|
SELECT create_reference_table('users_reference_copy_table');
|
|
create_reference_table
|
|
------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO users_reference_copy_table SELECT * FROM users_test_table;
|
|
-- Run multi shard updates and deletes without transaction on hash distributed tables
|
|
UPDATE users_test_table SET value_1 = 1;
|
|
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
|
|
count | sum
|
|
-------+-----
|
|
15 | 15
|
|
(1 row)
|
|
|
|
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
|
|
count | sum
|
|
-------+-----
|
|
4 | 52
|
|
(1 row)
|
|
|
|
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
|
|
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
|
|
count | sum
|
|
-------+-----
|
|
4 | 56
|
|
(1 row)
|
|
|
|
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
|
|
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
|
|
sum
|
|
-----
|
|
0
|
|
(1 row)
|
|
|
|
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
|
|
count
|
|
-------
|
|
4
|
|
(1 row)
|
|
|
|
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
|
|
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
|
|
count
|
|
-------
|
|
0
|
|
(1 row)
|
|
|
|
-- Run multi shard update delete queries within transactions
|
|
BEGIN;
|
|
UPDATE users_test_table SET value_3 = 0;
|
|
END;
|
|
SELECT SUM(value_3) FROM users_test_table;
|
|
sum
|
|
-----
|
|
0
|
|
(1 row)
|
|
|
|
-- Update can also be rolled back
|
|
BEGIN;
|
|
UPDATE users_test_table SET value_3 = 1;
|
|
ROLLBACK;
|
|
SELECT SUM(value_3) FROM users_test_table;
|
|
sum
|
|
-----
|
|
0
|
|
(1 row)
|
|
|
|
-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential)
|
|
BEGIN;
|
|
INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15);
|
|
INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17);
|
|
SET citus.multi_shard_modify_mode to sequential;
|
|
UPDATE users_test_table SET value_3 = 1;
|
|
END;
|
|
SELECT SUM(value_3) FROM users_test_table;
|
|
sum
|
|
-----
|
|
16
|
|
(1 row)
|
|
|
|
SET citus.multi_shard_modify_mode to 'sequential';
|
|
-- Run multiple multi shard updates (with sequential executor)
|
|
BEGIN;
|
|
UPDATE users_test_table SET value_3 = 5;
|
|
UPDATE users_test_table SET value_3 = 0;
|
|
END;
|
|
SELECT SUM(value_3) FROM users_copy_table;
|
|
ERROR: relation "users_copy_table" does not exist
|
|
LINE 1: SELECT SUM(value_3) FROM users_copy_table;
|
|
^
|
|
-- Run multiple multi shard updates (with parallel executor)
|
|
SET citus.multi_shard_modify_mode to 'parallel';
|
|
UPDATE users_test_table SET value_3 = 5;
|
|
BEGIN;
|
|
UPDATE users_test_table SET value_3 = 2;
|
|
UPDATE users_test_table SET value_3 = 0;
|
|
END;
|
|
SELECT SUM(value_3) FROM users_test_table;
|
|
sum
|
|
-----
|
|
0
|
|
(1 row)
|
|
|
|
-- Check with kind of constraints
|
|
UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true;
|
|
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
|
|
count | sum
|
|
-------+-----
|
|
16 | 16
|
|
(1 row)
|
|
|
|
UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false;
|
|
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
|
|
count | sum
|
|
-------+-----
|
|
16 | 16
|
|
(1 row)
|
|
|
|
-- Run multi shard updates with prepared statements
|
|
PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2;
|
|
EXECUTE foo_plan(1,5);
|
|
EXECUTE foo_plan(3,15);
|
|
EXECUTE foo_plan(5,25);
|
|
EXECUTE foo_plan(7,35);
|
|
EXECUTE foo_plan(9,45);
|
|
EXECUTE foo_plan(0,0);
|
|
SELECT SUM(value_1), SUM(value_3) FROM users_test_table;
|
|
sum | sum
|
|
-----+-----
|
|
0 | 0
|
|
(1 row)
|
|
|
|
-- Test on append table (set executor mode to sequential, since with the append
|
|
-- distributed tables parallel executor may create tons of connections)
|
|
SET citus.multi_shard_modify_mode to sequential;
|
|
CREATE TABLE append_stage_table(id int, col_2 int);
|
|
INSERT INTO append_stage_table VALUES(1,3);
|
|
INSERT INTO append_stage_table VALUES(3,2);
|
|
INSERT INTO append_stage_table VALUES(5,4);
|
|
CREATE TABLE append_stage_table_2(id int, col_2 int);
|
|
INSERT INTO append_stage_table_2 VALUES(8,3);
|
|
INSERT INTO append_stage_table_2 VALUES(9,2);
|
|
INSERT INTO append_stage_table_2 VALUES(10,4);
|
|
CREATE TABLE test_append_table(id int, col_2 int);
|
|
SELECT create_distributed_table('test_append_table','id','append');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT master_create_empty_shard('test_append_table');
|
|
master_create_empty_shard
|
|
---------------------------
|
|
1440010
|
|
(1 row)
|
|
|
|
SELECT * FROM master_append_table_to_shard(1440010, 'append_stage_table', 'localhost', :master_port);
|
|
master_append_table_to_shard
|
|
------------------------------
|
|
0.00533333
|
|
(1 row)
|
|
|
|
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
|
|
new_shard_id
|
|
--------------
|
|
1440011
|
|
(1 row)
|
|
|
|
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port);
|
|
master_append_table_to_shard
|
|
------------------------------
|
|
0.00533333
|
|
(1 row)
|
|
|
|
UPDATE test_append_table SET col_2 = 5;
|
|
SELECT * FROM test_append_table ORDER BY 1 DESC, 2 DESC;
|
|
id | col_2
|
|
----+-------
|
|
10 | 5
|
|
9 | 5
|
|
8 | 5
|
|
5 | 5
|
|
3 | 5
|
|
1 | 5
|
|
(6 rows)
|
|
|
|
DROP TABLE append_stage_table;
|
|
DROP TABLE append_stage_table_2;
|
|
DROP TABLE test_append_table;
|
|
-- Update multi shard of partitioned distributed table
|
|
SET citus.multi_shard_modify_mode to 'parallel';
|
|
SET citus.shard_replication_factor to 1;
|
|
CREATE TABLE tt1(id int, col_2 int) partition by range (col_2);
|
|
CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10);
|
|
CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20);
|
|
INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12);
|
|
SELECT create_distributed_table('tt1','id');
|
|
NOTICE: Copying data from local table...
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
UPDATE tt1 SET col_2 = 13;
|
|
DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5;
|
|
SELECT * FROM tt1 ORDER BY 1 DESC, 2 DESC;
|
|
id | col_2
|
|
----+-------
|
|
8 | 13
|
|
6 | 13
|
|
2 | 13
|
|
(3 rows)
|
|
|
|
-- Partitioned distributed table within transaction
|
|
INSERT INTO tt1 VALUES(4,6);
|
|
INSERT INTO tt1 VALUES(7,7);
|
|
INSERT INTO tt1 VALUES(9,8);
|
|
BEGIN;
|
|
-- Update rows from partition tt1_1120
|
|
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
|
|
-- Update rows from partition tt1_510
|
|
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
|
|
COMMIT;
|
|
SELECT * FROM tt1 ORDER BY id;
|
|
id | col_2
|
|
----+-------
|
|
2 | 12
|
|
4 | 7
|
|
6 | 12
|
|
7 | 7
|
|
8 | 12
|
|
9 | 7
|
|
(6 rows)
|
|
|
|
-- Modify main table and partition table within same transaction
|
|
BEGIN;
|
|
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
|
|
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
|
|
DELETE FROM tt1_510;
|
|
DELETE FROM tt1_1120;
|
|
COMMIT;
|
|
SELECT * FROM tt1 ORDER BY id;
|
|
id | col_2
|
|
----+-------
|
|
(0 rows)
|
|
|
|
DROP TABLE tt1;
|
|
-- Update and copy in the same transaction
|
|
CREATE TABLE tt2(id int, col_2 int);
|
|
SELECT create_distributed_table('tt2','id');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
\COPY tt2 FROM STDIN DELIMITER AS ',';
|
|
UPDATE tt2 SET col_2 = 1;
|
|
COMMIT;
|
|
SELECT * FROM tt2 ORDER BY id;
|
|
id | col_2
|
|
----+-------
|
|
1 | 1
|
|
2 | 1
|
|
3 | 1
|
|
7 | 1
|
|
9 | 1
|
|
(5 rows)
|
|
|
|
-- Test RETURNING with both types of executors
|
|
UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
|
|
id | col_2
|
|
----+-------
|
|
1 | 5
|
|
2 | 5
|
|
3 | 5
|
|
7 | 5
|
|
9 | 5
|
|
(5 rows)
|
|
|
|
SET citus.multi_shard_modify_mode to sequential;
|
|
UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
|
|
id | col_2
|
|
----+-------
|
|
1 | 3
|
|
2 | 3
|
|
3 | 3
|
|
7 | 3
|
|
9 | 3
|
|
(5 rows)
|
|
|
|
DROP TABLE tt2;
|
|
-- Multiple RTEs are only supported if subquery is pushdownable
|
|
SET citus.multi_shard_modify_mode to DEFAULT;
|
|
-- To test colocation between tables in modify query
|
|
SET citus.shard_count to 6;
|
|
CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int);
|
|
SELECT create_distributed_table('events_test_table_2', 'user_id');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
\COPY events_test_table_2 FROM STDIN DELIMITER AS ',';
|
|
CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int);
|
|
\COPY events_test_table_local FROM STDIN DELIMITER AS ',';
|
|
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
|
|
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
|
|
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
|
|
INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9);
|
|
SELECT create_distributed_table('test_table_1', 'id');
|
|
NOTICE: Copying data from local table...
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
-- We can push down the query if there is partition key equality
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
DELETE FROM users_test_table
|
|
USING events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
UPDATE users_test_table
|
|
SET value_1 = 3
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM events_test_table);
|
|
DELETE FROM users_test_table
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM events_test_table);
|
|
DELETE FROM events_test_table_2
|
|
WHERE now() > (SELECT max(date_col)
|
|
FROM test_table_1
|
|
WHERE test_table_1.id = events_test_table_2.user_id
|
|
GROUP BY id)
|
|
RETURNING *;
|
|
user_id | value_1 | value_2 | value_3
|
|
---------+---------+---------+---------
|
|
1 | 5 | 7 | 7
|
|
1 | 20 | 12 | 25
|
|
1 | 60 | 17 | 17
|
|
(3 rows)
|
|
|
|
UPDATE users_test_table
|
|
SET value_1 = 5
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id
|
|
AND events_test_table.user_id > 5;
|
|
UPDATE users_test_table
|
|
SET value_1 = 4
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM users_test_table
|
|
UNION
|
|
SELECT user_id
|
|
FROM events_test_table);
|
|
UPDATE users_test_table
|
|
SET value_1 = 4
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM users_test_table
|
|
UNION
|
|
SELECT user_id
|
|
FROM events_test_table) returning value_3;
|
|
value_3
|
|
---------
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
(11 rows)
|
|
|
|
UPDATE users_test_table
|
|
SET value_1 = 4
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM users_test_table
|
|
UNION ALL
|
|
SELECT user_id
|
|
FROM events_test_table) returning value_3;
|
|
value_3
|
|
---------
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
0
|
|
(11 rows)
|
|
|
|
UPDATE users_test_table
|
|
SET value_1 = 5
|
|
WHERE
|
|
value_2 >
|
|
(SELECT
|
|
max(value_2)
|
|
FROM
|
|
events_test_table
|
|
WHERE
|
|
users_test_table.user_id = events_test_table.user_id
|
|
GROUP BY
|
|
user_id
|
|
);
|
|
UPDATE users_test_table
|
|
SET value_3 = 1
|
|
WHERE
|
|
value_2 >
|
|
(SELECT
|
|
max(value_2)
|
|
FROM
|
|
events_test_table
|
|
WHERE
|
|
users_test_table.user_id = events_test_table.user_id AND
|
|
users_test_table.value_2 > events_test_table.value_2
|
|
GROUP BY
|
|
user_id
|
|
);
|
|
UPDATE users_test_table
|
|
SET value_2 = 4
|
|
WHERE
|
|
value_1 > 1 AND value_1 < 3
|
|
AND value_2 >= 1
|
|
AND user_id IN
|
|
(
|
|
SELECT
|
|
e1.user_id
|
|
FROM (
|
|
SELECT
|
|
user_id,
|
|
1 AS view_homepage
|
|
FROM events_test_table
|
|
WHERE
|
|
value_1 IN (0, 1)
|
|
) e1 LEFT JOIN LATERAL (
|
|
SELECT
|
|
user_id,
|
|
1 AS use_demo
|
|
FROM events_test_table
|
|
WHERE
|
|
user_id = e1.user_id
|
|
) e2 ON true
|
|
);
|
|
UPDATE users_test_table
|
|
SET value_3 = 5
|
|
WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id);
|
|
-- Test it within transaction
|
|
BEGIN;
|
|
INSERT INTO users_test_table
|
|
SELECT * FROM events_test_table
|
|
WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5;
|
|
SELECT SUM(value_2) FROM users_test_table;
|
|
sum
|
|
-----
|
|
169
|
|
(1 row)
|
|
|
|
UPDATE users_test_table
|
|
SET value_2 = 1
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
SELECT SUM(value_2) FROM users_test_table;
|
|
sum
|
|
-----
|
|
97
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- Test with schema
|
|
CREATE SCHEMA sec_schema;
|
|
CREATE TABLE sec_schema.tt1(id int, value_1 int);
|
|
SELECT create_distributed_table('sec_schema.tt1','id');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9);
|
|
UPDATE sec_schema.tt1
|
|
SET value_1 = 11
|
|
WHERE id < (SELECT max(value_2) FROM events_test_table_2
|
|
WHERE sec_schema.tt1.id = events_test_table_2.user_id
|
|
GROUP BY user_id)
|
|
RETURNING *;
|
|
id | value_1
|
|
----+---------
|
|
7 | 11
|
|
9 | 11
|
|
(2 rows)
|
|
|
|
DROP SCHEMA sec_schema CASCADE;
|
|
NOTICE: drop cascades to table sec_schema.tt1
|
|
-- We don't need partition key equality with reference tables
|
|
UPDATE events_test_table
|
|
SET value_2 = 5
|
|
FROM users_reference_copy_table
|
|
WHERE users_reference_copy_table.user_id = events_test_table.value_1;
|
|
-- Both reference tables and hash distributed tables can be used in subquery
|
|
UPDATE events_test_table as ett
|
|
SET value_2 = 6
|
|
WHERE ett.value_3 IN (SELECT utt.value_3
|
|
FROM users_test_table as utt, users_reference_copy_table as uct
|
|
WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id);
|
|
-- We don't need equality check with constant values in sub-select
|
|
UPDATE users_reference_copy_table
|
|
SET value_2 = 6
|
|
WHERE user_id IN (SELECT 2);
|
|
UPDATE users_reference_copy_table
|
|
SET value_2 = 6
|
|
WHERE value_1 IN (SELECT 2);
|
|
UPDATE users_test_table
|
|
SET value_2 = 6
|
|
WHERE user_id IN (SELECT 2);
|
|
UPDATE users_test_table
|
|
SET value_2 = 6
|
|
WHERE value_1 IN (SELECT 2);
|
|
-- Function calls in subqueries will be recursively planned
|
|
UPDATE test_table_1
|
|
SET col_3 = 6
|
|
WHERE date_col IN (SELECT now());
|
|
-- Test with prepared statements
|
|
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
|
|
count
|
|
-------
|
|
0
|
|
(1 row)
|
|
|
|
PREPARE foo_plan_2(int,int) AS UPDATE users_test_table
|
|
SET value_1 = $1, value_3 = $2
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
EXECUTE foo_plan_2(1,5);
|
|
EXECUTE foo_plan_2(3,15);
|
|
EXECUTE foo_plan_2(5,25);
|
|
EXECUTE foo_plan_2(7,35);
|
|
EXECUTE foo_plan_2(9,45);
|
|
EXECUTE foo_plan_2(0,0);
|
|
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
|
|
count
|
|
-------
|
|
6
|
|
(1 row)
|
|
|
|
-- Test with varying WHERE expressions
|
|
UPDATE users_test_table
|
|
SET value_1 = 7
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id OR FALSE;
|
|
UPDATE users_test_table
|
|
SET value_1 = 7
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id AND TRUE;
|
|
-- Test with inactive shard-placement
|
|
-- manually set shardstate of one placement of users_test_table as inactive
|
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000;
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
ERROR: cannot find a worker that has active placements for all shards in the query
|
|
-- manually set shardstate of one placement of events_test_table as inactive
|
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004;
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
ERROR: cannot find a worker that has active placements for all shards in the query
|
|
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000;
|
|
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004;
|
|
-- Subquery must return single value to use it with comparison operators
|
|
UPDATE users_test_table as utt
|
|
SET value_1 = 3
|
|
WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id);
|
|
ERROR: more than one row returned by a subquery used as an expression
|
|
CONTEXT: while executing command on localhost:57637
|
|
-- We cannot push down a query if the target relation is a reference table
|
|
UPDATE users_reference_copy_table
|
|
SET value_2 = 5
|
|
FROM events_test_table
|
|
WHERE users_reference_copy_table.user_id = events_test_table.user_id;
|
|
ERROR: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables
|
|
-- We cannot push it down if the query has an outer join with USING
|
|
UPDATE events_test_table
|
|
SET value_2 = users_test_table.user_id
|
|
FROM users_test_table
|
|
FULL OUTER JOIN events_test_table e2 USING (user_id)
|
|
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
|
|
ERROR: a join with USING causes an internal naming conflict, use ON instead
|
|
-- Non-pushdownable subqueries, but will be handled through recursive planning
|
|
UPDATE users_test_table
|
|
SET value_1 = 1
|
|
WHERE user_id IN (SELECT Count(value_1)
|
|
FROM events_test_table
|
|
GROUP BY user_id);
|
|
UPDATE users_test_table
|
|
SET value_1 = (SELECT Count(*)
|
|
FROM events_test_table);
|
|
UPDATE users_test_table
|
|
SET value_1 = 4
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM users_test_table
|
|
UNION
|
|
SELECT value_1
|
|
FROM events_test_table);
|
|
UPDATE users_test_table
|
|
SET value_1 = 4
|
|
WHERE user_id IN (SELECT user_id
|
|
FROM users_test_table
|
|
INTERSECT
|
|
SELECT Sum(value_1)
|
|
FROM events_test_table
|
|
GROUP BY user_id);
|
|
UPDATE users_test_table
|
|
SET value_2 = (SELECT value_3
|
|
FROM users_test_table);
|
|
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
|
|
UPDATE users_test_table
|
|
SET value_2 = 2
|
|
WHERE
|
|
value_2 >
|
|
(SELECT
|
|
max(value_2)
|
|
FROM
|
|
events_test_table
|
|
WHERE
|
|
users_test_table.user_id > events_test_table.user_id AND
|
|
users_test_table.value_1 = events_test_table.value_1
|
|
GROUP BY
|
|
user_id
|
|
);
|
|
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
|
|
UPDATE users_test_table
|
|
SET (value_1, value_2) = (2,1)
|
|
WHERE user_id IN
|
|
(SELECT user_id
|
|
FROM users_test_table
|
|
INTERSECT
|
|
SELECT user_id
|
|
FROM events_test_table);
|
|
-- Reference tables cannot be located on the outer side of an outer join
|
|
UPDATE users_test_table
|
|
SET value_1 = 4
|
|
WHERE user_id IN
|
|
(SELECT DISTINCT e2.user_id
|
|
FROM users_reference_copy_table
|
|
LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *;
|
|
ERROR: cannot pushdown the subquery
|
|
DETAIL: There exist a reference table in the outer part of the outer join
|
|
-- Volatile functions are also not supported
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id * random();
|
|
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
|
UPDATE users_test_table
|
|
SET value_2 = 5 * random()
|
|
FROM events_test_table
|
|
WHERE users_test_table.user_id = events_test_table.user_id;
|
|
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
|
|
-- The recursive modify planner does not handle the following test because the query
|
|
-- is fully pushdownable, yet it is not allowed because it would lead to inconsistent replicas.
|
|
UPDATE users_test_table
|
|
SET value_2 = subquery.random FROM (SELECT user_id, random()
|
|
FROM events_test_table) subquery
|
|
WHERE users_test_table.user_id = subquery.user_id;
|
|
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
|
|
-- Volatile functions in a subquery are recursively planned
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
|
|
UPDATE users_test_table
|
|
SET value_2 = subquery.random FROM (SELECT user_id, random()
|
|
FROM events_test_table) subquery;
|
|
UPDATE users_test_table
|
|
SET value_2 = subquery.random FROM (SELECT user_id, random()
|
|
FROM events_test_table OFFSET 0) subquery
|
|
WHERE users_test_table.user_id = subquery.user_id;
|
|
-- Make following tests consistent
|
|
UPDATE users_test_table SET value_2 = 0;
|
|
-- Local tables are not supported
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
FROM events_test_table_local
|
|
WHERE users_test_table.user_id = events_test_table_local.user_id;
|
|
ERROR: relation events_test_table_local is not distributed
|
|
UPDATE events_test_table_local
|
|
SET value_2 = 5
|
|
FROM users_test_table
|
|
WHERE events_test_table_local.user_id = users_test_table.user_id;
|
|
ERROR: relation events_test_table_local is not distributed
|
|
-- Local tables in a subquery are supported through recursive planning
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
|
|
-- Shard counts of the tables must be equal to push down the query
|
|
UPDATE users_test_table
|
|
SET value_2 = 5
|
|
FROM events_test_table_2
|
|
WHERE users_test_table.user_id = events_test_table_2.user_id;
|
|
ERROR: cannot push down this subquery
|
|
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
|
|
-- Should error out because the subquery returns multiple rows, but we cannot get this information within
|
|
-- the subquery pushdown planner. This query will be sent to the worker by the recursive planner.
|
|
\set VERBOSITY terse
|
|
DELETE FROM users_test_table
|
|
WHERE users_test_table.user_id = (SELECT user_id
|
|
FROM events_test_table);
|
|
ERROR: more than one row returned by a subquery used as an expression
|
|
\set VERBOSITY default
|
|
-- Cursors are not supported
|
|
BEGIN;
|
|
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table ORDER BY user_id;
|
|
FETCH test_cursor;
|
|
user_id | value_1 | value_2 | value_3
|
|
---------+---------+---------+---------
|
|
1 | 2 | 5 | 0
|
|
(1 row)
|
|
|
|
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
|
|
ERROR: cannot run DML queries with cursors
|
|
ROLLBACK;
|
|
-- Stable functions are supported
|
|
SELECT * FROM test_table_1 ORDER BY 1 DESC, 2 DESC, 3 DESC;
|
|
id | date_col | col_3
|
|
----+------------------------------+-------
|
|
3 | Mon Jan 12 08:35:19 2111 PST | 9
|
|
2 | Sun Feb 01 08:31:16 2015 PST | 7
|
|
1 | Sat Apr 05 08:32:12 2014 PDT | 5
|
|
(3 rows)
|
|
|
|
UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now();
|
|
SELECT * FROM test_table_1 ORDER BY 1 DESC, 2 DESC, 3 DESC;
|
|
id | date_col | col_3
|
|
----+------------------------------+-------
|
|
3 | Mon Jan 12 08:35:19 2111 PST | 9
|
|
2 | Sun Feb 01 08:31:16 2015 PST | 3
|
|
1 | Sat Apr 05 08:32:12 2014 PDT | 3
|
|
(3 rows)
|
|
|
|
DELETE FROM test_table_1 WHERE date_col < current_timestamp;
|
|
SELECT * FROM test_table_1 ORDER BY 1 DESC, 2 DESC, 3 DESC;
|
|
id | date_col | col_3
|
|
----+------------------------------+-------
|
|
3 | Mon Jan 12 08:35:19 2111 PST | 9
|
|
(1 row)
|
|
|
|
DROP TABLE test_table_1;
|
|
-- Volatile functions are not supported
|
|
CREATE TABLE test_table_2(id int, double_col double precision);
|
|
INSERT INTO test_table_2 VALUES(1, random());
|
|
INSERT INTO test_table_2 VALUES(2, random());
|
|
INSERT INTO test_table_2 VALUES(3, random());
|
|
SELECT create_distributed_table('test_table_2', 'id');
|
|
NOTICE: Copying data from local table...
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
UPDATE test_table_2 SET double_col = random();
|
|
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
|
|
DROP TABLE test_table_2;
|
|
-- Run multi shard updates and deletes without transaction on reference tables
|
|
SELECT COUNT(*) FROM users_reference_copy_table;
|
|
count
|
|
-------
|
|
15
|
|
(1 row)
|
|
|
|
UPDATE users_reference_copy_table SET value_1 = 1;
|
|
SELECT SUM(value_1) FROM users_reference_copy_table;
|
|
sum
|
|
-----
|
|
15
|
|
(1 row)
|
|
|
|
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
|
|
count | sum
|
|
-------+-----
|
|
4 | 52
|
|
(1 row)
|
|
|
|
UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5;
|
|
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
|
|
count | sum
|
|
-------+-----
|
|
4 | 56
|
|
(1 row)
|
|
|
|
UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3;
|
|
SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3;
|
|
sum
|
|
-----
|
|
0
|
|
(1 row)
|
|
|
|
DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
|
|
SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
|
|
count
|
|
-------
|
|
0
|
|
(1 row)
|
|
|
|
-- Do some tests by changing shard replication factor
|
|
DROP TABLE users_test_table;
|
|
SET citus.shard_replication_factor to 2;
|
|
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
|
|
SELECT create_distributed_table('users_test_table', 'user_id');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
\COPY users_test_table FROM STDIN DELIMITER AS ',';
|
|
-- Run multi shard updates and deletes without transaction on hash distributed tables
|
|
UPDATE users_test_table SET value_1 = 1;
|
|
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
|
|
count | sum
|
|
-------+-----
|
|
15 | 15
|
|
(1 row)
|
|
|
|
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
|
|
count | sum
|
|
-------+-----
|
|
4 | 52
|
|
(1 row)
|
|
|
|
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
|
|
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
|
|
count | sum
|
|
-------+-----
|
|
4 | 56
|
|
(1 row)
|
|
|
|
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
|
|
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
|
|
sum
|
|
-----
|
|
0
|
|
(1 row)
|
|
|
|
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
|
|
count
|
|
-------
|
|
4
|
|
(1 row)
|
|
|
|
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
|
|
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
|
|
count
|
|
-------
|
|
0
|
|
(1 row)
|
|
|
|
DROP TABLE users_test_table;
|
|
DROP TABLE events_test_table;
|
|
DROP TABLE events_reference_copy_table;
|
|
DROP TABLE users_reference_copy_table;
|