mirror of https://github.com/citusdata/citus.git
Merge pull request #2426 from citusdata/failure_pull_push_insert_select
Add failure tests for insert/select via coordinatorpull/2428/head
commit
24e247c1b9
|
@ -0,0 +1,229 @@
|
||||||
|
--
|
||||||
|
-- failure_insert_select_via_coordinator
|
||||||
|
--
|
||||||
|
-- performs failure/cancellation test for insert/select executed by coordinator.
|
||||||
|
-- test for insert using CTEs are done in failure_cte_subquery, not repeating them here
|
||||||
|
--
|
||||||
|
CREATE SCHEMA coordinator_insert_select;
|
||||||
|
SET SEARCH_PATH=coordinator_insert_select;
|
||||||
|
SET citus.shard_count to 2;
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
SELECT pg_backend_pid() as pid \gset
|
||||||
|
CREATE TABLE events_table(user_id int, event_id int, event_type int);
|
||||||
|
CREATE TABLE events_summary(event_id int, event_type int, event_count int);
|
||||||
|
CREATE TABLE events_reference(event_type int, event_count int);
|
||||||
|
CREATE TABLE events_reference_distributed(event_type int, event_count int);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('events_summary', 'event_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_reference_table('events_reference');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('events_reference_distributed', 'event_type');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_table VALUES (1, 1, 3 ), (1, 2, 1), (1, 3, 2), (2, 4, 3), (3, 5, 1), (4, 7, 1), (4, 1, 9), (4, 3, 2);
|
||||||
|
SELECT count(*) FROM events_summary;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- insert/select from one distributed table to another
|
||||||
|
-- kill coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
ERROR: failed to execute task 1
|
||||||
|
-- kill data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- cancel coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- cancel data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
--verify nothing is modified
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM events_summary;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- insert into reference table from a distributed table
|
||||||
|
-- kill coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
WARNING: could not consume data from worker node
|
||||||
|
ERROR: failed to execute task 1
|
||||||
|
-- kill data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- cancel coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- cancel data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
--verify nothing is modified
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM events_reference;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- insert/select from reference table to distributed
|
||||||
|
-- fill up reference table first
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
-- kill coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- kill data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
-- cancel coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
-- cancel data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
--verify nothing is modified
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM events_reference_distributed;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET SEARCH_PATH;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP SCHEMA coordinator_insert_select CASCADE;
|
||||||
|
NOTICE: drop cascades to 4 other objects
|
||||||
|
DETAIL: drop cascades to table coordinator_insert_select.events_table
|
||||||
|
drop cascades to table coordinator_insert_select.events_summary
|
||||||
|
drop cascades to table coordinator_insert_select.events_reference
|
||||||
|
drop cascades to table coordinator_insert_select.events_reference_distributed
|
|
@ -19,3 +19,4 @@ test: failure_1pc_copy_hash
|
||||||
test: failure_1pc_copy_append
|
test: failure_1pc_copy_append
|
||||||
test: failure_multi_shard_update_delete
|
test: failure_multi_shard_update_delete
|
||||||
test: failure_cte_subquery
|
test: failure_cte_subquery
|
||||||
|
test: failure_insert_select_via_coordinator
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
--
|
||||||
|
-- failure_insert_select_via_coordinator
|
||||||
|
--
|
||||||
|
-- performs failure/cancellation test for insert/select executed by coordinator.
|
||||||
|
-- test for insert using CTEs are done in failure_cte_subquery, not repeating them here
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE SCHEMA coordinator_insert_select;
|
||||||
|
SET SEARCH_PATH=coordinator_insert_select;
|
||||||
|
SET citus.shard_count to 2;
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
SELECT pg_backend_pid() as pid \gset
|
||||||
|
|
||||||
|
CREATE TABLE events_table(user_id int, event_id int, event_type int);
|
||||||
|
CREATE TABLE events_summary(event_id int, event_type int, event_count int);
|
||||||
|
CREATE TABLE events_reference(event_type int, event_count int);
|
||||||
|
CREATE TABLE events_reference_distributed(event_type int, event_count int);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
SELECT create_distributed_table('events_summary', 'event_id');
|
||||||
|
SELECT create_reference_table('events_reference');
|
||||||
|
SELECT create_distributed_table('events_reference_distributed', 'event_type');
|
||||||
|
|
||||||
|
INSERT INTO events_table VALUES (1, 1, 3 ), (1, 2, 1), (1, 3, 2), (2, 4, 3), (3, 5, 1), (4, 7, 1), (4, 1, 9), (4, 3, 2);
|
||||||
|
|
||||||
|
SELECT count(*) FROM events_summary;
|
||||||
|
|
||||||
|
-- insert/select from one distributed table to another
|
||||||
|
|
||||||
|
-- kill coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
|
||||||
|
-- kill data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()');
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
|
||||||
|
-- cancel coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
|
||||||
|
-- cancel data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')');
|
||||||
|
INSERT INTO events_summary SELECT event_id, event_type, count(*) FROM events_table GROUP BY 1,2;
|
||||||
|
|
||||||
|
--verify nothing is modified
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT count(*) FROM events_summary;
|
||||||
|
|
||||||
|
-- insert into reference table from a distributed table
|
||||||
|
-- kill coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
|
||||||
|
-- kill data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()');
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
|
||||||
|
-- cancel coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
|
||||||
|
-- cancel data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')');
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
|
||||||
|
--verify nothing is modified
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT count(*) FROM events_reference;
|
||||||
|
|
||||||
|
-- insert/select from reference table to distributed
|
||||||
|
|
||||||
|
-- fill up reference table first
|
||||||
|
INSERT INTO events_reference SELECT event_type, count(*) FROM events_table GROUP BY 1;
|
||||||
|
|
||||||
|
-- kill coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
|
||||||
|
-- kill data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").kill()');
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
|
||||||
|
-- cancel coordinator pull query
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
|
||||||
|
-- cancel data push
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY coordinator_insert_select").cancel(' || :pid || ')');
|
||||||
|
INSERT INTO events_reference_distributed SELECT * FROM events_reference;
|
||||||
|
|
||||||
|
--verify nothing is modified
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT count(*) FROM events_reference_distributed;
|
||||||
|
|
||||||
|
RESET SEARCH_PATH;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
DROP SCHEMA coordinator_insert_select CASCADE;
|
Loading…
Reference in New Issue