mirror of https://github.com/citusdata/citus.git
Merge pull request #2318 from citusdata/mt_failure_test
Add new failure tests for multi-shard/CTE modify and cte coordinator pullpull/2426/head
commit
c8151818e7
|
@ -0,0 +1,391 @@
|
||||||
|
CREATE SCHEMA cte_failure;
|
||||||
|
SET SEARCH_PATH=cte_failure;
|
||||||
|
SET citus.shard_count to 2;
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
SELECT pg_backend_pid() as pid \gset
|
||||||
|
CREATE TABLE users_table (user_id int, user_name text);
|
||||||
|
CREATE TABLE events_table(user_id int, event_id int, event_type int);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE users_table_local AS SELECT * FROM users_table;
|
||||||
|
-- kill at the first copy (push)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- kill at the second copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM cte_failure.events_table_102250").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
ERROR: failed to execute task 1
|
||||||
|
|
||||||
|
-- kill at the third copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
ERROR: failed to execute task 1
|
||||||
|
-- cancel at the first copy (push)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- cancel at the second copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- cancel at the third copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- distributed update tests
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- insert some rows
|
||||||
|
INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E');
|
||||||
|
INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2);
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
user_id | user_name
|
||||||
|
---------+-----------
|
||||||
|
1 | A
|
||||||
|
2 | B
|
||||||
|
3 | C
|
||||||
|
4 | D
|
||||||
|
5 | E
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- following will delete and insert the same rows
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
user_id | user_name
|
||||||
|
---------+-----------
|
||||||
|
1 | A
|
||||||
|
2 | B
|
||||||
|
3 | C
|
||||||
|
4 | D
|
||||||
|
5 | E
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- kill connection during deletion
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
user_id | user_name
|
||||||
|
---------+-----------
|
||||||
|
1 | A
|
||||||
|
2 | B
|
||||||
|
3 | C
|
||||||
|
4 | D
|
||||||
|
5 | E
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- kill connection during insert
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
user_id | user_name
|
||||||
|
---------+-----------
|
||||||
|
1 | A
|
||||||
|
2 | B
|
||||||
|
3 | C
|
||||||
|
4 | D
|
||||||
|
5 | E
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- cancel during deletion
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
user_id | user_name
|
||||||
|
---------+-----------
|
||||||
|
1 | A
|
||||||
|
2 | B
|
||||||
|
3 | C
|
||||||
|
4 | D
|
||||||
|
5 | E
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- cancel during insert
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
user_id | user_name
|
||||||
|
---------+-----------
|
||||||
|
1 | A
|
||||||
|
2 | B
|
||||||
|
3 | C
|
||||||
|
4 | D
|
||||||
|
5 | E
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- test sequential delete/insert
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
END;
|
||||||
|
RESET SEARCH_PATH;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP SCHEMA cte_failure CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
DETAIL: drop cascades to table cte_failure.users_table
|
||||||
|
drop cascades to table cte_failure.events_table
|
||||||
|
drop cascades to table cte_failure.users_table_local
|
|
@ -0,0 +1,692 @@
|
||||||
|
--
|
||||||
|
-- failure_multi_shard_update_delete
|
||||||
|
--
|
||||||
|
CREATE SCHEMA IF NOT EXISTS multi_shard;
|
||||||
|
SET SEARCH_PATH = multi_shard;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.next_shard_id TO 201000;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE t1(a int PRIMARY KEY, b int, c int);
|
||||||
|
CREATE TABLE r1(a int, b int PRIMARY KEY);
|
||||||
|
CREATE TABLE t2(a int REFERENCES t1(a) ON DELETE CASCADE, b int REFERENCES r1(b) ON DELETE CASCADE, c int);
|
||||||
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_reference_table('r1');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('t2', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- insert some data
|
||||||
|
INSERT INTO r1 VALUES (1, 1), (2, 2), (3, 3);
|
||||||
|
INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
|
||||||
|
INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, 2, 3), (3, 3, 3);
|
||||||
|
SELECT pg_backend_pid() as pid \gset
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SHOW citus.multi_shard_commit_protocol ;
|
||||||
|
citus.multi_shard_commit_protocol
|
||||||
|
-----------------------------------
|
||||||
|
2pc
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- UPDATE TESTS
|
||||||
|
-- update non-partition column based on a filter on another non-partition column
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- switch to 1PC
|
||||||
|
SET citus.multi_shard_commit_protocol TO '1PC';
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
7
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- UPDATE TESTS
|
||||||
|
-- update non-partition column based on a filter on another non-partition column
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
b2 | c4
|
||||||
|
----+----
|
||||||
|
3 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.multi_shard_commit_protocol;
|
||||||
|
--
|
||||||
|
-- fail when cascading deletes from foreign key
|
||||||
|
-- unfortunately cascading deletes from foreign keys
|
||||||
|
-- are done inside the worker only and do not
|
||||||
|
-- generate any network output
|
||||||
|
-- therefore we can't just fail cascade part
|
||||||
|
-- following tests are added for completeness purposes
|
||||||
|
-- it is safe to remove them without reducing any
|
||||||
|
-- test coverage
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- check counts before delete
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b2
|
||||||
|
----
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM r1 WHERE a = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b2
|
||||||
|
----
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b2
|
||||||
|
----
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test update with subquery pull
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE t3 AS SELECT * FROM t2;
|
||||||
|
SELECT create_distributed_table('t3', 'a');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM t3 ORDER BY 1, 2, 3;
|
||||||
|
a | b | c
|
||||||
|
---+---+---
|
||||||
|
1 | 1 | 1
|
||||||
|
1 | 2 | 1
|
||||||
|
2 | 1 | 2
|
||||||
|
2 | 2 | 4
|
||||||
|
3 | 1 | 3
|
||||||
|
3 | 2 | 3
|
||||||
|
3 | 3 | 3
|
||||||
|
(7 rows)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t3 SET c = q.c FROM (
|
||||||
|
SELECT b, max(c) as c FROM t2 GROUP BY b) q
|
||||||
|
WHERE t3.b = q.b
|
||||||
|
RETURNING *;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
--- verify nothing is updated
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM t3 ORDER BY 1, 2, 3;
|
||||||
|
a | b | c
|
||||||
|
---+---+---
|
||||||
|
1 | 1 | 1
|
||||||
|
1 | 2 | 1
|
||||||
|
2 | 1 | 2
|
||||||
|
2 | 2 | 4
|
||||||
|
3 | 1 | 3
|
||||||
|
3 | 2 | 3
|
||||||
|
3 | 3 | 3
|
||||||
|
(7 rows)
|
||||||
|
|
||||||
|
-- kill update part
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE multi_shard.t3_201009").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t3 SET c = q.c FROM (
|
||||||
|
SELECT b, max(c) as c FROM t2 GROUP BY b) q
|
||||||
|
WHERE t3.b = q.b
|
||||||
|
RETURNING *;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
--- verify nothing is updated
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM t3 ORDER BY 1, 2, 3;
|
||||||
|
a | b | c
|
||||||
|
---+---+---
|
||||||
|
1 | 1 | 1
|
||||||
|
1 | 2 | 1
|
||||||
|
2 | 1 | 2
|
||||||
|
2 | 2 | 4
|
||||||
|
3 | 1 | 3
|
||||||
|
3 | 2 | 3
|
||||||
|
3 | 3 | 3
|
||||||
|
(7 rows)
|
||||||
|
|
||||||
|
-- test with replication_factor = 2
|
||||||
|
-- table can not have foreign reference with this setting so
|
||||||
|
-- use a different set of table
|
||||||
|
SET citus.shard_replication_factor to 2;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE t3;
|
||||||
|
CREATE TABLE t3 AS SELECT * FROM t2;
|
||||||
|
SELECT create_distributed_table('t3', 'a');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- prevent update of one replica of one shard
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t3_201013").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- fail only one update verify transaction is rolled back correctly
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t2 SET b = 2 WHERE b = 1;
|
||||||
|
-- verify update is performed on t2
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
0 | 6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- following will fail
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
END;
|
||||||
|
-- verify everything is rolled back
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- switch to 1PC
|
||||||
|
SET citus.multi_shard_commit_protocol TO '1PC';
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- fail only one update verify transaction is rolled back correctly
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE t2 SET b = 2 WHERE b = 1;
|
||||||
|
-- verify update is performed on t2
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
0 | 6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- following will fail
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
END;
|
||||||
|
-- verify everything is rolled back
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
b1 | b2
|
||||||
|
----+----
|
||||||
|
3 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET SEARCH_PATH;
|
||||||
|
DROP SCHEMA multi_shard CASCADE;
|
||||||
|
NOTICE: drop cascades to 4 other objects
|
||||||
|
DETAIL: drop cascades to table multi_shard.t1
|
||||||
|
drop cascades to table multi_shard.r1
|
||||||
|
drop cascades to table multi_shard.t2
|
||||||
|
drop cascades to table multi_shard.t3
|
|
@ -17,3 +17,5 @@ test: failure_create_table
|
||||||
|
|
||||||
test: failure_1pc_copy_hash
|
test: failure_1pc_copy_hash
|
||||||
test: failure_1pc_copy_append
|
test: failure_1pc_copy_append
|
||||||
|
test: failure_multi_shard_update_delete
|
||||||
|
test: failure_cte_subquery
|
||||||
|
|
|
@ -0,0 +1,237 @@
|
||||||
|
|
||||||
|
CREATE SCHEMA cte_failure;
|
||||||
|
SET SEARCH_PATH=cte_failure;
|
||||||
|
SET citus.shard_count to 2;
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
|
||||||
|
SELECT pg_backend_pid() as pid \gset
|
||||||
|
|
||||||
|
CREATE TABLE users_table (user_id int, user_name text);
|
||||||
|
CREATE TABLE events_table(user_id int, event_id int, event_type int);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
CREATE TABLE users_table_local AS SELECT * FROM users_table;
|
||||||
|
|
||||||
|
-- kill at the first copy (push)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
|
||||||
|
-- kill at the second copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM cte_failure.events_table_102250").kill()');
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
|
||||||
|
-- kill at the third copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()');
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
|
||||||
|
-- cancel at the first copy (push)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
|
||||||
|
-- cancel at the second copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')');
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
|
||||||
|
-- cancel at the third copy (pull)
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')');
|
||||||
|
|
||||||
|
WITH cte AS (
|
||||||
|
WITH local_cte AS (
|
||||||
|
SELECT * FROM users_table_local
|
||||||
|
),
|
||||||
|
dist_cte AS (
|
||||||
|
SELECT user_id FROM events_table
|
||||||
|
)
|
||||||
|
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
count(*)
|
||||||
|
FROM
|
||||||
|
cte,
|
||||||
|
(SELECT
|
||||||
|
DISTINCT users_table.user_id
|
||||||
|
FROM
|
||||||
|
users_table, events_table
|
||||||
|
WHERE
|
||||||
|
users_table.user_id = events_table.user_id AND
|
||||||
|
event_type IN (1,2,3,4)
|
||||||
|
ORDER BY 1 DESC LIMIT 5
|
||||||
|
) as foo
|
||||||
|
WHERE foo.user_id = cte.user_id;
|
||||||
|
|
||||||
|
-- distributed update tests
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
-- insert some rows
|
||||||
|
INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E');
|
||||||
|
INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2);
|
||||||
|
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
-- following will delete and insert the same rows
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
|
||||||
|
-- kill connection during deletion
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()');
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
|
||||||
|
-- kill connection during insert
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
|
||||||
|
-- cancel during deletion
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')');
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
|
||||||
|
-- cancel during insert
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
|
||||||
|
-- verify contents are the same
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM users_table ORDER BY 1, 2;
|
||||||
|
|
||||||
|
-- test sequential delete/insert
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()');
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
|
||||||
|
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||||
|
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||||
|
END;
|
||||||
|
|
||||||
|
RESET SEARCH_PATH;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
DROP SCHEMA cte_failure CASCADE;
|
|
@ -0,0 +1,299 @@
|
||||||
|
--
|
||||||
|
-- failure_multi_shard_update_delete
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE SCHEMA IF NOT EXISTS multi_shard;
|
||||||
|
SET SEARCH_PATH = multi_shard;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.next_shard_id TO 201000;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
CREATE TABLE t1(a int PRIMARY KEY, b int, c int);
|
||||||
|
CREATE TABLE r1(a int, b int PRIMARY KEY);
|
||||||
|
CREATE TABLE t2(a int REFERENCES t1(a) ON DELETE CASCADE, b int REFERENCES r1(b) ON DELETE CASCADE, c int);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
SELECT create_reference_table('r1');
|
||||||
|
SELECT create_distributed_table('t2', 'a');
|
||||||
|
|
||||||
|
-- insert some data
|
||||||
|
INSERT INTO r1 VALUES (1, 1), (2, 2), (3, 3);
|
||||||
|
INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
|
||||||
|
INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, 2, 3), (3, 3, 3);
|
||||||
|
|
||||||
|
SELECT pg_backend_pid() as pid \gset
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
SHOW citus.multi_shard_commit_protocol ;
|
||||||
|
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()');
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')');
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- UPDATE TESTS
|
||||||
|
-- update non-partition column based on a filter on another non-partition column
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()');
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')');
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- switch to 1PC
|
||||||
|
SET citus.multi_shard_commit_protocol TO '1PC';
|
||||||
|
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()');
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')');
|
||||||
|
-- issue a multi shard delete
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FROM t2;
|
||||||
|
|
||||||
|
-- UPDATE TESTS
|
||||||
|
-- update non-partition column based on a filter on another non-partition column
|
||||||
|
-- DELETION TESTS
|
||||||
|
-- delete using a filter on non-partition column filter
|
||||||
|
-- test both kill and cancellation
|
||||||
|
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- kill just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()');
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- cancellation
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')');
|
||||||
|
-- issue a multi shard update
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
-- cancel just one connection
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')');
|
||||||
|
UPDATE t2 SET c = 4 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2;
|
||||||
|
|
||||||
|
RESET citus.multi_shard_commit_protocol;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- fail when cascading deletes from foreign key
|
||||||
|
-- unfortunately cascading deletes from foreign keys
|
||||||
|
-- are done inside the worker only and do not
|
||||||
|
-- generate any network output
|
||||||
|
-- therefore we can't just fail cascade part
|
||||||
|
-- following tests are added for completeness purposes
|
||||||
|
-- it is safe to remove them without reducing any
|
||||||
|
-- test coverage
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
-- check counts before delete
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
DELETE FROM r1 WHERE a = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()');
|
||||||
|
|
||||||
|
DELETE FROM t2 WHERE b = 2;
|
||||||
|
|
||||||
|
-- verify nothing is deleted
|
||||||
|
SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
|
||||||
|
-- test update with subquery pull
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
CREATE TABLE t3 AS SELECT * FROM t2;
|
||||||
|
SELECT create_distributed_table('t3', 'a');
|
||||||
|
SELECT * FROM t3 ORDER BY 1, 2, 3;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
|
||||||
|
UPDATE t3 SET c = q.c FROM (
|
||||||
|
SELECT b, max(c) as c FROM t2 GROUP BY b) q
|
||||||
|
WHERE t3.b = q.b
|
||||||
|
RETURNING *;
|
||||||
|
|
||||||
|
--- verify nothing is updated
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM t3 ORDER BY 1, 2, 3;
|
||||||
|
|
||||||
|
-- kill update part
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE multi_shard.t3_201009").kill()');
|
||||||
|
UPDATE t3 SET c = q.c FROM (
|
||||||
|
SELECT b, max(c) as c FROM t2 GROUP BY b) q
|
||||||
|
WHERE t3.b = q.b
|
||||||
|
RETURNING *;
|
||||||
|
|
||||||
|
--- verify nothing is updated
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM t3 ORDER BY 1, 2, 3;
|
||||||
|
|
||||||
|
-- test with replication_factor = 2
|
||||||
|
-- table can not have foreign reference with this setting so
|
||||||
|
-- use a different set of table
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor to 2;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
DROP TABLE t3;
|
||||||
|
CREATE TABLE t3 AS SELECT * FROM t2;
|
||||||
|
SELECT create_distributed_table('t3', 'a');
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
-- prevent update of one replica of one shard
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t3_201013").kill()');
|
||||||
|
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
-- fail only one update verify transaction is rolled back correctly
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
UPDATE t2 SET b = 2 WHERE b = 1;
|
||||||
|
-- verify update is performed on t2
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
-- following will fail
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- verify everything is rolled back
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
-- switch to 1PC
|
||||||
|
SET citus.multi_shard_commit_protocol TO '1PC';
|
||||||
|
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
|
||||||
|
-- verify nothing is updated
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
-- fail only one update verify transaction is rolled back correctly
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
UPDATE t2 SET b = 2 WHERE b = 1;
|
||||||
|
-- verify update is performed on t2
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
-- following will fail
|
||||||
|
UPDATE t3 SET b = 2 WHERE b = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- verify everything is rolled back
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2;
|
||||||
|
SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
RESET SEARCH_PATH;
|
||||||
|
DROP SCHEMA multi_shard CASCADE;
|
Loading…
Reference in New Issue