mirror of https://github.com/citusdata/citus.git
Add failure test for coordinator pull/push for cte
parent
6c66033455
commit
d26b312cad
|
@ -0,0 +1,405 @@
|
|||
CREATE SCHEMA cte_failure;
|
||||
SET SEARCH_PATH=cte_failure;
|
||||
SET citus.shard_count to 2;
|
||||
SET citus.shard_replication_factor to 1;
|
||||
SELECT pg_backend_pid() as pid \gset
|
||||
CREATE TABLE users_table (user_id int, user_name text);
|
||||
CREATE TABLE events_table(user_id int, event_id int, event_type int);
|
||||
SELECT create_distributed_table('users_table', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('events_table', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE users_table_local AS SELECT * FROM users_table;
|
||||
-- kill at the first copy (push)
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:57640
|
||||
-- kill at the second copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
WARNING: could not consume data from worker node
|
||||
WARNING: could not consume data from worker node
|
||||
WARNING: could not consume data from worker node
|
||||
ERROR: failed to execute task 2
|
||||
|
||||
-- kill at the third copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
WARNING: could not consume data from worker node
|
||||
WARNING: could not consume data from worker node
|
||||
ERROR: failed to execute task 2
|
||||
-- cancel at the first copy (push)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
ERROR: canceling statement due to user request
|
||||
-- cancel at the second copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
ERROR: canceling statement due to user request
|
||||
-- cancel at the third copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
ERROR: canceling statement due to user request
|
||||
-- distributed update tests
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- insert some rows
|
||||
INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E');
|
||||
INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2);
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
user_id | user_name
|
||||
---------+-----------
|
||||
1 | A
|
||||
2 | B
|
||||
3 | C
|
||||
4 | D
|
||||
5 | E
|
||||
(5 rows)
|
||||
|
||||
-- following will delete and insert the same rows
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
user_id | user_name
|
||||
---------+-----------
|
||||
1 | A
|
||||
2 | B
|
||||
3 | C
|
||||
4 | D
|
||||
5 | E
|
||||
(5 rows)
|
||||
|
||||
-- kill connection during deletion
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:57640
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
user_id | user_name
|
||||
---------+-----------
|
||||
1 | A
|
||||
2 | B
|
||||
3 | C
|
||||
4 | D
|
||||
5 | E
|
||||
(5 rows)
|
||||
|
||||
-- kill connection during insert
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:57640
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
user_id | user_name
|
||||
---------+-----------
|
||||
1 | A
|
||||
2 | B
|
||||
3 | C
|
||||
4 | D
|
||||
5 | E
|
||||
(5 rows)
|
||||
|
||||
-- cancel during deletion
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
ERROR: canceling statement due to user request
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
user_id | user_name
|
||||
---------+-----------
|
||||
1 | A
|
||||
2 | B
|
||||
3 | C
|
||||
4 | D
|
||||
5 | E
|
||||
(5 rows)
|
||||
|
||||
-- cancel during insert
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
ERROR: canceling statement due to user request
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
user_id | user_name
|
||||
---------+-----------
|
||||
1 | A
|
||||
2 | B
|
||||
3 | C
|
||||
4 | D
|
||||
5 | E
|
||||
(5 rows)
|
||||
|
||||
RESET SEARCH_PATH;
|
||||
DROP SCHEMA cte_failure CASCADE;
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
DETAIL: drop cascades to table cte_failure.users_table
|
||||
drop cascades to table cte_failure.events_table
|
||||
drop cascades to table cte_failure.users_table_local
|
|
@ -18,3 +18,4 @@ test: failure_create_table
|
|||
test: failure_1pc_copy_hash
|
||||
test: failure_1pc_copy_append
|
||||
test: failure_multi_shard_update_delete
|
||||
test: failure_cte_subquery
|
||||
|
|
|
@ -0,0 +1,236 @@
|
|||
|
||||
CREATE SCHEMA cte_failure;
|
||||
SET SEARCH_PATH=cte_failure;
|
||||
SET citus.shard_count to 2;
|
||||
SET citus.shard_replication_factor to 1;
|
||||
|
||||
SELECT pg_backend_pid() as pid \gset
|
||||
|
||||
CREATE TABLE users_table (user_id int, user_name text);
|
||||
CREATE TABLE events_table(user_id int, event_id int, event_type int);
|
||||
SELECT create_distributed_table('users_table', 'user_id');
|
||||
SELECT create_distributed_table('events_table', 'user_id');
|
||||
CREATE TABLE users_table_local AS SELECT * FROM users_table;
|
||||
|
||||
-- kill at the first copy (push)
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
|
||||
-- kill at the second copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").kill()');
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
|
||||
-- kill at the third copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").kill()');
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
|
||||
-- cancel at the first copy (push)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
|
||||
-- cancel at the second copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT user_id FROM").cancel(' || :pid || ')');
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
|
||||
-- cancel at the third copy (pull)
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="SELECT DISTINCT users_table.user").cancel(' || :pid || ')');
|
||||
|
||||
WITH cte AS (
|
||||
WITH local_cte AS (
|
||||
SELECT * FROM users_table_local
|
||||
),
|
||||
dist_cte AS (
|
||||
SELECT user_id FROM events_table
|
||||
)
|
||||
SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id
|
||||
)
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
cte,
|
||||
(SELECT
|
||||
DISTINCT users_table.user_id
|
||||
FROM
|
||||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND
|
||||
event_type IN (1,2,3,4)
|
||||
ORDER BY 1 DESC LIMIT 5
|
||||
) as foo
|
||||
WHERE foo.user_id = cte.user_id;
|
||||
|
||||
-- distributed update tests
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
-- insert some rows
|
||||
INSERT INTO users_table VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E');
|
||||
INSERT INTO events_table VALUES (1,1,1), (1,2,1), (1,3,1), (2,1, 4), (3, 4,1), (5, 1, 2), (5, 2, 1), (5, 2,2);
|
||||
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
-- following will delete and insert the same rows
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
|
||||
-- kill connection during deletion
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").kill()');
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
|
||||
-- kill connection during insert
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
|
||||
-- cancel during deletion
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM").cancel(' || :pid || ')');
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
|
||||
-- cancel during insert
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || :pid || ')');
|
||||
WITH cte_delete as (DELETE FROM users_table WHERE user_name in ('A', 'D') RETURNING *)
|
||||
INSERT INTO users_table SELECT * FROM cte_delete;
|
||||
|
||||
-- verify contents are the same
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT * FROM users_table ORDER BY 1, 2;
|
||||
|
||||
RESET SEARCH_PATH;
|
||||
DROP SCHEMA cte_failure CASCADE;
|
||||
|
||||
|
Loading…
Reference in New Issue