mirror of https://github.com/citusdata/citus.git
Make connection assignment more liberal after parallel join wit… (#3456)
Make connection assignment more liberal after parallel join with reference tablepull/3460/head
commit
2e8c118a8f
|
@ -592,6 +592,15 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us
|
||||||
"modified over multiple connections")));
|
"modified over multiple connections")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (accessType == PLACEMENT_ACCESS_SELECT &&
|
||||||
|
placementEntry->hasSecondaryConnections &&
|
||||||
|
!placementConnection->hadDDL && !placementConnection->hadDML)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Two separate connections have already selected from this placement
|
||||||
|
* and it was not modified. There is no benefit to using this connection.
|
||||||
|
*/
|
||||||
|
}
|
||||||
else if (CanUseExistingConnection(flags, userName, placementConnection))
|
else if (CanUseExistingConnection(flags, userName, placementConnection))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
@ -599,7 +608,6 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us
|
||||||
*/
|
*/
|
||||||
|
|
||||||
Assert(placementConnection != NULL);
|
Assert(placementConnection != NULL);
|
||||||
|
|
||||||
chosenConnection = placementConnection->connection;
|
chosenConnection = placementConnection->connection;
|
||||||
|
|
||||||
if (placementConnection->hadDDL || placementConnection->hadDML)
|
if (placementConnection->hadDDL || placementConnection->hadDML)
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
--
|
||||||
|
-- failure_parallel_connection.sql tests some behaviour of connection management
|
||||||
|
-- where Citus is expected to use multiple connections.
|
||||||
|
--
|
||||||
|
-- In other words, we're not testing any failures in this test. We're trying to make
|
||||||
|
-- sure that Citus uses 1-connection per placement of distributed table even after
|
||||||
|
-- a join with distributed table
|
||||||
|
--
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE SCHEMA fail_parallel_connection;
|
||||||
|
SET search_path TO 'fail_parallel_connection';
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1880000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1880000;
|
||||||
|
CREATE TABLE distributed_table (
|
||||||
|
key int,
|
||||||
|
value int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('distributed_table', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE reference_table (
|
||||||
|
key int,
|
||||||
|
value int
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('reference_table');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- make sure that access to the placements of the distributed
|
||||||
|
-- tables use 1 connection
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- this query should not fail because each placement should be acceessed
|
||||||
|
-- over a seperate connection
|
||||||
|
SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP SCHEMA fail_parallel_connection CASCADE;
|
||||||
|
NOTICE: drop cascades to 2 other objects
|
||||||
|
DETAIL: drop cascades to table distributed_table
|
||||||
|
drop cascades to table reference_table
|
||||||
|
SET search_path TO default;
|
|
@ -29,6 +29,54 @@ SELECT create_reference_table('ref_test_table');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\COPY ref_test_table FROM stdin delimiter ',';
|
\COPY ref_test_table FROM stdin delimiter ',';
|
||||||
|
-- Test two reference table joins, will both run in parallel
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Test two reference table joins, second one will be serialized
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO ref_test_table VALUES(1,2,'da');
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- this does not work because the inserts into shards go over different connections
|
||||||
|
-- and the insert into the reference table goes over a single connection, and the
|
||||||
|
-- final SELECT cannot see both
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_table VALUES(1,2,'da');
|
||||||
|
INSERT INTO test_table VALUES(2,2,'da');
|
||||||
|
INSERT INTO test_table VALUES(3,3,'da');
|
||||||
|
INSERT INTO ref_test_table VALUES(1,2,'da');
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
ERROR: cannot perform query with placements that were modified over multiple connections
|
||||||
|
ROLLBACK;
|
||||||
-- Test with select and router insert
|
-- Test with select and router insert
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT COUNT(*) FROM test_table;
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
|
@ -4,6 +4,7 @@ test: failure_test_helpers
|
||||||
# this should only be run by pg_regress_multi, you don't need it
|
# this should only be run by pg_regress_multi, you don't need it
|
||||||
test: failure_setup
|
test: failure_setup
|
||||||
test: multi_test_helpers
|
test: multi_test_helpers
|
||||||
|
test: failure_parallel_connection
|
||||||
test: failure_replicated_partitions
|
test: failure_replicated_partitions
|
||||||
test: multi_test_catalog_views
|
test: multi_test_catalog_views
|
||||||
test: failure_insert_select_repartition
|
test: failure_insert_select_repartition
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
--
|
||||||
|
-- failure_parallel_connection.sql tests some behaviour of connection management
|
||||||
|
-- where Citus is expected to use multiple connections.
|
||||||
|
--
|
||||||
|
-- In other words, we're not testing any failures in this test. We're trying to make
|
||||||
|
-- sure that Citus uses 1-connection per placement of distributed table even after
|
||||||
|
-- a join with distributed table
|
||||||
|
--
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
CREATE SCHEMA fail_parallel_connection;
|
||||||
|
SET search_path TO 'fail_parallel_connection';
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1880000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1880000;
|
||||||
|
|
||||||
|
CREATE TABLE distributed_table (
|
||||||
|
key int,
|
||||||
|
value int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('distributed_table', 'key');
|
||||||
|
|
||||||
|
CREATE TABLE reference_table (
|
||||||
|
key int,
|
||||||
|
value int
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('reference_table');
|
||||||
|
|
||||||
|
-- make sure that access to the placements of the distributed
|
||||||
|
-- tables use 1 connection
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');
|
||||||
|
|
||||||
|
-- this query should not fail because each placement should be acceessed
|
||||||
|
-- over a seperate connection
|
||||||
|
SELECT count(*) FROM distributed_table JOIN reference_table USING (key);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
DROP SCHEMA fail_parallel_connection CASCADE;
|
||||||
|
SET search_path TO default;
|
|
@ -40,6 +40,31 @@ SELECT create_reference_table('ref_test_table');
|
||||||
4,5,'rr4'
|
4,5,'rr4'
|
||||||
\.
|
\.
|
||||||
|
|
||||||
|
-- Test two reference table joins, will both run in parallel
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Test two reference table joins, second one will be serialized
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
INSERT INTO ref_test_table VALUES(1,2,'da');
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- this does not work because the inserts into shards go over different connections
|
||||||
|
-- and the insert into the reference table goes over a single connection, and the
|
||||||
|
-- final SELECT cannot see both
|
||||||
|
BEGIN;
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
INSERT INTO test_table VALUES(1,2,'da');
|
||||||
|
INSERT INTO test_table VALUES(2,2,'da');
|
||||||
|
INSERT INTO test_table VALUES(3,3,'da');
|
||||||
|
INSERT INTO ref_test_table VALUES(1,2,'da');
|
||||||
|
SELECT COUNT(*) FROM test_table JOIN ref_test_table USING (id);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
-- Test with select and router insert
|
-- Test with select and router insert
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT COUNT(*) FROM test_table;
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
Loading…
Reference in New Issue