diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f80e42f6d..e938e3904 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -497,7 +497,6 @@ jobs: matrix: ${{ fromJson(needs.prepare_parallelization_matrix_32.outputs.json) }} steps: - uses: actions/checkout@v3.5.0 - - uses: actions/download-artifact@v3.0.1 - uses: "./.github/actions/setup_extension" - name: Run minimal tests run: |- diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 65278d1ea..4f7612f8f 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -702,6 +702,7 @@ DissuadePlannerFromUsingPlan(PlannedStmt *plan) * Arbitrarily high cost, but low enough that it can be added up * without overflowing by choose_custom_plan(). */ + Assert(plan != NULL); plan->planTree->total_cost = FLT_MAX / 100000000; } diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 2f8da29c0..ce9c818d7 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -525,8 +525,16 @@ ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure, if (partitionParam->paramkind == PARAM_EXTERN) { - /* Don't log a message, we should end up here again without a parameter */ - DissuadePlannerFromUsingPlan(plan); + /* + * Don't log a message, we should end up here again without a + * parameter. + * Note that "plan" can be null, for example when a CALL statement + * is prepared. + */ + if (plan) + { + DissuadePlannerFromUsingPlan(plan); + } return NULL; } } diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 53c9c7944..40c727189 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -581,6 +581,14 @@ class QueryRunner(ABC): with self.cur(**kwargs) as cur: cur.execute(query, params=params) + def sql_prepared(self, query, params=None, **kwargs): + """Run an SQL query, with prepare=True + + This opens a new connection and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params, prepare=True) + def sql_row(self, query, params=None, allow_empty_result=False, **kwargs): """Run an SQL query that returns a single row and returns this row diff --git a/src/test/regress/citus_tests/test/test_prepared_statements.py b/src/test/regress/citus_tests/test/test_prepared_statements.py new file mode 100644 index 000000000..761ecc30c --- /dev/null +++ b/src/test/regress/citus_tests/test/test_prepared_statements.py @@ -0,0 +1,30 @@ +def test_call_param(cluster): + # create a distributed table and an associated distributed procedure + # to ensure parameterized CALL succeed, even when the param is the + # distribution key. + coord = cluster.coordinator + coord.sql("CREATE TABLE test(i int)") + coord.sql( + """ + CREATE PROCEDURE p(_i INT) LANGUAGE plpgsql AS $$ + BEGIN + INSERT INTO test(i) VALUES (_i); + END; $$ + """ + ) + sql = "CALL p(%s)" + + # prepare/exec before distributing + coord.sql_prepared(sql, (1,)) + + coord.sql("SELECT create_distributed_table('test', 'i')") + coord.sql( + "SELECT create_distributed_function('p(int)', distribution_arg_name := '_i', colocate_with := 'test')" + ) + + # prepare/exec after distribution + coord.sql_prepared(sql, (2,)) + + sum_i = coord.sql_value("select sum(i) from test;") + + assert sum_i == 3 diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out index fc97c9af6..a316763e3 100644 --- a/src/test/regress/expected/failure_distributed_results.out +++ b/src/test/regress/expected/failure_distributed_results.out @@ -14,6 +14,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) SET citus.next_shard_id TO 100800; +-- Needed because of issue #7306 +SET citus.force_max_query_parallelization TO true; -- always try the 1st replica before the 2nd replica. SET citus.task_assignment_policy TO 'first-replica'; -- diff --git a/src/test/regress/expected/insert_select_connection_leak.out b/src/test/regress/expected/insert_select_connection_leak.out index 8a983acd5..b342ecde1 100644 --- a/src/test/regress/expected/insert_select_connection_leak.out +++ b/src/test/regress/expected/insert_select_connection_leak.out @@ -47,16 +47,16 @@ INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 (1 row) END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 @@ -67,8 +67,8 @@ BEGIN; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; ROLLBACK; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 @@ -84,16 +84,16 @@ SAVEPOINT s1; INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table; ERROR: null value in column "b" violates not-null constraint ROLLBACK TO SAVEPOINT s1; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 (1 row) END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; leaked_worker_1_connections | leaked_worker_2_connections --------------------------------------------------------------------- 0 | 0 diff --git a/src/test/regress/expected/logical_replication.out b/src/test/regress/expected/logical_replication.out index 8a3e96da9..b5a36125a 100644 --- a/src/test/regress/expected/logical_replication.out +++ b/src/test/regress/expected/logical_replication.out @@ -32,23 +32,21 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid PUBLICATION citus_shard_move_publication_:postgres_oid WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid); NOTICE: created replication slot "citus_shard_move_slot_10" on publisher -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 1 + citus_shard_move_subscription_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT pubname from pg_publication; + pubname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_replication_slots; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) SELECT count(*) FROM dist; count @@ -58,22 +56,21 @@ SELECT count(*) FROM dist; \c - - - :worker_1_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 0 +(0 rows) + +SELECT pubname from pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_move_publication_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 1 -(1 row) - -SELECT count(*) from pg_replication_slots; - count ---------------------------------------------------------------------- - 1 + citus_shard_move_slot_10 (1 row) SELECT count(*) FROM dist; @@ -90,25 +87,29 @@ select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localho (1 row) +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + -- the subscription is still there, as there is no cleanup record for it -- we have created it manually -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 1 + citus_shard_move_subscription_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT pubname from pg_publication; + pubname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_replication_slots; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) SELECT count(*) from dist; count @@ -120,22 +121,21 @@ SELECT count(*) from dist; SET search_path TO logical_replication; -- the publication and repslot are still there, as there are no cleanup records for them -- we have created them manually -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 0 +(0 rows) + +SELECT pubname from pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_move_publication_10 (1 row) -SELECT count(*) from pg_publication; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 1 -(1 row) - -SELECT count(*) from pg_replication_slots; - count ---------------------------------------------------------------------- - 1 + citus_shard_move_slot_10 (1 row) SELECT count(*) from dist; @@ -153,23 +153,20 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid); \c - - - :worker_2_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; - count +SELECT subname from pg_subscription; + subname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_publication; - count +SELECT pubname from pg_publication; + pubname --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) -SELECT count(*) from pg_replication_slots; - count +SELECT slot_name from pg_replication_slots; + slot_name --------------------------------------------------------------------- - 0 -(1 row) +(0 rows) SELECT count(*) from dist; count diff --git a/src/test/regress/sql/failure_distributed_results.sql b/src/test/regress/sql/failure_distributed_results.sql index 95e4d5513..93e4a9a33 100644 --- a/src/test/regress/sql/failure_distributed_results.sql +++ b/src/test/regress/sql/failure_distributed_results.sql @@ -15,6 +15,8 @@ SET client_min_messages TO WARNING; SELECT citus.mitmproxy('conn.allow()'); SET citus.next_shard_id TO 100800; +-- Needed because of issue #7306 +SET citus.force_max_query_parallelization TO true; -- always try the 1st replica before the 2nd replica. SET citus.task_assignment_policy TO 'first-replica'; diff --git a/src/test/regress/sql/insert_select_connection_leak.sql b/src/test/regress/sql/insert_select_connection_leak.sql index 05afb10a0..e138f6c4d 100644 --- a/src/test/regress/sql/insert_select_connection_leak.sql +++ b/src/test/regress/sql/insert_select_connection_leak.sql @@ -33,12 +33,12 @@ INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; -- ROLLBACK BEGIN; @@ -46,8 +46,8 @@ INSERT INTO target_table SELECT * FROM source_table; INSERT INTO target_table SELECT * FROM source_table; ROLLBACK; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; \set VERBOSITY TERSE @@ -59,12 +59,12 @@ SELECT worker_connection_count(:worker_1_port) AS worker_1_connections, SAVEPOINT s1; INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table; ROLLBACK TO SAVEPOINT s1; -SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections; END; -SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, - worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; +SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections, + GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_connection_leak CASCADE; diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index 3f8e048ca..a85c70b08 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -35,17 +35,17 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid); -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) FROM dist; \c - - - :worker_1_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) FROM dist; \c - - - :master_port @@ -53,11 +53,13 @@ SET search_path TO logical_replication; select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); +SELECT public.wait_for_resource_cleanup(); + -- the subscription is still there, as there is no cleanup record for it -- we have created it manually -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) from dist; \c - - - :worker_1_port @@ -65,9 +67,9 @@ SET search_path TO logical_replication; -- the publication and repslot are still there, as there are no cleanup records for them -- we have created them manually -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) from dist; DROP PUBLICATION citus_shard_move_publication_:postgres_oid; @@ -76,9 +78,9 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid); \c - - - :worker_2_port SET search_path TO logical_replication; -SELECT count(*) from pg_subscription; -SELECT count(*) from pg_publication; -SELECT count(*) from pg_replication_slots; +SELECT subname from pg_subscription; +SELECT pubname from pg_publication; +SELECT slot_name from pg_replication_slots; SELECT count(*) from dist; \c - - - :master_port