diff --git a/src/test/regress/expected/failure_real_time_select.out b/src/test/regress/expected/failure_real_time_select.out new file mode 100644 index 000000000..da4d77bac --- /dev/null +++ b/src/test/regress/expected/failure_real_time_select.out @@ -0,0 +1,303 @@ +-- +-- Failure tests for real time select queries +-- +CREATE SCHEMA real_time_select_failure; +SET search_path TO 'real_time_select_failure'; +SET citus.next_shard_id TO 190000; +-- Preparation +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SET citus.shard_count to 4; +SET citus.shard_replication_factor to 1; +-- create tables +CREATE TABLE test_table(id int, value_1 int, value_2 int); +SELECT create_distributed_table('test_table','id'); + create_distributed_table +-------------------------- + +(1 row) + +-- Populate data to the table +INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); +-- Create a function to make sure that queries returning the same result +CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; +-- Kill when the first COPY command arrived, since we have a single placement +-- it is expected to error out. +SET client_min_messages TO ERROR; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT raise_failed_execution('SELECT count(*) FROM test_table'); +ERROR: Task failed to execute +CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE +SET client_min_messages TO DEFAULT; +-- Kill the connection with a CTE +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +WITH +results AS (SELECT * FROM test_table) +SELECT * FROM test_table, results +WHERE test_table.id = results.id; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +-- Since the outer query uses the connection opened by the CTE, +-- killing connection after first successful query should break. +SET client_min_messages TO ERROR; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); + mitmproxy +----------- + +(1 row) + +SELECT raise_failed_execution('WITH + results AS (SELECT * FROM test_table) + SELECT * FROM test_table, results + WHERE test_table.id = results.id'); +ERROR: Task failed to execute +CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE +SET client_min_messages TO DEFAULT; +-- In parallel execution mode Citus opens separate connections for each shard +-- so killing the connection after the first copy does not break it. +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 6 +(1 row) + +-- Cancel a real-time executor query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +-- Cancel a query within the transaction +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +COMMIT; +-- Cancel a query within the transaction after a multi-shard update +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +UPDATE test_table SET value_1 = value_1 + 1; +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +COMMIT; +-- Cancel a query with CTE +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +WITH +results AS (SELECT * FROM test_table) +SELECT * FROM test_table +WHERE test_table.id > (SELECT id FROM results); +ERROR: canceling statement due to user request +-- Since Citus opens a new connection after a failure within the real time +-- execution and after(1).kill() kills connection after a successful execution +-- for each connection, following transaciton does not fail. +SET citus.multi_shard_modify_mode to sequential; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +WARNING: could not consume data from worker node + count +------- + 6 +(1 row) + +COMMIT; +-- Cancel a real-time executor query - in sequential mode +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +-- Cancel a query within the transaction - in sequential mode +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +COMMIT; +-- Cancel the query within a transaction after a single succesful run +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +COMMIT; +-- Now, test with replication factor 2, tests are expected to pass +-- since we have two placements for the same shard +DROP TABLE test_table; +SET citus.multi_shard_modify_mode to default; +-- Create table with shard placements on each node +SET citus.shard_replication_factor to 2; +CREATE TABLE test_table(id int, value_1 int, value_2 int); +SELECT create_distributed_table('test_table','id'); + create_distributed_table +-------------------------- + +(1 row) + +-- Populate data to the table +INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); +-- Kill when the first COPY command arrived, since we have placements on each node +-- it shouldn't fail. +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM test_table; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node + count +------- + 6 +(1 row) + +-- Kill within the transaction, since we have placements on each node +-- it shouldn't fail. +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +WARNING: could not consume data from worker node +WARNING: could not consume data from worker node + count +------- + 6 +(1 row) + +COMMIT; +-- Cancel a real-time executor query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +-- Cancel a query within the transaction +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +COMMIT; +-- Cancel a query within the transaction after a multi-shard update +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +UPDATE test_table SET value_1 = value_1 + 1; +SELECT count(*) FROM test_table; +ERROR: canceling statement due to user request +COMMIT; +-- Cancel a query with CTE +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +WITH +results AS (SELECT * FROM test_table) +SELECT * FROM test_table +WHERE test_table.id > (SELECT id FROM results); +ERROR: canceling statement due to user request +-- Since we have the placement on each node, test with sequential mode +-- should pass as well. +SET citus.multi_shard_modify_mode to sequential; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT count(*) FROM test_table; +WARNING: could not consume data from worker node + count +------- + 6 +(1 row) + +COMMIT; +DROP SCHEMA real_time_select_failure CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to function raise_failed_execution(text) +drop cascades to table test_table +SET search_path TO default; diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index d6b05865e..153ae77c5 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -24,3 +24,4 @@ test: failure_multi_dml test: failure_vacuum test: failure_single_select test: failure_ref_tables +test: failure_real_time_select diff --git a/src/test/regress/sql/failure_real_time_select.sql b/src/test/regress/sql/failure_real_time_select.sql new file mode 100644 index 000000000..a9df56c80 --- /dev/null +++ b/src/test/regress/sql/failure_real_time_select.sql @@ -0,0 +1,167 @@ +-- +-- Failure tests for real time select queries +-- +CREATE SCHEMA real_time_select_failure; +SET search_path TO 'real_time_select_failure'; +SET citus.next_shard_id TO 190000; + +-- Preparation +SELECT citus.mitmproxy('conn.allow()'); +SET citus.shard_count to 4; +SET citus.shard_replication_factor to 1; + +-- create tables +CREATE TABLE test_table(id int, value_1 int, value_2 int); +SELECT create_distributed_table('test_table','id'); + +-- Populate data to the table +INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); + +-- Create a function to make sure that queries returning the same result +CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; + +-- Kill when the first COPY command arrived, since we have a single placement +-- it is expected to error out. +SET client_min_messages TO ERROR; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +SELECT raise_failed_execution('SELECT count(*) FROM test_table'); +SET client_min_messages TO DEFAULT; + +-- Kill the connection with a CTE +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +WITH +results AS (SELECT * FROM test_table) +SELECT * FROM test_table, results +WHERE test_table.id = results.id; + +-- Since the outer query uses the connection opened by the CTE, +-- killing connection after first successful query should break. +SET client_min_messages TO ERROR; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); +SELECT raise_failed_execution('WITH + results AS (SELECT * FROM test_table) + SELECT * FROM test_table, results + WHERE test_table.id = results.id'); +SET client_min_messages TO DEFAULT; + +-- In parallel execution mode Citus opens separate connections for each shard +-- so killing the connection after the first copy does not break it. +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); +SELECT count(*) FROM test_table; + +-- Cancel a real-time executor query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +SELECT count(*) FROM test_table; + +-- Cancel a query within the transaction +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel a query within the transaction after a multi-shard update +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +BEGIN; +UPDATE test_table SET value_1 = value_1 + 1; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel a query with CTE +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +WITH +results AS (SELECT * FROM test_table) +SELECT * FROM test_table +WHERE test_table.id > (SELECT id FROM results); + +-- Since Citus opens a new connection after a failure within the real time +-- execution and after(1).kill() kills connection after a successful execution +-- for each connection, following transaciton does not fail. +SET citus.multi_shard_modify_mode to sequential; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel a real-time executor query - in sequential mode +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +SELECT count(*) FROM test_table; + +-- Cancel a query within the transaction - in sequential mode +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel the query within a transaction after a single succesful run +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +-- Now, test with replication factor 2, tests are expected to pass +-- since we have two placements for the same shard +DROP TABLE test_table; +SET citus.multi_shard_modify_mode to default; + +-- Create table with shard placements on each node +SET citus.shard_replication_factor to 2; +CREATE TABLE test_table(id int, value_1 int, value_2 int); +SELECT create_distributed_table('test_table','id'); + +-- Populate data to the table +INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2); + +-- Kill when the first COPY command arrived, since we have placements on each node +-- it shouldn't fail. +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +SELECT count(*) FROM test_table; + +-- Kill within the transaction, since we have placements on each node +-- it shouldn't fail. +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel a real-time executor query +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +SELECT count(*) FROM test_table; + +-- Cancel a query within the transaction +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel a query within the transaction after a multi-shard update +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +BEGIN; +UPDATE test_table SET value_1 = value_1 + 1; +SELECT count(*) FROM test_table; +COMMIT; + +-- Cancel a query with CTE +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +WITH +results AS (SELECT * FROM test_table) +SELECT * FROM test_table +WHERE test_table.id > (SELECT id FROM results); + +-- Since we have the placement on each node, test with sequential mode +-- should pass as well. +SET citus.multi_shard_modify_mode to sequential; +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()'); +BEGIN; +SELECT count(*) FROM test_table; +COMMIT; + +DROP SCHEMA real_time_select_failure CASCADE; +SET search_path TO default;