diff --git a/src/test/regress/expected/failure_cte_subquery.out b/src/test/regress/expected/failure_cte_subquery.out new file mode 100644 index 000000000..5a8b9737d --- /dev/null +++ b/src/test/regress/expected/failure_cte_subquery.out @@ -0,0 +1,391 @@ +CREATE SCHEMA cte_failure; +SET SEARCH_PATH=cte_failure; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; +SELECT pg_backend_pid() as pid \gset +CREATE TABLE users_table (user_id int, user_name text); +CREATE TABLE events_table(user_id int, event_id int, event_type int); +SELECT create_distributed_table('users_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE users_table_local AS SELECT * FROM users_table; +-- kill at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- kill at the second copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM cte_failure.events_table_102250").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 1 + +-- kill at the third copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 1 +-- cancel at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- cancel at the second copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- cancel at the third copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- distributed update tests +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- insert some rows +INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); +INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2); +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- following will delete and insert the same rows +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +-- verify contents are the same +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- kill connection during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- kill connection during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- cancel during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: canceling statement due to user request +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- cancel during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: canceling statement due to user request +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- test sequential delete/insert +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SET LOCAL citus.multi_shard_modify_mode = 'sequential'; +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +END; +RESET SEARCH_PATH; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP SCHEMA cte_failure CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table cte_failure.users_table +drop cascades to table cte_failure.events_table +drop cascades to table cte_failure.users_table_local diff --git a/src/test/regress/expected/failure_multi_shard_update_delete.out b/src/test/regress/expected/failure_multi_shard_update_delete.out new file mode 100644 index 000000000..ffcaeca16 --- /dev/null +++ b/src/test/regress/expected/failure_multi_shard_update_delete.out @@ -0,0 +1,692 @@ +-- +-- failure_multi_shard_update_delete +-- +CREATE SCHEMA IF NOT EXISTS multi_shard; +SET SEARCH_PATH = multi_shard; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 201000; +SET citus.shard_replication_factor TO 1; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +CREATE TABLE t1(a int PRIMARY KEY, b int, c int); +CREATE TABLE r1(a int, b int PRIMARY KEY); +CREATE TABLE t2(a int REFERENCES t1(a) ON DELETE CASCADE, b int REFERENCES r1(b) ON DELETE CASCADE, c int); +SELECT create_distributed_table('t1', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('r1'); + create_reference_table +------------------------ + +(1 row) + +SELECT create_distributed_table('t2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- insert some data +INSERT INTO r1 VALUES (1, 1), (2, 2), (3, 3); +INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); +INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, 2, 3), (3, 3, 3); +SELECT pg_backend_pid() as pid \gset +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +SHOW citus.multi_shard_commit_protocol ; + citus.multi_shard_commit_protocol +----------------------------------- + 2pc +(1 row) + +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +RESET citus.multi_shard_commit_protocol; +-- +-- fail when cascading deletes from foreign key +-- unfortunately cascading deletes from foreign keys +-- are done inside the worker only and do not +-- generate any network output +-- therefore we can't just fail cascade part +-- following tests are added for completeness purposes +-- it is safe to remove them without reducing any +-- test coverage +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- check counts before delete +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b2 +---- + 3 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM r1 WHERE a = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b2 +---- + 3 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b2 +---- + 3 +(1 row) + +-- test update with subquery pull +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT * FROM t3 ORDER BY 1, 2, 3; + a | b | c +---+---+--- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 2 + 2 | 2 | 4 + 3 | 1 | 3 + 3 | 2 | 3 + 3 | 3 | 3 +(7 rows) + +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM t3 ORDER BY 1, 2, 3; + a | b | c +---+---+--- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 2 + 2 | 2 | 4 + 3 | 1 | 3 + 3 | 2 | 3 + 3 | 3 | 3 +(7 rows) + +-- kill update part +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE multi_shard.t3_201009").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM t3 ORDER BY 1, 2, 3; + a | b | c +---+---+--- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 2 + 2 | 2 | 4 + 3 | 1 | 3 + 3 | 2 | 3 + 3 | 3 | 3 +(7 rows) + +-- test with replication_factor = 2 +-- table can not have foreign reference with this setting so +-- use a different set of table +SET citus.shard_replication_factor to 2; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP TABLE t3; +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- prevent update of one replica of one shard +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t3_201013").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 0 | 6 +(1 row) + +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +END; +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 0 | 6 +(1 row) + +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +END; +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +RESET SEARCH_PATH; +DROP SCHEMA multi_shard CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table multi_shard.t1 +drop cascades to table multi_shard.r1 +drop cascades to table multi_shard.t2 +drop cascades to table multi_shard.t3 diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index b84e795f3..a05f36a1d 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -17,3 +17,5 @@ test: failure_create_table test: failure_1pc_copy_hash test: failure_1pc_copy_append +test: failure_multi_shard_update_delete +test: failure_cte_subquery diff --git a/src/test/regress/sql/failure_cte_subquery.sql b/src/test/regress/sql/failure_cte_subquery.sql new file mode 100644 index 000000000..502ef0e28 --- /dev/null +++ b/src/test/regress/sql/failure_cte_subquery.sql @@ -0,0 +1,237 @@ + +CREATE SCHEMA cte_failure; +SET SEARCH_PATH=cte_failure; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; + +SELECT pg_backend_pid() as pid \gset + +CREATE TABLE users_table (user_id int, user_name text); +CREATE TABLE events_table(user_id int, event_id int, event_type int); +SELECT create_distributed_table('users_table', 'user_id'); +SELECT create_distributed_table('events_table', 'user_id'); +CREATE TABLE users_table_local AS SELECT * FROM users_table; + +-- kill at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- kill at the second copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM cte_failure.events_table_102250").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- kill at the third copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the second copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the third copy (pull) +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- distributed update tests +SELECT citus.mitmproxy('conn.allow()'); + +-- insert some rows +INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); +INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2); + +SELECT * FROM users_table ORDER BY 1, 2; +-- following will delete and insert the same rows +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +-- verify contents are the same +SELECT * FROM users_table ORDER BY 1, 2; + +-- kill connection during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- kill connection during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- cancel during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- cancel during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- test sequential delete/insert +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); +BEGIN; +SET LOCAL citus.multi_shard_modify_mode = 'sequential'; +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +END; + +RESET SEARCH_PATH; +SELECT citus.mitmproxy('conn.allow()'); +DROP SCHEMA cte_failure CASCADE; diff --git a/src/test/regress/sql/failure_multi_shard_update_delete.sql b/src/test/regress/sql/failure_multi_shard_update_delete.sql new file mode 100644 index 000000000..34c873ef2 --- /dev/null +++ b/src/test/regress/sql/failure_multi_shard_update_delete.sql @@ -0,0 +1,299 @@ +-- +-- failure_multi_shard_update_delete +-- + +CREATE SCHEMA IF NOT EXISTS multi_shard; +SET SEARCH_PATH = multi_shard; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 201000; +SET citus.shard_replication_factor TO 1; + +SELECT citus.mitmproxy('conn.allow()'); + +CREATE TABLE t1(a int PRIMARY KEY, b int, c int); +CREATE TABLE r1(a int, b int PRIMARY KEY); +CREATE TABLE t2(a int REFERENCES t1(a) ON DELETE CASCADE, b int REFERENCES r1(b) ON DELETE CASCADE, c int); + +SELECT create_distributed_table('t1', 'a'); +SELECT create_reference_table('r1'); +SELECT create_distributed_table('t2', 'a'); + +-- insert some data +INSERT INTO r1 VALUES (1, 1), (2, 2), (3, 3); +INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); +INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, 2, 3), (3, 3, 3); + +SELECT pg_backend_pid() as pid \gset +SELECT count(*) FROM t2; + +SHOW citus.multi_shard_commit_protocol ; + +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + + +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; + +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + + +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +RESET citus.multi_shard_commit_protocol; + +-- +-- fail when cascading deletes from foreign key +-- unfortunately cascading deletes from foreign keys +-- are done inside the worker only and do not +-- generate any network output +-- therefore we can't just fail cascade part +-- following tests are added for completeness purposes +-- it is safe to remove them without reducing any +-- test coverage +SELECT citus.mitmproxy('conn.allow()'); +-- check counts before delete +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +DELETE FROM r1 WHERE a = 2; + +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + +-- test update with subquery pull +SELECT citus.mitmproxy('conn.allow()'); +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +SELECT * FROM t3 ORDER BY 1, 2, 3; + +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; + +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM t3 ORDER BY 1, 2, 3; + +-- kill update part +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE multi_shard.t3_201009").kill()'); +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; + +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM t3 ORDER BY 1, 2, 3; + +-- test with replication_factor = 2 +-- table can not have foreign reference with this setting so +-- use a different set of table + +SET citus.shard_replication_factor to 2; +SELECT citus.mitmproxy('conn.allow()'); +DROP TABLE t3; +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; +-- prevent update of one replica of one shard +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t3_201013").kill()'); + +UPDATE t3 SET b = 2 WHERE b = 1; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +END; + +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t3 SET b = 2 WHERE b = 1; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +END; + +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +SELECT citus.mitmproxy('conn.allow()'); +RESET SEARCH_PATH; +DROP SCHEMA multi_shard CASCADE;