From 6fed82609c432ffc8d66a3b5967a66dc91d50439 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Thu, 2 Nov 2023 13:13:29 +0100 Subject: [PATCH 1/5] Do not download all artifacts for flaky test detection (#7320) This is causing 404 failures due to a race condition: https://github.com/actions/toolkit/issues/1235 It also makes the tests take unnecessarily long. This was tested by changing a test file and seeing that the flaky test detection was still working. --- .github/workflows/build_and_test.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f80e42f6d..e938e3904 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -497,7 +497,6 @@ jobs: matrix: ${{ fromJson(needs.prepare_parallelization_matrix_32.outputs.json) }} steps: - uses: actions/checkout@v3.5.0 - - uses: actions/download-artifact@v3.0.1 - uses: "./.github/actions/setup_extension" - name: Run minimal tests run: |- From 5a48a1602e6a42b1f6261c50c726d02339f153f0 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Thu, 2 Nov 2023 13:15:02 +0100 Subject: [PATCH 2/5] Debug flaky logical_replication test (#7309) Sometimes in CI our logical_replication test fails like this: ```diff +++ /__w/citus/citus/src/test/regress/results/logical_replication.out.modified 2023-11-01 14:15:08.562758546 +0000 @@ -40,21 +40,21 @@ SELECT count(*) from pg_publication; count ------- 0 (1 row) SELECT count(*) from pg_replication_slots; count ------- - 0 + 1 (1 row) SELECT count(*) FROM dist; count ------- ``` It's hard to understand what is going on here, just based on the wrong number. So this PR changes the test to show the name of the subscription, publication and replication slot to make finding the cause easier. In passing this also fixes another flaky test in the same file that our flaky test detection picked up. This is done by waiting for resource cleanup after the shard move. 
--- .../regress/expected/logical_replication.out | 113 +++++++++--------- src/test/regress/sql/logical_replication.sql | 32 ++--- 2 files changed, 72 insertions(+), 73 deletions(-) diff --git a/src/test/regress/expected/logical_replication.out b/src/test/regress/expected/logical_replication.out index 8a3e96da9..b5a36125a 100644 --- a/src/test/regress/expected/logical_replication.out +++ b/src/test/regress/expected/logical_replication.out @@ -32,23 +32,21 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid PUBLICATION citus_shard_move_publication_:postgres_oid WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid); NOTICE: created replication slot "citus_shard_move_slot_10" on publisher -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 1 + citus_shard_move_subscription_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT pubname from pg_publication; + pubname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_replication_slots; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) SELECT count(*) FROM dist; count @@ -58,22 +56,21 @@ SELECT count(*) FROM dist; \c - - - :worker_1_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 0 +(0 rows) + +SELECT pubname from pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_move_publication_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 1 -(1 row) - 
-SELECT count(*) from pg_replication_slots; - count ---------------------------------------------------------------------- - 1 + citus_shard_move_slot_10 (1 row) SELECT count(*) FROM dist; @@ -90,25 +87,29 @@ select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localho (1 row) +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + -- the subscription is still there, as there is no cleanup record for it -- we have created it manually -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 1 + citus_shard_move_subscription_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT pubname from pg_publication; + pubname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_replication_slots; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) SELECT count(*) from dist; count @@ -120,22 +121,21 @@ SELECT count(*) from dist; SET search_path TO logical_replication; -- the publication and repslot are still there, as there are no cleanup records for them -- we have created them manually -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 0 +(0 rows) + +SELECT pubname from pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_move_publication_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 1 -(1 row) - -SELECT count(*) from pg_replication_slots; - count 
---------------------------------------------------------------------- - 1 + citus_shard_move_slot_10 (1 row) SELECT count(*) from dist; @@ -153,23 +153,20 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid); \c - - - :worker_2_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_publication; - count +SELECT pubname from pg_publication; + pubname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_replication_slots; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) SELECT count(*) from dist; count diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index 3f8e048ca..a85c70b08 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -35,17 +35,17 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid); -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) FROM dist; \c - - - :worker_1_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) FROM dist; \c - - - :master_port @@ -53,11 +53,13 @@ SET search_path TO logical_replication; select 
citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); +SELECT public.wait_for_resource_cleanup(); + -- the subscription is still there, as there is no cleanup record for it -- we have created it manually -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) from dist; \c - - - :worker_1_port @@ -65,9 +67,9 @@ SET search_path TO logical_replication; -- the publication and repslot are still there, as there are no cleanup records for them -- we have created them manually -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) from dist; DROP PUBLICATION citus_shard_move_publication_:postgres_oid; @@ -76,9 +78,9 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid); \c - - - :worker_2_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) from dist; \c - - - :master_port From 0678a2fd895670e0dd0a3b594915144d468f20e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Villemain?= Date: Thu, 2 Nov 2023 13:15:24 +0100 Subject: [PATCH 3/5] Fix #7242, CALL(@0) crash backend (#7288) When executing a prepared CALL, which is not pure SQL but available with some drivers like npgsql and jpgdbc, Citus entered a code path where a plan is not defined, while trying to increase its cost. Thus SIG11 when plan is a NULL pointer. 
Fix by only increasing plan cost when plan is not null. However, it is a bit suspicious to get here with a NULL plan and maybe a better change will be to not call ShardPlacementForFunctionColocatedWithDistTable() with a NULL plan at all (in call.c:134) bug hit with for example: ``` CallableStatement proc = con.prepareCall("{CALL p(?)}"); proc.registerOutParameter(1, java.sql.Types.BIGINT); proc.setInt(1, -100); proc.execute(); ``` where `p(bigint)` is a distributed "function" and the param the distribution key (also in a distributed table), see #7242 for details Fixes #7242 --- .../distributed/planner/distributed_planner.c | 1 + .../planner/function_call_delegation.c | 12 ++++++-- src/test/regress/citus_tests/common.py | 8 +++++ .../test/test_prepared_statements.py | 30 +++++++++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 src/test/regress/citus_tests/test/test_prepared_statements.py diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 65278d1ea..4f7612f8f 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -702,6 +702,7 @@ DissuadePlannerFromUsingPlan(PlannedStmt *plan) * Arbitrarily high cost, but low enough that it can be added up * without overflowing by choose_custom_plan(). 
*/ + Assert(plan != NULL); plan->planTree->total_cost = FLT_MAX / 100000000; } diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 2f8da29c0..ce9c818d7 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -525,8 +525,16 @@ ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure, if (partitionParam->paramkind == PARAM_EXTERN) { - /* Don't log a message, we should end up here again without a parameter */ - DissuadePlannerFromUsingPlan(plan); + /* + * Don't log a message, we should end up here again without a + * parameter. + * Note that "plan" can be null, for example when a CALL statement + * is prepared. + */ + if (plan) + { + DissuadePlannerFromUsingPlan(plan); + } return NULL; } } diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 53c9c7944..40c727189 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -581,6 +581,14 @@ class QueryRunner(ABC): with self.cur(**kwargs) as cur: cur.execute(query, params=params) + def sql_prepared(self, query, params=None, **kwargs): + """Run an SQL query, with prepare=True + + This opens a new connection and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params, prepare=True) + def sql_row(self, query, params=None, allow_empty_result=False, **kwargs): """Run an SQL query that returns a single row and returns this row diff --git a/src/test/regress/citus_tests/test/test_prepared_statements.py b/src/test/regress/citus_tests/test/test_prepared_statements.py new file mode 100644 index 000000000..761ecc30c --- /dev/null +++ b/src/test/regress/citus_tests/test/test_prepared_statements.py @@ -0,0 +1,30 @@ +def test_call_param(cluster): + # create a distributed table and an associated distributed 
procedure + # to ensure parameterized CALL succeed, even when the param is the + # distribution key. + coord = cluster.coordinator + coord.sql("CREATE TABLE test(i int)") + coord.sql( + """ + CREATE PROCEDURE p(_i INT) LANGUAGE plpgsql AS $$ + BEGIN + INSERT INTO test(i) VALUES (_i); + END; $$ + """ + ) + sql = "CALL p(%s)" + + # prepare/exec before distributing + coord.sql_prepared(sql, (1,)) + + coord.sql("SELECT create_distributed_table('test', 'i')") + coord.sql( + "SELECT create_distributed_function('p(int)', distribution_arg_name := '_i', colocate_with := 'test')" + ) + + # prepare/exec after distribution + coord.sql_prepared(sql, (2,)) + + sum_i = coord.sql_value("select sum(i) from test;") + + assert sum_i == 3 From b47c8b3fb077ea7eb2047801cceb47ebb61539fa Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Thu, 2 Nov 2023 13:15:43 +0100 Subject: [PATCH 4/5] Fix flaky insert_select_connection_leak (#7302) Sometimes in CI insert_select_connection_leak would fail like this: ```diff END; SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections -----------------------------+----------------------------- - 0 | 0 + -1 | 0 (1 row) -- ROLLBACK BEGIN; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; ROLLBACK; SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections -----------------------------+----------------------------- - 0 | 0 + -1 | 0 (1 row) \set VERBOSITY TERSE -- Error on constraint failure BEGIN; INSERT INTO target_table SELECT * FROM source_table; SELECT 
worker_connection_count(:worker_1_port) AS worker_1_connections, worker_connection_count(:worker_2_port) AS worker_2_connections \gset SAVEPOINT s1; INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table; @@ -89,15 +89,15 @@ leaked_worker_1_connections | leaked_worker_2_connections -----------------------------+----------------------------- 0 | 0 (1 row) END; SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections -----------------------------+----------------------------- - 0 | 0 + -1 | 0 (1 row) ``` Source: https://github.com/citusdata/citus/actions/runs/6718401194/attempts/1#summary-18258258387 A negative amount of leaked connections is obviously not possible. For some reason there was a connection open when we checked the initial amount of connections that was closed afterwards. This could be from the maintenance daemon or maybe from the previous test that had not fully closed its connections just yet. The change in this PR doesn't actually fix the cause of the negative connection, but it simply considers it good as well, by changing the result to zero for negative values. With this fix we might sometimes miss a leak, because the negative number can cancel out the leak and still result in a 0. But since the negative number only occurs sometimes, we'll still find the leak often enough. 
--- .../insert_select_connection_leak.out | 20 +++++++++---------- .../sql/insert_select_connection_leak.sql | 20 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/test/regress/expected/insert_select_connection_leak.out b/src/test/regress/expected/insert_select_connection_leak.out index 8a983acd5..b342ecde1 100644 --- a/src/test/regress/expected/insert_select_connection_leak.out +++ b/src/test/regress/expected/insert_select_connection_leak.out @@ -47,16 +47,16 @@ INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 (1 row) END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 @@ -67,8 +67,8 @@ BEGIN; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table 
SELECT * FROM source_table; ROLLBACK; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 @@ -84,16 +84,16 @@ SAVEPOINT s1; INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table; ERROR: null value in column "b" violates not-null constraint ROLLBACK TO SAVEPOINT s1; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 (1 row) END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections 
--------------------------------------------------------------------- 0 | 0 diff --git a/src/test/regress/sql/insert_select_connection_leak.sql b/src/test/regress/sql/insert_select_connection_leak.sql index 05afb10a0..e138f6c4d 100644 --- a/src/test/regress/sql/insert_select_connection_leak.sql +++ b/src/test/regress/sql/insert_select_connection_leak.sql @@ -33,12 +33,12 @@ INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; -- ROLLBACK BEGIN; @@ -46,8 +46,8 @@ INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; ROLLBACK; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + 
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; \set VERBOSITY TERSE @@ -59,12 +59,12 @@ SELECT worker_connection_count(:worker_1_port) AS worker_1_connections, SAVEPOINT s1; INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table; ROLLBACK TO SAVEPOINT s1; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_connection_leak CASCADE; From f171ec98fc58ac1ab2fdf808535c3259c4dfc3d1 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Thu, 2 Nov 2023 13:31:56 +0100 Subject: [PATCH 5/5] Fix flaky failure_distributed_results (#7307) Sometimes in CI we run into this failure: ```diff SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') NATURAL JOIN pg_dist_node; -WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +ERROR: connection to the remote node localhost:9060 failed with the following error: connection not 
open SELECT * FROM distributed_result_info ORDER BY resultId; - resultid | nodeport | rowcount | targetshardid | targetshardindex ---------------------------------------------------------------------- - test_from_100800_to_0 | 9060 | 22 | 100805 | 0 - test_from_100801_to_0 | 57637 | 2 | 100805 | 0 - test_from_100801_to_1 | 57637 | 15 | 100806 | 1 - test_from_100802_to_1 | 57637 | 10 | 100806 | 1 - test_from_100802_to_2 | 57637 | 5 | 100807 | 2 - test_from_100803_to_2 | 57637 | 18 | 100807 | 2 - test_from_100803_to_3 | 57637 | 4 | 100808 | 3 - test_from_100804_to_3 | 9060 | 24 | 100808 | 3 -(8 rows) - +ERROR: current transaction is aborted, commands ignored until end of transaction block -- fetch from worker 2 should fail SAVEPOINT s1; +ERROR: current transaction is aborted, commands ignored until end of transaction block SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; -ERROR: could not open file "base/pgsql_job_cache/xx_x_xxx/test_from_100802_to_1.data": No such file or directory -CONTEXT: while executing command on localhost:xxxxx +ERROR: current transaction is aborted, commands ignored until end of transaction block ROLLBACK TO SAVEPOINT s1; +ERROR: savepoint "s1" does not exist -- fetch from worker 1 should succeed SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched; - fetched ---------------------------------------------------------------------- - t -(1 row) - +ERROR: current transaction is aborted, commands ignored until end of transaction block -- make sure the results read are same as the previous transaction block SELECT count(*), sum(x) FROM read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); - count | sum ---------------------------------------------------------------------- - 15 | 863 -(1 row) - +ERROR: current transaction is 
aborted, commands ignored until end of transaction block ROLLBACk; ``` As outlined in the #7306 I created, the reason for this is related to only having a single connection open to the node. Finding and fixing the full cause is not trivial, so instead this PR starts working around this bug by forcing maximum parallelism. Preferably we'd want this workaround not to be necessary, but that requires spending time to fix this. For now having a less flaky CI is good enough. --- src/test/regress/expected/failure_distributed_results.out | 2 ++ src/test/regress/sql/failure_distributed_results.sql | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out index fc97c9af6..a316763e3 100644 --- a/src/test/regress/expected/failure_distributed_results.out +++ b/src/test/regress/expected/failure_distributed_results.out @@ -14,6 +14,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) SET citus.next_shard_id TO 100800; +-- Needed because of issue #7306 +SET citus.force_max_query_parallelization TO true; -- always try the 1st replica before the 2nd replica. SET citus.task_assignment_policy TO 'first-replica'; -- diff --git a/src/test/regress/sql/failure_distributed_results.sql b/src/test/regress/sql/failure_distributed_results.sql index 95e4d5513..93e4a9a33 100644 --- a/src/test/regress/sql/failure_distributed_results.sql +++ b/src/test/regress/sql/failure_distributed_results.sql @@ -15,6 +15,8 @@ SET client_min_messages TO WARNING; SELECT citus.mitmproxy('conn.allow()'); SET citus.next_shard_id TO 100800; +-- Needed because of issue #7306 +SET citus.force_max_query_parallelization TO true; -- always try the 1st replica before the 2nd replica. SET citus.task_assignment_policy TO 'first-replica';