Forbid using connections that sent intermediate results for data access and vice versa

pull/3301/head
Marco Slot 2019-12-13 10:18:27 +01:00
parent 8092529a2c
commit 2f568ad5a5
5 changed files with 137 additions and 60 deletions

View File

@ -50,6 +50,7 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
static void ResetConnection(MultiConnection *connection);
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static void GivePurposeToConnection(MultiConnection *connection, int flags);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
@ -177,48 +178,6 @@ GetNodeConnection(uint32 flags, const char *hostname, int32 port)
}
/*
* GetNonDataAccessConnection() establishes a connection to remote node, using
* default user and database. The returned connection is guaranteed to not have
* been used for any data access over any placements.
*
* See StartNonDataAccessConnection for details.
*/
MultiConnection *
GetNonDataAccessConnection(const char *hostname, int32 port)
{
MultiConnection *connection = StartNonDataAccessConnection(hostname, port);
FinishConnectionEstablishment(connection);
return connection;
}
/*
* StartNonDataAccessConnection() initiates a connection that is
* guaranteed to not have been used for any data access over any
* placements.
*
* The returned connection is started with the default user and database.
*/
MultiConnection *
StartNonDataAccessConnection(const char *hostname, int32 port)
{
uint32 flags = 0;
MultiConnection *connection = StartNodeConnection(flags, hostname, port);
if (ConnectionUsedForAnyPlacements(connection))
{
flags = FORCE_NEW_CONNECTION;
connection = StartNodeConnection(flags, hostname, port);
}
return connection;
}
/*
* StartNodeConnection initiates a connection to remote node, using default
* user and database.
@ -351,6 +310,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
connection = FindAvailableConnection(entry->connections, flags);
if (connection)
{
GivePurposeToConnection(connection, flags);
return connection;
}
}
@ -359,17 +320,27 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment.
*/
connection = StartConnectionEstablishment(&key);
dlist_push_tail(entry->connections, &connection->connectionNode);
ResetShardPlacementAssociation(connection);
GivePurposeToConnection(connection, flags);
return connection;
}
/* StartNodeUserDatabaseConnection() helper */
/*
* FindAvailableConnection searches the given list of connections for one that
* is not claimed exclusively or marked as a side channel. If the caller passed
* the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not
* been used to access shard placements and that connectoin will only be returned
* in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed.
*
* If no connection is available, FindAvailableConnection returns NULL.
*/
static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags)
{
@ -380,19 +351,62 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur);
/* don't return claimed connections */
if (connection->claimedExclusively)
{
/* connection is in use for an ongoing operation */
continue;
}
return connection;
if ((flags & REQUIRE_SIDECHANNEL) != 0)
{
if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL ||
connection->purpose == CONNECTION_PURPOSE_ANY)
{
/* side channel must not have been used to access data */
Assert(!ConnectionUsedForAnyPlacements(connection));
return connection;
}
}
else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS ||
connection->purpose == CONNECTION_PURPOSE_ANY)
{
/* can use this connection to access data */
return connection;
}
}
return NULL;
}
/*
* GivePurposeToConnection gives purpose to a connection if it does not already
* have a purpose. More specifically, it marks the connection as a sidechannel
* if the REQUIRE_SIDECHANNEL flag is set.
*/
static void
GivePurposeToConnection(MultiConnection *connection, int flags)
{
if (connection->purpose != CONNECTION_PURPOSE_ANY)
{
/* connection already has a purpose */
return;
}
if ((flags & REQUIRE_SIDECHANNEL) != 0)
{
/* connection should not be used for data access */
connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL;
}
else
{
/* connection should be used for data access */
connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS;
}
}
/*
* CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
* to a particular node as true such that the connections are no longer cached. This
@ -961,11 +975,11 @@ StartConnectionEstablishment(ConnectionHashKey *key)
strlcpy(connection->database, key->database, NAMEDATALEN);
strlcpy(connection->user, key->user, NAMEDATALEN);
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
(const char **) entry->values,
false);
connection->connectionStart = GetCurrentTimestamp();
connection->purpose = CONNECTION_PURPOSE_ANY;
/*
* To avoid issues with interrupts not getting caught all our connections
@ -1122,6 +1136,7 @@ ResetConnection(MultiConnection *connection)
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
connection->purpose = CONNECTION_PURPOSE_ANY;
UnclaimConnection(connection);
}

View File

@ -276,7 +276,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
* exclusively and that would prevent the consecutive DML/DDL
* use the same connection.
*/
MultiConnection *connection = StartNonDataAccessConnection(nodeName, nodePort);
int flags = REQUIRE_SIDECHANNEL;
MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort);
ClaimConnectionExclusively(connection);
MarkRemoteTransactionCritical(connection);

