mirror of https://github.com/citusdata/citus.git
Use any available non-data connection for intermediate results (#3301)
Use any available non-data connection for intermediate resultspull/3304/head
commit
8cea662f17
|
@ -50,6 +50,7 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
|||
static void ResetConnection(MultiConnection *connection);
|
||||
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||
static void GivePurposeToConnection(MultiConnection *connection, int flags);
|
||||
static bool RemoteTransactionIdle(MultiConnection *connection);
|
||||
static int EventSetSizeForConnectionList(List *connections);
|
||||
|
||||
|
@ -177,48 +178,6 @@ GetNodeConnection(uint32 flags, const char *hostname, int32 port)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetNonDataAccessConnection() establishes a connection to remote node, using
|
||||
* default user and database. The returned connection is guaranteed to not have
|
||||
* been used for any data access over any placements.
|
||||
*
|
||||
* See StartNonDataAccessConnection for details.
|
||||
*/
|
||||
MultiConnection *
|
||||
GetNonDataAccessConnection(const char *hostname, int32 port)
|
||||
{
|
||||
MultiConnection *connection = StartNonDataAccessConnection(hostname, port);
|
||||
|
||||
FinishConnectionEstablishment(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartNonDataAccessConnection() initiates a connection that is
|
||||
* guaranteed to not have been used for any data access over any
|
||||
* placements.
|
||||
*
|
||||
* The returned connection is started with the default user and database.
|
||||
*/
|
||||
MultiConnection *
|
||||
StartNonDataAccessConnection(const char *hostname, int32 port)
|
||||
{
|
||||
uint32 flags = 0;
|
||||
MultiConnection *connection = StartNodeConnection(flags, hostname, port);
|
||||
|
||||
if (ConnectionUsedForAnyPlacements(connection))
|
||||
{
|
||||
flags = FORCE_NEW_CONNECTION;
|
||||
|
||||
connection = StartNodeConnection(flags, hostname, port);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartNodeConnection initiates a connection to remote node, using default
|
||||
* user and database.
|
||||
|
@ -351,6 +310,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
|||
connection = FindAvailableConnection(entry->connections, flags);
|
||||
if (connection)
|
||||
{
|
||||
GivePurposeToConnection(connection, flags);
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
@ -359,17 +320,27 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
|||
* Either no caching desired, or no pre-established, non-claimed,
|
||||
* connection present. Initiate connection establishment.
|
||||
*/
|
||||
|
||||
connection = StartConnectionEstablishment(&key);
|
||||
|
||||
dlist_push_tail(entry->connections, &connection->connectionNode);
|
||||
|
||||
ResetShardPlacementAssociation(connection);
|
||||
GivePurposeToConnection(connection, flags);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/* StartNodeUserDatabaseConnection() helper */
|
||||
/*
|
||||
* FindAvailableConnection searches the given list of connections for one that
|
||||
* is not claimed exclusively or marked as a side channel. If the caller passed
|
||||
* the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not
|
||||
* been used to access shard placements and that connectoin will only be returned
|
||||
* in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed.
|
||||
*
|
||||
* If no connection is available, FindAvailableConnection returns NULL.
|
||||
*/
|
||||
static MultiConnection *
|
||||
FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||
{
|
||||
|
@ -380,19 +351,62 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
|||
MultiConnection *connection =
|
||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||
|
||||
/* don't return claimed connections */
|
||||
if (connection->claimedExclusively)
|
||||
{
|
||||
/* connection is in use for an ongoing operation */
|
||||
continue;
|
||||
}
|
||||
|
||||
return connection;
|
||||
if ((flags & REQUIRE_SIDECHANNEL) != 0)
|
||||
{
|
||||
if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL ||
|
||||
connection->purpose == CONNECTION_PURPOSE_ANY)
|
||||
{
|
||||
/* side channel must not have been used to access data */
|
||||
Assert(!ConnectionUsedForAnyPlacements(connection));
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS ||
|
||||
connection->purpose == CONNECTION_PURPOSE_ANY)
|
||||
{
|
||||
/* can use this connection to access data */
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GivePurposeToConnection gives purpose to a connection if it does not already
|
||||
* have a purpose. More specifically, it marks the connection as a sidechannel
|
||||
* if the REQUIRE_SIDECHANNEL flag is set.
|
||||
*/
|
||||
static void
|
||||
GivePurposeToConnection(MultiConnection *connection, int flags)
|
||||
{
|
||||
if (connection->purpose != CONNECTION_PURPOSE_ANY)
|
||||
{
|
||||
/* connection already has a purpose */
|
||||
return;
|
||||
}
|
||||
|
||||
if ((flags & REQUIRE_SIDECHANNEL) != 0)
|
||||
{
|
||||
/* connection should not be used for data access */
|
||||
connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* connection should be used for data access */
|
||||
connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
|
||||
* to a particular node as true such that the connections are no longer cached. This
|
||||
|
@ -961,11 +975,11 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
|||
strlcpy(connection->database, key->database, NAMEDATALEN);
|
||||
strlcpy(connection->user, key->user, NAMEDATALEN);
|
||||
|
||||
|
||||
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
|
||||
(const char **) entry->values,
|
||||
false);
|
||||
connection->connectionStart = GetCurrentTimestamp();
|
||||
connection->purpose = CONNECTION_PURPOSE_ANY;
|
||||
|
||||
/*
|
||||
* To avoid issues with interrupts not getting caught all our connections
|
||||
|
@ -1122,6 +1136,7 @@ ResetConnection(MultiConnection *connection)
|
|||
|
||||
/* reset copy state */
|
||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||
connection->purpose = CONNECTION_PURPOSE_ANY;
|
||||
|
||||
UnclaimConnection(connection);
|
||||
}
|
||||
|
|
|
@ -276,7 +276,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
* exclusively and that would prevent the consecutive DML/DDL
|
||||
* use the same connection.
|
||||
*/
|
||||
MultiConnection *connection = StartNonDataAccessConnection(nodeName, nodePort);
|
||||
int flags = REQUIRE_SIDECHANNEL;
|
||||
|
||||
MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort);
|
||||
ClaimConnectionExclusively(connection);
|
||||
MarkRemoteTransactionCritical(connection);
|
||||
|
||||
|
|
|
@ -47,9 +47,29 @@ enum MultiConnectionMode
|
|||
FOR_DML = 1 << 2,
|
||||
|
||||
/* open a connection per (co-located set of) placement(s) */
|
||||
CONNECTION_PER_PLACEMENT = 1 << 3
|
||||
CONNECTION_PER_PLACEMENT = 1 << 3,
|
||||
|
||||
/* connection has not been used to access data */
|
||||
REQUIRE_SIDECHANNEL = 1 << 4
|
||||
};
|
||||
|
||||
/*
|
||||
* ConnectionPurpose defines what a connection is used for during the
|
||||
* current transaction. This is primarily to not allocate connections
|
||||
* that are needed for data access to other purposes.
|
||||
*/
|
||||
typedef enum ConnectionPurpose
|
||||
{
|
||||
/* connection can be used for any purpose */
|
||||
CONNECTION_PURPOSE_ANY,
|
||||
|
||||
/* connection can be used to access placements */
|
||||
CONNECTION_PURPOSE_DATA_ACCESS,
|
||||
|
||||
/* connection can be used for auxiliary functions, but not data access */
|
||||
CONNECTION_PURPOSE_SIDECHANNEL
|
||||
} ConnectionPurpose;
|
||||
|
||||
typedef enum MultiConnectionState
|
||||
{
|
||||
MULTI_CONNECTION_INITIAL,
|
||||
|
@ -88,6 +108,9 @@ typedef struct MultiConnection
|
|||
/* is the connection currently in use, and shouldn't be used by anything else */
|
||||
bool claimedExclusively;
|
||||
|
||||
/* defines the purpose of the connection */
|
||||
ConnectionPurpose purpose;
|
||||
|
||||
/* time connection establishment was started, for timeout */
|
||||
TimestampTz connectionStart;
|
||||
|
||||
|
@ -178,8 +201,6 @@ extern bool CheckConninfo(const char *conninfo, const char **whitelist,
|
|||
/* Low-level connection establishment APIs */
|
||||
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
|
||||
int32 port);
|
||||
extern MultiConnection * GetNonDataAccessConnection(const char *hostname, int32 port);
|
||||
extern MultiConnection * StartNonDataAccessConnection(const char *hostname, int32 port);
|
||||
extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
|
||||
int32 port);
|
||||
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
|
||||
|
|
|
@ -119,8 +119,29 @@ DEBUG: push down of limit count: 3
|
|||
|
||||
ROLLBACK;
|
||||
RESET client_min_messages;
|
||||
RESET citus.shard_count;
|
||||
CREATE OR REPLACE FUNCTION run_ctes(id int)
|
||||
RETURNS text
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $BODY$
|
||||
DECLARE
|
||||
value text;
|
||||
BEGIN
|
||||
|
||||
WITH
|
||||
dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0)
|
||||
SELECT count(*) INTO value FROM dist WHERE id = tenant_id;
|
||||
|
||||
RETURN value ;
|
||||
END;
|
||||
$BODY$;
|
||||
SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a;
|
||||
count
|
||||
-------
|
||||
102
|
||||
(1 row)
|
||||
|
||||
DROP SCHEMA with_transactions CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
DETAIL: drop cascades to table raw_table
|
||||
drop cascades to table second_raw_table
|
||||
drop cascades to function run_ctes(integer)
|
||||
|
|
|
@ -82,5 +82,23 @@ SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_insert
|
|||
ROLLBACK;
|
||||
|
||||
RESET client_min_messages;
|
||||
RESET citus.shard_count;
|
||||
|
||||
CREATE OR REPLACE FUNCTION run_ctes(id int)
|
||||
RETURNS text
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $BODY$
|
||||
DECLARE
|
||||
value text;
|
||||
BEGIN
|
||||
|
||||
WITH
|
||||
dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0)
|
||||
SELECT count(*) INTO value FROM dist WHERE id = tenant_id;
|
||||
|
||||
RETURN value ;
|
||||
END;
|
||||
$BODY$;
|
||||
|
||||
SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a;
|
||||
|
||||
DROP SCHEMA with_transactions CASCADE;
|
||||
|
|
Loading…
Reference in New Issue