diff --git a/src/test/regress/expected/failure_insert_select_via_coordinator.out b/src/test/regress/expected/failure_insert_select_via_coordinator.out new file mode 100644 index 000000000..983dfe3e3 --- /dev/null +++ b/src/test/regress/expected/failure_insert_select_via_coordinator.out @@ -0,0 +1,229 @@ +-- +-- failure_insert_select_via_coordinator +-- +-- performs failure/cancellation test for insert/select executed by coordinator. +-- test for insert using CTEs are done in failure_cte_subquery, not repeating them here +-- +CREATE SCHEMA coordinator_insert_select; +SET SEARCH_PATH=coordinator_insert_select; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; +SELECT pg_backend_pid() as pid \gset +CREATE TABLE events_table(user_id int, event_id int, event_type int); +CREATE TABLE events_summary(event_id int, event_type int, event_count int); +CREATE TABLE events_reference(event_type int, event_count int); +CREATE TABLE events_reference_distributed(event_type int, event_count int); +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('events_summary', 'event_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('events_reference'); + create_reference_table +------------------------ + +(1 row) + +SELECT create_distributed_table('events_reference_distributed', 'event_type'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO events_table VALUES (1, 1, 3 ), (1, 2, 1), (1, 3, 2), (2, 4, 3), (3, 5, 1), (4, 7, 1), (4, 1, 9), (4, 3, 2); +SELECT count(*) FROM events_summary; + count +------- + 0 +(1 row) + +-- insert/select from one distributed table to another +-- kill coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 1 +-- kill data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- cancel coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; +ERROR: canceling statement due to user request +-- cancel data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; +ERROR: canceling statement due to user request +--verify nothing is modified +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM events_summary; + count +------- + 0 +(1 row) + +-- insert into reference table from a distributed table +-- kill coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node +ERROR: failed to execute task 1 +-- kill data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- cancel coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; +ERROR: canceling statement due to user request +-- cancel data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; +ERROR: canceling statement due to user request +--verify nothing is modified +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM events_reference; + count +------- + 0 +(1 row) + +-- insert/select from reference table to distributed +-- fill up reference table first +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; +-- kill coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference_distributed SELECT * FROM events_reference; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- kill data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference_distributed SELECT * FROM events_reference; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- cancel coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference_distributed SELECT * FROM events_reference; +ERROR: canceling statement due to user request +-- cancel data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')'); + mitmproxy +----------- + +(1 row) + +INSERT INTO events_reference_distributed SELECT * FROM events_reference; +ERROR: canceling statement due to user request +--verify nothing is modified +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM events_reference_distributed; + count +------- + 0 +(1 row) + +RESET SEARCH_PATH; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP SCHEMA coordinator_insert_select CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table coordinator_insert_select.events_table +drop cascades to table coordinator_insert_select.events_summary +drop cascades to table coordinator_insert_select.events_reference +drop cascades to table coordinator_insert_select.events_reference_distributed diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index a05f36a1d..2cefc7ba9 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -19,3 +19,4 @@ test: failure_1pc_copy_hash test: failure_1pc_copy_append test: failure_multi_shard_update_delete test: failure_cte_subquery +test: failure_insert_select_via_coordinator diff --git a/src/test/regress/sql/failure_insert_select_via_coordinator.sql b/src/test/regress/sql/failure_insert_select_via_coordinator.sql new file mode 100644 index 000000000..1d836b6c1 --- /dev/null +++ b/src/test/regress/sql/failure_insert_select_via_coordinator.sql @@ -0,0 +1,97 @@ +-- +-- failure_insert_select_via_coordinator +-- +-- performs failure/cancellation test for insert/select executed by coordinator. +-- test for insert using CTEs are done in failure_cte_subquery, not repeating them here +-- + +CREATE SCHEMA coordinator_insert_select; +SET SEARCH_PATH=coordinator_insert_select; +SET citus.shard_count to 2; +SET citus.shard_replication_factor to 1; +SELECT pg_backend_pid() as pid \gset + +CREATE TABLE events_table(user_id int, event_id int, event_type int); +CREATE TABLE events_summary(event_id int, event_type int, event_count int); +CREATE TABLE events_reference(event_type int, event_count int); +CREATE TABLE events_reference_distributed(event_type int, event_count int); +SELECT create_distributed_table('events_table', 'user_id'); +SELECT create_distributed_table('events_summary', 'event_id'); +SELECT create_reference_table('events_reference'); +SELECT create_distributed_table('events_reference_distributed', 'event_type'); + +INSERT INTO events_table VALUES (1, 1, 3 ), (1, 2, 1), (1, 3, 2), (2, 4, 3), (3, 5, 1), (4, 7, 1), (4, 1, 9), (4, 3, 2); + +SELECT count(*) FROM events_summary; + +-- insert/select from one distributed table to another + +-- kill coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; + +-- kill data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()'); +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; + +-- cancel coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; + +-- cancel data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')'); +INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2; + +--verify nothing is modified +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM events_summary; + +-- insert into reference table from a distributed table +-- kill coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; + +-- kill data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()'); +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; + +-- cancel coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; + +-- cancel data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')'); +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; + +--verify nothing is modified +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM events_reference; + +-- insert/select from reference table to distributed + +-- fill up reference table first +INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1; + +-- kill coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +INSERT INTO events_reference_distributed SELECT * FROM events_reference; + +-- kill data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()'); +INSERT INTO events_reference_distributed SELECT * FROM events_reference; + +-- cancel coordinator pull query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')'); +INSERT INTO events_reference_distributed SELECT * FROM events_reference; + +-- cancel data push +SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')'); +INSERT INTO events_reference_distributed SELECT * FROM events_reference; + +--verify nothing is modified +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM events_reference_distributed; + +RESET SEARCH_PATH; +SELECT citus.mitmproxy('conn.allow()'); +DROP SCHEMA coordinator_insert_select CASCADE;