From d5472614df5d92f385f4b09e8b474189bfe01a33 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 21 Jun 2018 13:26:13 +0300 Subject: [PATCH] Use non-data connection for intermediate results Make sure that intermediate results use a connection that is not associated with any placement. That is useful in two ways: - More complex queries can be executed with CTEs - Safely use the same connections when there is a foreign key to reference table from a distributed table, which needs to use the same connection for modifications since the reference table might cascade to the distributed table. --- .../connection/connection_management.c | 44 +++++++++++++++++++ .../connection/placement_connection.c | 11 +++++ .../executor/intermediate_results.c | 9 +++- .../distributed/connection_management.h | 2 + .../distributed/placement_connection.h | 2 + src/test/regress/expected/with_modifying.out | 19 ++++++++ src/test/regress/sql/with_modifying.sql | 11 +++++ 7 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 636922db9..c3628f320 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -127,6 +127,50 @@ GetNodeConnection(uint32 flags, const char *hostname, int32 port) } +/* + * GetNonDataAccessConnection() establishes a connection to remote node, using + * default user and database. The returned connection is guaranteed to not have + * been used for any data access over any placements. + * + * See StartNonDataAccessConnection for details. + */ +MultiConnection * +GetNonDataAccessConnection(const char *hostname, int32 port) +{ + MultiConnection *connection; + + connection = StartNonDataAccessConnection(hostname, port); + + FinishConnectionEstablishment(connection); + + return connection; +} + + +/* + * StartNonDataAccessConnection() initiates a connection that is + * guaranteed to not have been used for any data access over any + * placements. + * + * The returned connection is started with the default user and database. + */ +MultiConnection * +StartNonDataAccessConnection(const char *hostname, int32 port) +{ + uint32 flags = 0; + MultiConnection *connection = StartNodeConnection(flags, hostname, port); + + if (ConnectionUsedForAnyPlacements(connection)) + { + flags = FORCE_NEW_CONNECTION; + + connection = StartNodeConnection(flags, hostname, port); + } + + return connection; +} + + /* * StartNodeConnection initiates a connection to remote node, using default * user and database. diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 1d5a1c928..10780fda7 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -811,6 +811,17 @@ ConnectionAccessedDifferentPlacement(MultiConnection *connection, } +/* + * ConnectionUsedForAnyPlacements returns true if the connection + * has not been associated with any placement. + */ +bool +ConnectionUsedForAnyPlacements(MultiConnection *connection) +{ + return !dlist_is_empty(&connection->referencedPlacements); +} + + /* * AssociatePlacementWithShard records shard->placement relation in * ConnectionShardHash. diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 9f417166f..a324b7a64 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -268,12 +268,17 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, foreach(initialNodeCell, initialNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(initialNodeCell); - int connectionFlags = 0; char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; MultiConnection *connection = NULL; - connection = StartNodeConnection(connectionFlags, nodeName, nodePort); + /* + * We prefer to use a connection that is not associcated with + * any placements. The reason is that we claim this connection + * exclusively and that would prevent the consecutive DML/DDL + * use the same connection. + */ + connection = StartNonDataAccessConnection(nodeName, nodePort); ClaimConnectionExclusively(connection); MarkRemoteTransactionCritical(connection); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index f0a763ab9..74c3f4e51 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -152,6 +152,8 @@ extern bool CheckConninfo(const char *conninfo, const char **whitelist, /* Low-level connection establishment APIs */ extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, int32 port); +extern MultiConnection * GetNonDataAccessConnection(const char *hostname, int32 port); +extern MultiConnection * StartNonDataAccessConnection(const char *hostname, int32 port); extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, int32 port); extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index 71e7a1358..d6deaed7e 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -62,4 +62,6 @@ extern void ResetShardPlacementAssociation(struct MultiConnection *connection); extern void InitPlacementConnectionManagement(void); +extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection); + #endif /* PLACEMENT_CONNECTION_H */ diff --git a/src/test/regress/expected/with_modifying.out b/src/test/regress/expected/with_modifying.out index ad689c1e0..6ed757167 100644 --- a/src/test/regress/expected/with_modifying.out +++ b/src/test/regress/expected/with_modifying.out @@ -701,6 +701,25 @@ SELECT * FROM summary_table ORDER BY id, counter; 6 | 11 (6 rows) +-- make sure that the intermediate result uses a connection +-- that does not interfere with placement connections +BEGIN; + INSERT INTO modify_table (id) VALUES (10000); + WITH test_cte AS (SELECT count(*) FROM modify_table) SELECT * FROM test_cte; + count +------- + 1 +(1 row) + +ROLLBACK; +-- similarly, make sure that the intermediate result uses a seperate connection + WITH first_query AS (INSERT INTO modify_table (id) VALUES (10001)), + second_query AS (SELECT * FROM modify_table) SELECT count(*) FROM second_query; + count +------- + 1 +(1 row) + DROP SCHEMA with_modifying CASCADE; NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table users_table diff --git a/src/test/regress/sql/with_modifying.sql b/src/test/regress/sql/with_modifying.sql index f7fb58ef2..de06ef3db 100644 --- a/src/test/regress/sql/with_modifying.sql +++ b/src/test/regress/sql/with_modifying.sql @@ -417,4 +417,15 @@ INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY SELECT COUNT(*) FROM modify_table; SELECT * FROM summary_table ORDER BY id, counter; +-- make sure that the intermediate result uses a connection +-- that does not interfere with placement connections +BEGIN; + INSERT INTO modify_table (id) VALUES (10000); + WITH test_cte AS (SELECT count(*) FROM modify_table) SELECT * FROM test_cte; +ROLLBACK; + +-- similarly, make sure that the intermediate result uses a seperate connection + WITH first_query AS (INSERT INTO modify_table (id) VALUES (10001)), + second_query AS (SELECT * FROM modify_table) SELECT count(*) FROM second_query; + DROP SCHEMA with_modifying CASCADE;