diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c
index ee9aa0c50..f92a93831 100644
--- a/src/backend/distributed/connection/placement_connection.c
+++ b/src/backend/distributed/connection/placement_connection.c
@@ -592,6 +592,15 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us
 							   "modified over multiple connections")));
 		}
 	}
+	else if (accessType == PLACEMENT_ACCESS_SELECT &&
+			 placementEntry->hasSecondaryConnections &&
+			 !placementConnection->hadDDL && !placementConnection->hadDML)
+	{
+		/*
+		 * Two separate connections have already selected from this placement
+		 * and it was not modified. There is no benefit to using this connection.
+		 */
+	}
 	else if (CanUseExistingConnection(flags, userName, placementConnection))
 	{
 		/*
@@ -599,7 +608,6 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us
 		 */
 		Assert(placementConnection != NULL);
-
 		chosenConnection = placementConnection->connection;
 
 		if (placementConnection->hadDDL || placementConnection->hadDML)
diff --git a/src/test/regress/expected/failure_parallel_connection.out b/src/test/regress/expected/failure_parallel_connection.out
new file mode 100644
index 000000000..37321b1b3
--- /dev/null
+++ b/src/test/regress/expected/failure_parallel_connection.out
@@ -0,0 +1,75 @@
+--
+-- failure_parallel_connection.sql tests some behaviour of connection management
+-- where Citus is expected to use multiple connections.
+--
+-- In other words, we're not testing any failures in this test. We're trying to make
+-- sure that Citus uses one connection per placement of the distributed table even after
+-- a join with a reference table
+--
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE SCHEMA fail_parallel_connection;
+SET search_path TO 'fail_parallel_connection';
+SET citus.shard_count TO 4;
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1880000;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1880000;
+CREATE TABLE distributed_table (
+	key int,
+	value int
+);
+SELECT create_distributed_table('distributed_table', 'key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE reference_table (
+	key int,
+	value int
+);
+SELECT create_reference_table('reference_table');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- make sure that each placement of the distributed table
+-- is accessed over a separate connection
+SET citus.force_max_query_parallelization TO ON;
+BEGIN;
+	SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+	SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+	-- this query should not fail because each placement should be accessed
+	-- over a separate connection
+	SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+COMMIT;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+DROP SCHEMA fail_parallel_connection CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to table distributed_table
+drop cascades to table reference_table
+SET search_path TO default;
diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out
index 795ce8464..663d8c8d2 100644
--- a/src/test/regress/expected/multi_real_time_transaction.out
+++ b/src/test/regress/expected/multi_real_time_transaction.out
@@ -29,6 +29,54 @@ SELECT create_reference_table('ref_test_table');
 (1 row)
 
 \COPY ref_test_table FROM stdin delimiter ',';
+-- Test two reference table joins; both will run in parallel
+BEGIN;
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ count
+---------------------------------------------------------------------
+     4
+(1 row)
+
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ count
+---------------------------------------------------------------------
+     4
+(1 row)
+
+ROLLBACK;
+-- Test two reference table joins; the second one will be serialized
+BEGIN;
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ count
+---------------------------------------------------------------------
+     4
+(1 row)
+
+INSERT INTO ref_test_table VALUES(1,2,'da');
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ count
+---------------------------------------------------------------------
+     5
+(1 row)
+
+ROLLBACK;
+-- this does not work because the inserts into the shards go over different
+-- connections, the insert into the reference table goes over a single
+-- connection, and the final SELECT cannot see both
+BEGIN;
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ count
+---------------------------------------------------------------------
+     4
+(1 row)
+
+INSERT INTO test_table VALUES(1,2,'da');
+INSERT INTO test_table VALUES(2,2,'da');
+INSERT INTO test_table VALUES(3,3,'da');
+INSERT INTO ref_test_table VALUES(1,2,'da');
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ERROR:  cannot perform query with placements that were modified over multiple connections
+ROLLBACK;
 -- Test with select and router insert
 BEGIN;
 SELECT COUNT(*) FROM test_table;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index 0e2143370..fb0ecd0bf 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -4,6 +4,7 @@ test: failure_test_helpers
 # this should only be run by pg_regress_multi, you don't need it
 test: failure_setup
 test: multi_test_helpers
+test: failure_parallel_connection
 test: failure_replicated_partitions
 test: multi_test_catalog_views
 test: failure_insert_select_repartition
diff --git a/src/test/regress/sql/failure_parallel_connection.sql b/src/test/regress/sql/failure_parallel_connection.sql
new file mode 100644
index 000000000..595b10ced
--- /dev/null
+++ b/src/test/regress/sql/failure_parallel_connection.sql
@@ -0,0 +1,48 @@
+--
+-- failure_parallel_connection.sql tests some behaviour of connection management
+-- where Citus is expected to use multiple connections.
+--
+-- In other words, we're not testing any failures in this test. We're trying to make
+-- sure that Citus uses one connection per placement of the distributed table even after
+-- a join with a reference table
+--
+
+SELECT citus.mitmproxy('conn.allow()');
+
+CREATE SCHEMA fail_parallel_connection;
+SET search_path TO 'fail_parallel_connection';
+
+SET citus.shard_count TO 4;
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1880000;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1880000;
+
+CREATE TABLE distributed_table (
+	key int,
+	value int
+);
+SELECT create_distributed_table('distributed_table', 'key');
+
+CREATE TABLE reference_table (
+	key int,
+	value int
+);
+SELECT create_reference_table('reference_table');
+
+-- make sure that each placement of the distributed table
+-- is accessed over a separate connection
+SET citus.force_max_query_parallelization TO ON;
+
+BEGIN;
+	SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
+
+	SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');
+
+	-- this query should not fail because each placement should be accessed
+	-- over a separate connection
+	SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
+COMMIT;
+
+
+SELECT citus.mitmproxy('conn.allow()');
+DROP SCHEMA fail_parallel_connection CASCADE;
+SET search_path TO default;
diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql
index ffbf1a43a..f2d77e801 100644
--- a/src/test/regress/sql/multi_real_time_transaction.sql
+++ b/src/test/regress/sql/multi_real_time_transaction.sql
@@ -40,6 +40,31 @@ SELECT create_reference_table('ref_test_table');
 4,5,'rr4'
 \.
 
+-- Test two reference table joins; both will run in parallel
+BEGIN;
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ROLLBACK;
+
+-- Test two reference table joins; the second one will be serialized
+BEGIN;
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+INSERT INTO ref_test_table VALUES(1,2,'da');
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ROLLBACK;
+
+-- this does not work because the inserts into the shards go over different
+-- connections, the insert into the reference table goes over a single
+-- connection, and the final SELECT cannot see both
+BEGIN;
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+INSERT INTO test_table VALUES(1,2,'da');
+INSERT INTO test_table VALUES(2,2,'da');
+INSERT INTO test_table VALUES(3,3,'da');
+INSERT INTO ref_test_table VALUES(1,2,'da');
+SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
+ROLLBACK;
+
 -- Test with select and router insert
 BEGIN;
 SELECT COUNT(*) FROM test_table;
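
Note on the change in FindPlacementListConnection above: the new branch makes a plain SELECT fall through to opening a fresh connection instead of latching onto the placement's existing one, which is what lets citus.force_max_query_parallelization keep one connection per placement after a reference table join. The sketch below is a minimal, self-contained model of that predicate, not the actual Citus code: the field names (hadDDL, hadDML, hasSecondaryConnections) mirror the hunk, while the enum values, struct layout, helper name, and test harness are hypothetical simplifications.

#include <stdbool.h>
#include <stdio.h>

typedef enum PlacementAccessType
{
	PLACEMENT_ACCESS_SELECT,
	PLACEMENT_ACCESS_DML,
	PLACEMENT_ACCESS_DDL
} PlacementAccessType;

typedef struct PlacementConnection
{
	bool hadDDL;  /* this connection ran DDL on the placement */
	bool hadDML;  /* this connection ran DML on the placement */
} PlacementConnection;

typedef struct PlacementEntry
{
	PlacementConnection *primaryConnection;
	bool hasSecondaryConnections;  /* placement was also read over other connections */
} PlacementEntry;

/*
 * Mirrors the new else-if condition: for a SELECT on a placement that other
 * connections have already read and that the primary connection never
 * modified, reusing the primary connection has no correctness benefit, so
 * the caller is free to use a separate connection per placement.
 */
static bool
SelectCanUseAnyConnection(PlacementAccessType accessType,
						  const PlacementEntry *placementEntry)
{
	return accessType == PLACEMENT_ACCESS_SELECT &&
		   placementEntry->hasSecondaryConnections &&
		   !placementEntry->primaryConnection->hadDDL &&
		   !placementEntry->primaryConnection->hadDML;
}

int
main(void)
{
	PlacementConnection connection = { .hadDDL = false, .hadDML = false };
	PlacementEntry entry = { .primaryConnection = &connection,
							 .hasSecondaryConnections = true };

	/* unmodified placement already read elsewhere: reuse not forced -> 1 */
	printf("%d\n", SelectCanUseAnyConnection(PLACEMENT_ACCESS_SELECT, &entry));

	/* once the primary connection has done DML, reuse becomes mandatory -> 0 */
	connection.hadDML = true;
	printf("%d\n", SelectCanUseAnyConnection(PLACEMENT_ACCESS_SELECT, &entry));

	return 0;
}

This is also why the last multi_real_time_transaction block still errors out: once the shard placements are modified over parallel connections while the reference table is modified over a single one, no single connection can see all the writes, and the predicate above (hadDML being true) correctly refuses to relax connection reuse.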