From bb656691862a3ce02349b9775c51034dc9b7a040 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Jan 2020 10:51:37 -0800 Subject: [PATCH] Failure tests for PartitionTasklistResults --- .../distributed_intermediate_results.out | 15 -- .../expected/failure_distributed_results.out | 132 ++++++++++++++++++ .../regress/expected/multi_test_helpers.out | 12 ++ src/test/regress/failure_schedule | 1 + .../sql/distributed_intermediate_results.sql | 17 --- .../sql/failure_distributed_results.sql | 74 ++++++++++ src/test/regress/sql/multi_test_helpers.sql | 14 ++ 7 files changed, 233 insertions(+), 32 deletions(-) create mode 100644 src/test/regress/expected/failure_distributed_results.out create mode 100644 src/test/regress/sql/failure_distributed_results.sql diff --git a/src/test/regress/expected/distributed_intermediate_results.out b/src/test/regress/expected/distributed_intermediate_results.out index cc7ad8af9..24d3ce1dc 100644 --- a/src/test/regress/expected/distributed_intermediate_results.out +++ b/src/test/regress/expected/distributed_intermediate_results.out @@ -3,21 +3,6 @@ CREATE SCHEMA distributed_intermediate_results; SET search_path TO 'distributed_intermediate_results'; SET citus.next_shard_id TO 4213581; -- --- Helper UDFs --- --- partition_task_list_results tests the internal PartitionTasklistResults function -CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, - query text, - target_table regclass, - binaryFormat bool DEFAULT true) - RETURNS TABLE(resultId text, - nodeId int, - rowCount bigint, - targetShardId bigint, - targetShardIndex int) - LANGUAGE C STRICT VOLATILE - AS 'citus', $$partition_task_list_results$$; --- -- We don't have extensive tests for partition_task_results, since it will be -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. -- diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out new file mode 100644 index 000000000..5681a5173 --- /dev/null +++ b/src/test/regress/expected/failure_distributed_results.out @@ -0,0 +1,132 @@ +-- +-- Test failures related to distributed_intermediate_results.c +-- +CREATE SCHEMA failure_distributed_results; +SET search_path TO 'failure_distributed_results'; +-- do not cache any connections +SET citus.max_cached_conns_per_worker TO 0; +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO WARNING; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SET citus.next_shard_id TO 100800; +-- always try the 1st replica before the 2nd replica. +SET citus.task_assignment_policy TO 'first-replica'; +-- +-- Case 1. +-- Source is replicated, target is not. +-- Fail the partition query on the first node. +-- +CREATE TABLE source_table(a int); +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 5; +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT * FROM generate_series(1, 100); +CREATE TABLE target_table(a int); +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('target_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- without failure results from 100802 should be stored on 9060 +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; + resultid | nodeport | rowcount | targetshardid | targetshardindex +--------------------------------------------------------------------- + test_from_100800_to_0 | 9060 | 22 | 100805 | 0 + test_from_100801_to_0 | 57637 | 2 | 100805 | 0 + test_from_100801_to_1 | 57637 | 15 | 100806 | 1 + test_from_100802_to_1 | 9060 | 10 | 100806 | 1 + test_from_100802_to_2 | 9060 | 5 | 100807 | 2 + test_from_100803_to_2 | 57637 | 18 | 100807 | 2 + test_from_100803_to_3 | 57637 | 4 | 100808 | 3 + test_from_100804_to_3 | 9060 | 24 | 100808 | 3 +(8 rows) + +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; + fetched +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); + count | sum +--------------------------------------------------------------------- + 15 | 863 +(1 row) + +ROLLBACk; +-- with failure, results from 100802 should be retried and succeed on 57637 +SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +WARNING: connection error: localhost:xxxxx +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +SELECT * FROM distributed_result_info ORDER BY resultId; + resultid | nodeport | rowcount | targetshardid | targetshardindex +--------------------------------------------------------------------- + test_from_100800_to_0 | 9060 | 22 | 100805 | 0 + test_from_100801_to_0 | 57637 | 2 | 100805 | 0 + test_from_100801_to_1 | 57637 | 15 | 100806 | 1 + test_from_100802_to_1 | 57637 | 10 | 100806 | 1 + test_from_100802_to_2 | 57637 | 5 | 100807 | 2 + test_from_100803_to_2 | 57637 | 18 | 100807 | 2 + test_from_100803_to_3 | 57637 | 4 | 100808 | 3 + test_from_100804_to_3 | 9060 | 24 | 100808 | 3 +(8 rows) + +-- fetch from worker 2 should fail +SAVEPOINT s1; +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; +ERROR: could not open file "base/pgsql_job_cache/xx_x_xxx/test_from_100802_to_1.data": No such file or directory +CONTEXT: while executing command on localhost:xxxxx +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 1 should succeed +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched; + fetched +--------------------------------------------------------------------- + t +(1 row) + +-- make sure the results read are same as the previous transaction block +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); + count | sum +--------------------------------------------------------------------- + 15 | 863 +(1 row) + +ROLLBACk; +SET search_path TO 'public'; +RESET citus.shard_replication_factor; +RESET citus.shard_count; +RESET citus.task_assignment_policy; +DROP SCHEMA failure_distributed_results CASCADE; +RESET client_min_messages; diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 1b17445c8..c901beb5f 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -135,3 +135,15 @@ BEGIN END LOOP; END; $$ LANGUAGE plpgsql; +-- partition_task_list_results tests the internal PartitionTasklistResults function +CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(resultId text, + nodeId int, + rowCount bigint, + targetShardId bigint, + targetShardIndex int) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$partition_task_list_results$$; diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index f63d28358..ddaff3a8c 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -6,6 +6,7 @@ test: failure_setup test: multi_test_helpers test: failure_replicated_partitions test: multi_test_catalog_views +test: failure_distributed_results test: failure_ddl test: failure_truncate test: failure_create_index_concurrently diff --git a/src/test/regress/sql/distributed_intermediate_results.sql b/src/test/regress/sql/distributed_intermediate_results.sql index 4b5cecaa6..2d59abf45 100644 --- a/src/test/regress/sql/distributed_intermediate_results.sql +++ b/src/test/regress/sql/distributed_intermediate_results.sql @@ -4,23 +4,6 @@ SET search_path TO 'distributed_intermediate_results'; SET citus.next_shard_id TO 4213581; --- --- Helper UDFs --- - --- partition_task_list_results tests the internal PartitionTasklistResults function -CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, - query text, - target_table regclass, - binaryFormat bool DEFAULT true) - RETURNS TABLE(resultId text, - nodeId int, - rowCount bigint, - targetShardId bigint, - targetShardIndex int) - LANGUAGE C STRICT VOLATILE - AS 'citus', $$partition_task_list_results$$; - -- -- We don't have extensive tests for partition_task_results, since it will be -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. diff --git a/src/test/regress/sql/failure_distributed_results.sql b/src/test/regress/sql/failure_distributed_results.sql new file mode 100644 index 000000000..6f6e12805 --- /dev/null +++ b/src/test/regress/sql/failure_distributed_results.sql @@ -0,0 +1,74 @@ +-- +-- Test failures related to distributed_intermediate_results.c +-- + + +CREATE SCHEMA failure_distributed_results; +SET search_path TO 'failure_distributed_results'; + +-- do not cache any connections +SET citus.max_cached_conns_per_worker TO 0; + +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO WARNING; + +SELECT citus.mitmproxy('conn.allow()'); + +SET citus.next_shard_id TO 100800; + +-- always try the 1st replica before the 2nd replica. +SET citus.task_assignment_policy TO 'first-replica'; + +-- +-- Case 1. +-- Source is replicated, target is not. +-- Fail the partition query on the first node. +-- +CREATE TABLE source_table(a int); +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 5; +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table SELECT * FROM generate_series(1, 100); + +CREATE TABLE target_table(a int); +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('target_table', 'a'); + +-- without failure results from 100802 should be stored on 9060 +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); +ROLLBACk; + +-- with failure, results from 100802 should be retried and succeed on 57637 +SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()'); +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; +-- fetch from worker 2 should fail +SAVEPOINT s1; +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 1 should succeed +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched; +-- make sure the results read are same as the previous transaction block +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); +ROLLBACk; + +SET search_path TO 'public'; +RESET citus.shard_replication_factor; +RESET citus.shard_count; +RESET citus.task_assignment_policy; +DROP SCHEMA failure_distributed_results CASCADE; +RESET client_min_messages; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 3383c6714..93823ea7a 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -141,3 +141,17 @@ BEGIN END LOOP; END; $$ LANGUAGE plpgsql; + + +-- partition_task_list_results tests the internal PartitionTasklistResults function +CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(resultId text, + nodeId int, + rowCount bigint, + targetShardId bigint, + targetShardIndex int) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$partition_task_list_results$$;