diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index ca0a7dfc6..03b45e48c 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -50,6 +50,7 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int static void ResetConnection(MultiConnection *connection); static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); +static void GivePurposeToConnection(MultiConnection *connection, int flags); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -177,48 +178,6 @@ GetNodeConnection(uint32 flags, const char *hostname, int32 port) } -/* - * GetNonDataAccessConnection() establishes a connection to remote node, using - * default user and database. The returned connection is guaranteed to not have - * been used for any data access over any placements. - * - * See StartNonDataAccessConnection for details. - */ -MultiConnection * -GetNonDataAccessConnection(const char *hostname, int32 port) -{ - MultiConnection *connection = StartNonDataAccessConnection(hostname, port); - - FinishConnectionEstablishment(connection); - - return connection; -} - - -/* - * StartNonDataAccessConnection() initiates a connection that is - * guaranteed to not have been used for any data access over any - * placements. - * - * The returned connection is started with the default user and database. 
- */ -MultiConnection * -StartNonDataAccessConnection(const char *hostname, int32 port) -{ - uint32 flags = 0; - MultiConnection *connection = StartNodeConnection(flags, hostname, port); - - if (ConnectionUsedForAnyPlacements(connection)) - { - flags = FORCE_NEW_CONNECTION; - - connection = StartNodeConnection(flags, hostname, port); - } - - return connection; -} - - /* * StartNodeConnection initiates a connection to remote node, using default * user and database. @@ -351,6 +310,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, connection = FindAvailableConnection(entry->connections, flags); if (connection) { + GivePurposeToConnection(connection, flags); + return connection; } } @@ -359,17 +320,27 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. */ + connection = StartConnectionEstablishment(&key); dlist_push_tail(entry->connections, &connection->connectionNode); ResetShardPlacementAssociation(connection); + GivePurposeToConnection(connection, flags); return connection; } -/* StartNodeUserDatabaseConnection() helper */ +/* + * FindAvailableConnection searches the given list of connections for one that + * is not claimed exclusively or marked as a side channel. If the caller passed + * the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not + * been used to access shard placements and that connection will only be returned + * in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed. + * + * If no connection is available, FindAvailableConnection returns NULL. 
+ */ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags) { @@ -380,19 +351,62 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur); - /* don't return claimed connections */ if (connection->claimedExclusively) { + /* connection is in use for an ongoing operation */ continue; } - return connection; + if ((flags & REQUIRE_SIDECHANNEL) != 0) + { + if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL || + connection->purpose == CONNECTION_PURPOSE_ANY) + { + /* side channel must not have been used to access data */ + Assert(!ConnectionUsedForAnyPlacements(connection)); + + return connection; + } + } + else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS || + connection->purpose == CONNECTION_PURPOSE_ANY) + { + /* can use this connection to access data */ + return connection; + } } return NULL; } +/* + * GivePurposeToConnection gives purpose to a connection if it does not already + * have a purpose. More specifically, it marks the connection as a sidechannel + * if the REQUIRE_SIDECHANNEL flag is set. + */ +static void +GivePurposeToConnection(MultiConnection *connection, int flags) +{ + if (connection->purpose != CONNECTION_PURPOSE_ANY) + { + /* connection already has a purpose */ + return; + } + + if ((flags & REQUIRE_SIDECHANNEL) != 0) + { + /* connection should not be used for data access */ + connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL; + } + else + { + /* connection should be used for data access */ + connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS; + } +} + + /* * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections * to a particular node as true such that the connections are no longer cached. 
This @@ -961,11 +975,11 @@ StartConnectionEstablishment(ConnectionHashKey *key) strlcpy(connection->database, key->database, NAMEDATALEN); strlcpy(connection->user, key->user, NAMEDATALEN); - connection->pgConn = PQconnectStartParams((const char **) entry->keywords, (const char **) entry->values, false); connection->connectionStart = GetCurrentTimestamp(); + connection->purpose = CONNECTION_PURPOSE_ANY; /* * To avoid issues with interrupts not getting caught all our connections @@ -1122,6 +1136,7 @@ ResetConnection(MultiConnection *connection) /* reset copy state */ connection->copyBytesWrittenSinceLastFlush = 0; + connection->purpose = CONNECTION_PURPOSE_ANY; UnclaimConnection(connection); } diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 574d8a363..cdcc53005 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -276,7 +276,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, * exclusively and that would prevent the consecutive DML/DDL * use the same connection. 
*/ - MultiConnection *connection = StartNonDataAccessConnection(nodeName, nodePort); + int flags = REQUIRE_SIDECHANNEL; + + MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort); ClaimConnectionExclusively(connection); MarkRemoteTransactionCritical(connection); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 1060a23a9..716d4617f 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -47,9 +47,29 @@ enum MultiConnectionMode FOR_DML = 1 << 2, /* open a connection per (co-located set of) placement(s) */ - CONNECTION_PER_PLACEMENT = 1 << 3 + CONNECTION_PER_PLACEMENT = 1 << 3, + + /* connection has not been used to access data */ + REQUIRE_SIDECHANNEL = 1 << 4 }; +/* + * ConnectionPurpose defines what a connection is used for during the + * current transaction. This is primarily to not allocate connections + * that are needed for data access to other purposes. 
+ */ +typedef enum ConnectionPurpose +{ + /* connection can be used for any purpose */ + CONNECTION_PURPOSE_ANY, + + /* connection can be used to access placements */ + CONNECTION_PURPOSE_DATA_ACCESS, + + /* connection can be used for auxiliary functions, but not data access */ + CONNECTION_PURPOSE_SIDECHANNEL +} ConnectionPurpose; + typedef enum MultiConnectionState { MULTI_CONNECTION_INITIAL, @@ -88,6 +108,9 @@ typedef struct MultiConnection /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; + /* defines the purpose of the connection */ + ConnectionPurpose purpose; + /* time connection establishment was started, for timeout */ TimestampTz connectionStart; @@ -178,8 +201,6 @@ extern bool CheckConninfo(const char *conninfo, const char **whitelist, /* Low-level connection establishment APIs */ extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, int32 port); -extern MultiConnection * GetNonDataAccessConnection(const char *hostname, int32 port); -extern MultiConnection * StartNonDataAccessConnection(const char *hostname, int32 port); extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, int32 port); extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, diff --git a/src/test/regress/expected/with_transactions.out b/src/test/regress/expected/with_transactions.out index 9387b558c..95cc90eaf 100644 --- a/src/test/regress/expected/with_transactions.out +++ b/src/test/regress/expected/with_transactions.out @@ -17,15 +17,15 @@ SELECT create_distributed_table('second_raw_table', 'tenant_id'); (1 row) -INSERT INTO - raw_table (tenant_id, income, created_at) -SELECT - i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day' -FROM +INSERT INTO + raw_table (tenant_id, income, created_at) +SELECT + i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day' +FROM generate_series (0, 100) i; INSERT INTO 
second_raw_table SELECT * FROM raw_table; SET client_min_messages TO DEBUG1; --- run a transaction which DELETE +-- run a transaction which DELETE BEGIN; WITH ids_to_delete AS ( @@ -78,7 +78,7 @@ COMMIT; -- sequential insert followed by parallel update works just fine WITH ids_inserted AS ( - INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id + INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id ) UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted); DEBUG: generating subplan 12_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id, income, created_at) VALUES (11,1000,now()), (12,1000,now()), (13,1000,now()) RETURNING raw_table.tenant_id @@ -119,8 +119,29 @@ DEBUG: push down of limit count: 3 ROLLBACK; RESET client_min_messages; -RESET citus.shard_count; +CREATE OR REPLACE FUNCTION run_ctes(id int) +RETURNS text +LANGUAGE 'plpgsql' +AS $BODY$ +DECLARE + value text; +BEGIN + + WITH + dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0) + SELECT count(*) INTO value FROM dist WHERE id = tenant_id; + + RETURN value ; +END; +$BODY$; +SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a; + count +------- + 102 +(1 row) + DROP SCHEMA with_transactions CASCADE; -NOTICE: drop cascades to 2 other objects +NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to table raw_table drop cascades to table second_raw_table +drop cascades to function run_ctes(integer) diff --git a/src/test/regress/sql/with_transactions.sql b/src/test/regress/sql/with_transactions.sql index 6cc7d53eb..7ec722f1c 100644 --- a/src/test/regress/sql/with_transactions.sql +++ b/src/test/regress/sql/with_transactions.sql @@ -82,5 +82,23 @@ SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_insert ROLLBACK; RESET 
client_min_messages; -RESET citus.shard_count; + +CREATE OR REPLACE FUNCTION run_ctes(id int) +RETURNS text +LANGUAGE 'plpgsql' +AS $BODY$ +DECLARE + value text; +BEGIN + + WITH + dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0) + SELECT count(*) INTO value FROM dist WHERE id = tenant_id; + + RETURN value ; +END; +$BODY$; + +SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a; + DROP SCHEMA with_transactions CASCADE;