From 6c66033455685bcf162eb34e01b72a09ee1675d7 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Mon, 16 Jul 2018 09:48:40 +0300 Subject: [PATCH 1/3] Add failure tests for multi-shard update/delete Failure tests for update/delete on hash distributed tables using 1PC and 2PC --- .../failure_multi_shard_update_delete.out | 692 ++++++++++++++++++ src/test/regress/failure_schedule | 1 + .../sql/failure_multi_shard_update_delete.sql | 299 ++++++++ 3 files changed, 992 insertions(+) create mode 100644 src/test/regress/expected/failure_multi_shard_update_delete.out create mode 100644 src/test/regress/sql/failure_multi_shard_update_delete.sql diff --git a/src/test/regress/expected/failure_multi_shard_update_delete.out b/src/test/regress/expected/failure_multi_shard_update_delete.out new file mode 100644 index 000000000..ffcaeca16 --- /dev/null +++ b/src/test/regress/expected/failure_multi_shard_update_delete.out @@ -0,0 +1,692 @@ +-- +-- failure_multi_shard_update_delete +-- +CREATE SCHEMA IF NOT EXISTS multi_shard; +SET SEARCH_PATH = multi_shard; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 201000; +SET citus.shard_replication_factor TO 1; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +CREATE TABLE t1(a int PRIMARY KEY, b int, c int); +CREATE TABLE r1(a int, b int PRIMARY KEY); +CREATE TABLE t2(a int REFERENCES t1(a) ON DELETE CASCADE, b int REFERENCES r1(b) ON DELETE CASCADE, c int); +SELECT create_distributed_table('t1', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('r1'); + create_reference_table +------------------------ + +(1 row) + +SELECT create_distributed_table('t2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- insert some data +INSERT INTO r1 VALUES (1, 1), (2, 2), (3, 3); +INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); +INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, 2, 3), (3, 3, 3); +SELECT pg_backend_pid() as pid \gset +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +SHOW citus.multi_shard_commit_protocol ; + citus.multi_shard_commit_protocol +----------------------------------- + 2pc +(1 row) + +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +------- + 7 +(1 row) + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +----+---- + 3 | 1 +(1 row) + +RESET citus.multi_shard_commit_protocol; +-- +-- fail when cascading deletes from foreign key +-- unfortunately cascading deletes from foreign keys +-- are done inside the worker only and do not +-- generate any network output +-- therefore we can't just fail cascade part +-- following tests are added for completeness purposes +-- it is safe to remove them without reducing any +-- test coverage +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- check counts before delete +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b2 +---- + 3 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM r1 WHERE a = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b2 +---- + 3 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b2 +---- + 3 +(1 row) + +-- test update with subquery pull +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT * FROM t3 ORDER BY 1, 2, 3; + a | b | c +---+---+--- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 2 + 2 | 2 | 4 + 3 | 1 | 3 + 3 | 2 | 3 + 3 | 3 | 3 +(7 rows) + +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM t3 ORDER BY 1, 2, 3; + a | b | c +---+---+--- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 2 + 2 | 2 | 4 + 3 | 1 | 3 + 3 | 2 | 3 + 3 | 3 | 3 +(7 rows) + +-- kill update part +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE multi_shard.t3_201009").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM t3 ORDER BY 1, 2, 3; + a | b | c +---+---+--- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 2 + 2 | 2 | 4 + 3 | 1 | 3 + 3 | 2 | 3 + 3 | 3 | 3 +(7 rows) + +-- test with replication_factor = 2 +-- table can not have foreign reference with this setting so +-- use a different set of table +SET citus.shard_replication_factor to 2; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP TABLE t3; +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- prevent update of one replica of one shard +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t3_201013").kill()'); + mitmproxy +----------- + +(1 row) + +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 0 | 6 +(1 row) + +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +END; +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 0 | 6 +(1 row) + +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +END; +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + b1 | b2 +----+---- + 3 | 3 +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +RESET SEARCH_PATH; +DROP SCHEMA multi_shard CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table multi_shard.t1 +drop cascades to table multi_shard.r1 +drop cascades to table multi_shard.t2 +drop cascades to table multi_shard.t3 diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index b84e795f3..4aeb9344b 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -17,3 +17,4 @@ test: failure_create_table test: failure_1pc_copy_hash test: failure_1pc_copy_append +test: failure_multi_shard_update_delete diff --git a/src/test/regress/sql/failure_multi_shard_update_delete.sql b/src/test/regress/sql/failure_multi_shard_update_delete.sql new file mode 100644 index 000000000..34c873ef2 --- /dev/null +++ b/src/test/regress/sql/failure_multi_shard_update_delete.sql @@ -0,0 +1,299 @@ +-- +-- failure_multi_shard_update_delete +-- + +CREATE SCHEMA IF NOT EXISTS multi_shard; +SET SEARCH_PATH = multi_shard; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 201000; +SET citus.shard_replication_factor TO 1; + +SELECT citus.mitmproxy('conn.allow()'); + +CREATE TABLE t1(a int PRIMARY KEY, b int, c int); +CREATE TABLE r1(a int, b int PRIMARY KEY); +CREATE TABLE t2(a int REFERENCES t1(a) ON DELETE CASCADE, b int REFERENCES r1(b) ON DELETE CASCADE, c int); + +SELECT create_distributed_table('t1', 'a'); +SELECT create_reference_table('r1'); +SELECT create_distributed_table('t2', 'a'); + +-- insert some data +INSERT INTO r1 VALUES (1, 1), (2, 2), (3, 3); +INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); +INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, 2, 3), (3, 3, 3); + +SELECT pg_backend_pid() as pid \gset +SELECT count(*) FROM t2; + +SHOW citus.multi_shard_commit_protocol ; + +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + + +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; + +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + + +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +RESET citus.multi_shard_commit_protocol; + +-- +-- fail when cascading deletes from foreign key +-- unfortunately cascading deletes from foreign keys +-- are done inside the worker only and do not +-- generate any network output +-- therefore we can't just fail cascade part +-- following tests are added for completeness purposes +-- it is safe to remove them without reducing any +-- test coverage +SELECT citus.mitmproxy('conn.allow()'); +-- check counts before delete +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +DELETE FROM r1 WHERE a = 2; + +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FILTER (WHERE b = 2) AS b2 FROM t2; + +-- test update with subquery pull +SELECT citus.mitmproxy('conn.allow()'); +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +SELECT * FROM t3 ORDER BY 1, 2, 3; + +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; + +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM t3 ORDER BY 1, 2, 3; + +-- kill update part +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE multi_shard.t3_201009").kill()'); +UPDATE t3 SET c = q.c FROM ( + SELECT b, max(c) as c FROM t2 GROUP BY b) q +WHERE t3.b = q.b +RETURNING *; + +--- verify nothing is updated +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM t3 ORDER BY 1, 2, 3; + +-- test with replication_factor = 2 +-- table can not have foreign reference with this setting so +-- use a different set of table + +SET citus.shard_replication_factor to 2; +SELECT citus.mitmproxy('conn.allow()'); +DROP TABLE t3; +CREATE TABLE t3 AS SELECT * FROM t2; +SELECT create_distributed_table('t3', 'a'); +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; +-- prevent update of one replica of one shard +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t3_201013").kill()'); + +UPDATE t3 SET b = 2 WHERE b = 1; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +END; + +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +-- switch to 1PC +SET citus.multi_shard_commit_protocol TO '1PC'; + +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t3 SET b = 2 WHERE b = 1; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +-- fail only one update verify transaction is rolled back correctly +BEGIN; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +UPDATE t2 SET b = 2 WHERE b = 1; +-- verify update is performed on t2 +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +-- following will fail +UPDATE t3 SET b = 2 WHERE b = 1; +END; + +-- verify everything is rolled back +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t2; +SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; + +SELECT citus.mitmproxy('conn.allow()'); +RESET SEARCH_PATH; +DROP SCHEMA multi_shard CASCADE; From d26b312cad39360cea2113ac17463d8c73440530 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Thu, 26 Jul 2018 16:16:19 +0300 Subject: [PATCH 2/3] Add failure test for coordinator pull/push for cte --- .../regress/expected/failure_cte_subquery.out | 405 ++++++++++++++++++ src/test/regress/failure_schedule | 1 + src/test/regress/sql/failure_cte_subquery.sql | 236 ++++++++++ 3 files changed, 642 insertions(+) create mode 100644 src/test/regress/expected/failure_cte_subquery.out create mode 100644 src/test/regress/sql/failure_cte_subquery.sql diff --git a/src/test/regress/expected/failure_cte_subquery.out b/src/test/regress/expected/failure_cte_subquery.out new file mode 100644 index 000000000..a1bf9cf3b --- /dev/null +++ b/src/test/regress/expected/failure_cte_subquery.out @@ -0,0 +1,405 @@ +CREATE SCHEMA cte_failure; +SET SEARCH_PATH=cte_failure; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; +SELECT pg_backend_pid() as pid \gset +CREATE TABLE users_table (user_id int, user_name text); +CREATE TABLE events_table(user_id int, event_id int, event_type int); +SELECT create_distributed_table('users_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE users_table_local AS SELECT * FROM users_table; +-- kill at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +-- kill at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 2 + +-- kill at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 2 +-- cancel at the first copy (push) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- cancel at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- cancel at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- distributed update tests +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- insert some rows +INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); +INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2); +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- following will delete and insert the same rows +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- kill connection during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- kill connection during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- cancel during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: canceling statement due to user request +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- cancel during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: canceling statement due to user request +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +RESET SEARCH_PATH; +DROP SCHEMA cte_failure CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table cte_failure.users_table +drop cascades to table cte_failure.events_table +drop cascades to table cte_failure.users_table_local diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 4aeb9344b..a05f36a1d 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -18,3 +18,4 @@ test: failure_create_table test: failure_1pc_copy_hash test: failure_1pc_copy_append test: failure_multi_shard_update_delete +test: failure_cte_subquery diff --git a/src/test/regress/sql/failure_cte_subquery.sql b/src/test/regress/sql/failure_cte_subquery.sql new file mode 100644 index 000000000..8ae1ff948 --- /dev/null +++ b/src/test/regress/sql/failure_cte_subquery.sql @@ -0,0 +1,236 @@ + +CREATE SCHEMA cte_failure; +SET SEARCH_PATH=cte_failure; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; + +SELECT pg_backend_pid() as pid \gset + +CREATE TABLE users_table (user_id int, user_name text); +CREATE TABLE events_table(user_id int, event_id int, event_type int); +SELECT create_distributed_table('users_table', 'user_id'); +SELECT create_distributed_table('events_table', 'user_id'); +CREATE TABLE users_table_local AS SELECT * FROM users_table; + +-- kill at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- kill at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- kill at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the first copy (push) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- distributed update tests +SELECT citus.mitmproxy('conn.allow()'); + +-- insert some rows +INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); +INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2); + +SELECT * FROM users_table ORDER BY 1, 2; +-- following will delete and insert the same rows +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- kill connection during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- kill connection during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- cancel during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- cancel during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +RESET SEARCH_PATH; +DROP SCHEMA cte_failure CASCADE; + + From 0a987e9c0ea9cf50528de2e440c006179b499a54 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Wed, 3 Oct 2018 12:23:27 +0300 Subject: [PATCH 3/3] Fix cte subquery failure test --- .../regress/expected/failure_cte_subquery.out | 70 ++++++++----------- src/test/regress/sql/failure_cte_subquery.sql | 19 ++--- 2 files changed, 38 insertions(+), 51 deletions(-) diff --git a/src/test/regress/expected/failure_cte_subquery.out b/src/test/regress/expected/failure_cte_subquery.out index a1bf9cf3b..5a8b9737d 100644 --- a/src/test/regress/expected/failure_cte_subquery.out +++ b/src/test/regress/expected/failure_cte_subquery.out @@ -51,15 +51,9 @@ FROM ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 -- kill at the second copy (pull) -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ------------ - -(1 row) - -SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM cte_failure.events_table_102250").kill()'); mitmproxy ----------- @@ -91,15 +85,9 @@ FROM WARNING: could not consume data from worker node WARNING: could not consume data from worker node WARNING: could not consume data from worker node -ERROR: failed to execute task 2 +ERROR: failed to execute task 1 -- kill at the third copy (pull) -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ------------ - -(1 row) - SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); mitmproxy ----------- @@ -131,14 +119,8 @@ FROM WHERE foo.user_id = cte.user_id; WARNING: could not consume data from worker node WARNING: could not consume data from worker node -ERROR: failed to execute task 2 +ERROR: failed to execute task 1 -- cancel at the first copy (push) -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ------------ - -(1 row) - SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); mitmproxy ----------- @@ -170,12 +152,6 @@ FROM WHERE foo.user_id = cte.user_id; ERROR: canceling statement due to user request -- cancel at the second copy (pull) -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ------------ - -(1 row) - SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); mitmproxy ----------- @@ -207,12 +183,6 @@ FROM WHERE foo.user_id = cte.user_id; ERROR: canceling statement due to user request -- cancel at the third copy (pull) -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ------------ - -(1 row) - SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); mitmproxy ----------- @@ -267,12 +237,6 @@ SELECT * FROM users_table ORDER BY 1, 2; WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) INSERT INTO users_table SELECT * FROM cte_delete; -- verify contents are the same -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ------------ - -(1 row) - SELECT * FROM users_table ORDER BY 1, 2; user_id | user_name ---------+----------- @@ -295,7 +259,7 @@ INSERT INTO users_table SELECT * FROM cte_delete; ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 -- verify contents are the same SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -325,7 +289,7 @@ INSERT INTO users_table SELECT * FROM cte_delete; ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 -- verify contents are the same SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -397,7 +361,29 @@ SELECT * FROM users_table ORDER BY 1, 2; 5 | E (5 rows) +-- test sequential delete/insert +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SET LOCAL citus.multi_shard_modify_mode = 'sequential'; +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +END; RESET SEARCH_PATH; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + DROP SCHEMA cte_failure CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to table cte_failure.users_table diff --git a/src/test/regress/sql/failure_cte_subquery.sql b/src/test/regress/sql/failure_cte_subquery.sql index 8ae1ff948..502ef0e28 100644 --- a/src/test/regress/sql/failure_cte_subquery.sql +++ b/src/test/regress/sql/failure_cte_subquery.sql @@ -40,8 +40,7 @@ FROM WHERE foo.user_id = cte.user_id; -- kill at the second copy (pull) -SELECT citus.mitmproxy('conn.allow()'); -SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM cte_failure.events_table_102250").kill()'); WITH cte AS ( WITH local_cte AS ( @@ -68,7 +67,6 @@ FROM WHERE foo.user_id = cte.user_id; -- kill at the third copy (pull) -SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); WITH cte AS ( @@ -96,7 +94,6 @@ FROM WHERE foo.user_id = cte.user_id; -- cancel at the first copy (push) -SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); WITH cte AS ( @@ -124,7 +121,6 @@ FROM WHERE foo.user_id = cte.user_id; -- cancel at the second copy (pull) -SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); WITH cte AS ( @@ -152,7 +148,6 @@ FROM WHERE foo.user_id = cte.user_id; -- cancel at the third copy (pull) -SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); WITH cte AS ( @@ -191,7 +186,6 @@ SELECT * FROM users_table ORDER BY 1, 2; WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) INSERT INTO users_table SELECT * FROM cte_delete; -- verify contents are the same -SELECT citus.mitmproxy('conn.allow()'); SELECT * FROM users_table ORDER BY 1, 2; -- kill connection during deletion @@ -230,7 +224,14 @@ INSERT INTO users_table SELECT * FROM cte_delete; SELECT citus.mitmproxy('conn.allow()'); SELECT * FROM users_table ORDER BY 1, 2; +-- test sequential delete/insert +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); +BEGIN; +SET LOCAL citus.multi_shard_modify_mode = 'sequential'; +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +END; + RESET SEARCH_PATH; +SELECT citus.mitmproxy('conn.allow()'); DROP SCHEMA cte_failure CASCADE; - -