View File

@ -47,9 +47,29 @@ enum MultiConnectionMode
FOR_DML = 1 << 2,
/* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3
CONNECTION_PER_PLACEMENT = 1 << 3,
/* connection has not been used to access data */
REQUIRE_SIDECHANNEL = 1 << 4
};
/*
* ConnectionPurpose defines what a connection is used for during the
* current transaction. This is primarily to not allocate connections
* that are needed for data access to other purposes.
*/
typedef enum ConnectionPurpose
{
/* connection can be used for any purpose */
CONNECTION_PURPOSE_ANY,
/* connection can be used to access placements */
CONNECTION_PURPOSE_DATA_ACCESS,
/* connection can be used for auxiliary functions, but not data access */
CONNECTION_PURPOSE_SIDECHANNEL
} ConnectionPurpose;
typedef enum MultiConnectionState
{
MULTI_CONNECTION_INITIAL,
@ -88,6 +108,9 @@ typedef struct MultiConnection
/* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively;
/* defines the purpose of the connection */
ConnectionPurpose purpose;
/* time connection establishment was started, for timeout */
TimestampTz connectionStart;
@ -178,8 +201,6 @@ extern bool CheckConninfo(const char *conninfo, const char **whitelist,
/* Low-level connection establishment APIs */
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
int32 port);
extern MultiConnection * GetNonDataAccessConnection(const char *hostname, int32 port);
extern MultiConnection * StartNonDataAccessConnection(const char *hostname, int32 port);
extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
int32 port);
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,

View File

@ -17,15 +17,15 @@ SELECT create_distributed_table('second_raw_table', 'tenant_id');
(1 row)
INSERT INTO
raw_table (tenant_id, income, created_at)
SELECT
i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day'
FROM
INSERT INTO
raw_table (tenant_id, income, created_at)
SELECT
i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day'
FROM
generate_series (0, 100) i;
INSERT INTO second_raw_table SELECT * FROM raw_table;
SET client_min_messages TO DEBUG1;
-- run a transaction which DELETE
-- run a transaction which DELETE
BEGIN;
WITH ids_to_delete AS
(
@ -78,7 +78,7 @@ COMMIT;
-- sequential insert followed by parallel update works just fine
WITH ids_inserted AS
(
INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id
INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id
)
UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted);
DEBUG: generating subplan 12_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id, income, created_at) VALUES (11,1000,now()), (12,1000,now()), (13,1000,now()) RETURNING raw_table.tenant_id
@ -119,8 +119,29 @@ DEBUG: push down of limit count: 3
ROLLBACK;
RESET client_min_messages;
RESET citus.shard_count;
CREATE OR REPLACE FUNCTION run_ctes(id int)
RETURNS text
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
value text;
BEGIN
WITH
dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0)
SELECT count(*) INTO value FROM dist WHERE id = tenant_id;
RETURN value ;
END;
$BODY$;
SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a;
count
-------
102
(1 row)
DROP SCHEMA with_transactions CASCADE;
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table raw_table
drop cascades to table second_raw_table
drop cascades to function run_ctes(integer)

View File

@ -82,5 +82,23 @@ SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_insert
ROLLBACK;
RESET client_min_messages;
RESET citus.shard_count;
CREATE OR REPLACE FUNCTION run_ctes(id int)
RETURNS text
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
value text;
BEGIN
WITH
dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0)
SELECT count(*) INTO value FROM dist WHERE id = tenant_id;
RETURN value ;
END;
$BODY$;
SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a;
DROP SCHEMA with_transactions CASCADE;