From d26b312cad39360cea2113ac17463d8c73440530 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Thu, 26 Jul 2018 16:16:19 +0300 Subject: [PATCH] Add failure test for coordinator pull/push for cte --- .../regress/expected/failure_cte_subquery.out | 405 ++++++++++++++++++ src/test/regress/failure_schedule | 1 + src/test/regress/sql/failure_cte_subquery.sql | 236 ++++++++++ 3 files changed, 642 insertions(+) create mode 100644 src/test/regress/expected/failure_cte_subquery.out create mode 100644 src/test/regress/sql/failure_cte_subquery.sql diff --git a/src/test/regress/expected/failure_cte_subquery.out b/src/test/regress/expected/failure_cte_subquery.out new file mode 100644 index 000000000..a1bf9cf3b --- /dev/null +++ b/src/test/regress/expected/failure_cte_subquery.out @@ -0,0 +1,405 @@ +CREATE SCHEMA cte_failure; +SET SEARCH_PATH=cte_failure; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; +SELECT pg_backend_pid() as pid \gset +CREATE TABLE users_table (user_id int, user_name text); +CREATE TABLE events_table(user_id int, event_id int, event_type int); +SELECT create_distributed_table('users_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE users_table_local AS SELECT * FROM users_table; +-- kill at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +-- kill at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 2 + +-- kill at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 2 +-- cancel at the first copy (push) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- cancel at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- cancel at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; +ERROR: canceling statement due to user request +-- distributed update tests +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- insert some rows +INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); +INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2); +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- following will delete and insert the same rows +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- kill connection during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- kill connection during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- cancel during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: canceling statement due to user request +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +-- cancel during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +ERROR: canceling statement due to user request +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM users_table ORDER BY 1, 2; + user_id | user_name +---------+----------- + 1 | A + 2 | B + 3 | C + 4 | D + 5 | E +(5 rows) + +RESET SEARCH_PATH; +DROP SCHEMA cte_failure CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table cte_failure.users_table +drop cascades to table cte_failure.events_table +drop cascades to table cte_failure.users_table_local diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 4aeb9344b..a05f36a1d 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -18,3 +18,4 @@ test: failure_create_table test: failure_1pc_copy_hash test: failure_1pc_copy_append test: failure_multi_shard_update_delete +test: failure_cte_subquery diff --git a/src/test/regress/sql/failure_cte_subquery.sql b/src/test/regress/sql/failure_cte_subquery.sql new file mode 100644 index 000000000..8ae1ff948 --- /dev/null +++ b/src/test/regress/sql/failure_cte_subquery.sql @@ -0,0 +1,236 @@ + +CREATE SCHEMA cte_failure; +SET SEARCH_PATH=cte_failure; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; + +SELECT pg_backend_pid() as pid \gset + +CREATE TABLE users_table (user_id int, user_name text); +CREATE TABLE events_table(user_id int, event_id int, event_type int); +SELECT create_distributed_table('users_table', 'user_id'); +SELECT create_distributed_table('events_table', 'user_id'); +CREATE TABLE users_table_local AS SELECT * FROM users_table; + +-- kill at the first copy (push) +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- kill at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- kill at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the first copy (push) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the second copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- cancel at the third copy (pull) +SELECT citus.mitmproxy('conn.allow()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')'); + +WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT user_id FROM events_table + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) +FROM + cte, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id; + +-- distributed update tests +SELECT citus.mitmproxy('conn.allow()'); + +-- insert some rows +INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); +INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2); + +SELECT * FROM users_table ORDER BY 1, 2; +-- following will delete and insert the same rows +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- kill connection during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- kill connection during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- cancel during deletion +SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +-- cancel during insert +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); +WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *) +INSERT INTO users_table SELECT * FROM cte_delete; + +-- verify contents are the same +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM users_table ORDER BY 1, 2; + +RESET SEARCH_PATH; +DROP SCHEMA cte_failure CASCADE; + +