--
-- Test failures related to distributed_intermediate_results.c
--
CREATE SCHEMA failure_distributed_results;
SET search_path TO 'failure_distributed_results';
-- do not cache any connections
SET citus.max_cached_conns_per_worker TO 0;
-- we don't want to see the prepared transaction numbers in the warnings
SET client_min_messages TO WARNING;
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SET citus.next_shard_id TO 100800;
-- Needed because of issue #7306
SET citus.force_max_query_parallelization TO true;
-- always try the 1st replica before the 2nd replica.
SET citus.task_assignment_policy TO 'first-replica';
--
-- Case 1.
-- Source is replicated, target is not.
-- Fail the partition query on the first node.
--
CREATE TABLE source_table(a int);
SET citus.shard_replication_factor TO 2;
SET citus.shard_count TO 5;
SELECT create_distributed_table('source_table', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source_table SELECT * FROM generate_series(1, 100);
CREATE TABLE target_table(a int);
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('target_table', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- without failure results from 100802 should be stored on 9060
BEGIN;
CREATE TABLE distributed_result_info AS
  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
          NATURAL JOIN pg_dist_node;
SELECT * FROM distributed_result_info ORDER BY resultId;
       resultid        | nodeport | rowcount | targetshardid | targetshardindex
---------------------------------------------------------------------
 test_from_100800_to_0 |     9060 |       22 |        100805 |                0
 test_from_100801_to_0 |    57637 |        2 |        100805 |                0
 test_from_100801_to_1 |    57637 |       15 |        100806 |                1
 test_from_100802_to_1 |     9060 |       10 |        100806 |                1
 test_from_100802_to_2 |     9060 |        5 |        100807 |                2
 test_from_100803_to_2 |    57637 |       18 |        100807 |                2
 test_from_100803_to_3 |    57637 |        4 |        100808 |                3
 test_from_100804_to_3 |     9060 |       24 |        100808 |                3
(8 rows)

SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
 fetched
---------------------------------------------------------------------
 t
(1 row)

SELECT count(*), sum(x) FROM
  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
 count | sum
---------------------------------------------------------------------
    15 | 863
(1 row)

ROLLBACk;
-- with failure, results from 100802 should be retried and succeed on 57637
SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

BEGIN;
SET LOCAL client_min_messages TO DEBUG1;
CREATE TABLE distributed_result_info AS
  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
          NATURAL JOIN pg_dist_node;
WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: connection not open
SELECT * FROM distributed_result_info ORDER BY resultId;
       resultid        | nodeport | rowcount | targetshardid | targetshardindex
---------------------------------------------------------------------
 test_from_100800_to_0 |     9060 |       22 |        100805 |                0
 test_from_100801_to_0 |    57637 |        2 |        100805 |                0
 test_from_100801_to_1 |    57637 |       15 |        100806 |                1
 test_from_100802_to_1 |    57637 |       10 |        100806 |                1
 test_from_100802_to_2 |    57637 |        5 |        100807 |                2
 test_from_100803_to_2 |    57637 |       18 |        100807 |                2
 test_from_100803_to_3 |    57637 |        4 |        100808 |                3
 test_from_100804_to_3 |     9060 |       24 |        100808 |                3
(8 rows)

-- fetch from worker 2 should fail
SAVEPOINT s1;
SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
ERROR:  could not open file "base/pgsql_job_cache/xx_x_xxx/test_from_100802_to_1.data": No such file or directory
CONTEXT:  while executing command on localhost:xxxxx
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 1 should succeed
SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched;
 fetched
---------------------------------------------------------------------
 t
(1 row)

-- make sure the results read are same as the previous transaction block
SELECT count(*), sum(x) FROM
  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
 count | sum
---------------------------------------------------------------------
    15 | 863
(1 row)

ROLLBACk;
SET search_path TO 'public';
RESET citus.shard_replication_factor;
RESET citus.shard_count;
RESET citus.task_assignment_policy;
DROP SCHEMA failure_distributed_results CASCADE;
RESET client_min_messages;