Merge branch 'main' into fix-flaky-multi_alter_table_statements

pull/7321/head
Jelte Fennema-Nio 2023-11-02 13:34:56 +01:00 committed by GitHub
commit f623e36a70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 145 additions and 96 deletions

View File

@ -497,7 +497,6 @@ jobs:
matrix: ${{ fromJson(needs.prepare_parallelization_matrix_32.outputs.json) }}
steps:
- uses: actions/checkout@v3.5.0
- uses: actions/download-artifact@v3.0.1
- uses: "./.github/actions/setup_extension"
- name: Run minimal tests
run: |-

View File

@ -702,6 +702,7 @@ DissuadePlannerFromUsingPlan(PlannedStmt *plan)
* Arbitrarily high cost, but low enough that it can be added up
* without overflowing by choose_custom_plan().
*/
Assert(plan != NULL);
plan->planTree->total_cost = FLT_MAX / 100000000;
}

View File

@ -525,8 +525,16 @@ ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure,
if (partitionParam->paramkind == PARAM_EXTERN)
{
/* Don't log a message, we should end up here again without a parameter */
DissuadePlannerFromUsingPlan(plan);
/*
* Don't log a message, we should end up here again without a
* parameter.
* Note that "plan" can be null, for example when a CALL statement
* is prepared.
*/
if (plan)
{
DissuadePlannerFromUsingPlan(plan);
}
return NULL;
}
}

View File

@ -581,6 +581,14 @@ class QueryRunner(ABC):
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
def sql_prepared(self, query, params=None, **kwargs):
"""Run an SQL query, with prepare=True
This opens a new connection and closes it once the query is done
"""
with self.cur(**kwargs) as cur:
cur.execute(query, params=params, prepare=True)
def sql_row(self, query, params=None, allow_empty_result=False, **kwargs):
"""Run an SQL query that returns a single row and returns this row

View File

@ -0,0 +1,30 @@
def test_call_param(cluster):
# create a distributed table and an associated distributed procedure
# to ensure parameterized CALL succeed, even when the param is the
# distribution key.
coord = cluster.coordinator
coord.sql("CREATE TABLE test(i int)")
coord.sql(
"""
CREATE PROCEDURE p(_i INT) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO test(i) VALUES (_i);
END; $$
"""
)
sql = "CALL p(%s)"
# prepare/exec before distributing
coord.sql_prepared(sql, (1,))
coord.sql("SELECT create_distributed_table('test', 'i')")
coord.sql(
"SELECT create_distributed_function('p(int)', distribution_arg_name := '_i', colocate_with := 'test')"
)
# prepare/exec after distribution
coord.sql_prepared(sql, (2,))
sum_i = coord.sql_value("select sum(i) from test;")
assert sum_i == 3

View File

@ -14,6 +14,8 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
SET citus.next_shard_id TO 100800;
-- Needed because of issue #7306
SET citus.force_max_query_parallelization TO true;
-- always try the 1st replica before the 2nd replica.
SET citus.task_assignment_policy TO 'first-replica';
--

View File

@ -47,16 +47,16 @@ INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
(1 row)
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
@ -67,8 +67,8 @@ BEGIN;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
ROLLBACK;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
@ -84,16 +84,16 @@ SAVEPOINT s1;
INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table;
ERROR: null value in column "b" violates not-null constraint
ROLLBACK TO SAVEPOINT s1;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
(1 row)
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0

View File

@ -32,23 +32,21 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
PUBLICATION citus_shard_move_publication_:postgres_oid
WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid);
NOTICE: created replication slot "citus_shard_move_slot_10" on publisher
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
1
citus_shard_move_subscription_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_replication_slots;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) FROM dist;
count
@ -58,22 +56,21 @@ SELECT count(*) FROM dist;
\c - - - :worker_1_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
0
(0 rows)
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_move_publication_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) from pg_replication_slots;
count
---------------------------------------------------------------------
1
citus_shard_move_slot_10
(1 row)
SELECT count(*) FROM dist;
@ -90,25 +87,29 @@ select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localho
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- the subscription is still there, as there is no cleanup record for it
-- we have created it manually
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
1
citus_shard_move_subscription_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_replication_slots;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from dist;
count
@ -120,22 +121,21 @@ SELECT count(*) from dist;
SET search_path TO logical_replication;
-- the publication and repslot are still there, as there are no cleanup records for them
-- we have created them manually
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
0
(0 rows)
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_move_publication_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) from pg_replication_slots;
count
---------------------------------------------------------------------
1
citus_shard_move_slot_10
(1 row)
SELECT count(*) from dist;
@ -153,23 +153,20 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
\c - - - :worker_2_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_publication;
count
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_replication_slots;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from dist;
count

View File

@ -15,6 +15,8 @@ SET client_min_messages TO WARNING;
SELECT citus.mitmproxy('conn.allow()');
SET citus.next_shard_id TO 100800;
-- Needed because of issue #7306
SET citus.force_max_query_parallelization TO true;
-- always try the 1st replica before the 2nd replica.
SET citus.task_assignment_policy TO 'first-replica';

View File

@ -33,12 +33,12 @@ INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
-- ROLLBACK
BEGIN;
@ -46,8 +46,8 @@ INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
ROLLBACK;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
\set VERBOSITY TERSE
@ -59,12 +59,12 @@ SELECT worker_connection_count(:worker_1_port) AS worker_1_connections,
SAVEPOINT s1;
INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table;
ROLLBACK TO SAVEPOINT s1;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_connection_leak CASCADE;

View File

@ -35,17 +35,17 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid);
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) FROM dist;
\c - - - :worker_1_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) FROM dist;
\c - - - :master_port
@ -53,11 +53,13 @@ SET search_path TO logical_replication;
select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
SELECT public.wait_for_resource_cleanup();
-- the subscription is still there, as there is no cleanup record for it
-- we have created it manually
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) from dist;
\c - - - :worker_1_port
@ -65,9 +67,9 @@ SET search_path TO logical_replication;
-- the publication and repslot are still there, as there are no cleanup records for them
-- we have created them manually
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) from dist;
DROP PUBLICATION citus_shard_move_publication_:postgres_oid;
@ -76,9 +78,9 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
\c - - - :worker_2_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) from dist;
\c - - - :master_port