Merge pull request #4034 from citusdata/copy_shared_pool_size_with_reservations

Implement shared connection count reservation & enable `citus.max_shared_pool_size` for COPY
pull/4088/head
Önder Kalacı 2020-08-03 19:03:24 +02:00 committed by GitHub
commit c79c6506b9
20 changed files with 1540 additions and 87 deletions


@ -79,12 +79,14 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "distributed/local_multi_copy.h"
@ -259,7 +261,7 @@ static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateLis
char *nodeName, int nodePort);
static List * ConnectionStateList(HTAB *connectionStateHash);
static List * ConnectionStateListToNode(HTAB *connectionStateHash,
char *hostname, int port);
const char *hostname, int32 port);
static void InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash,
uint64 shardId, bool stopOnFailure, bool
@ -815,6 +817,12 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
MultiConnection *connection = GetPlacementConnection(connectionFlags, placement,
nodeUser);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
if (stopOnFailure)
@ -2265,6 +2273,15 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
RecordRelationAccessIfReferenceTable(tableId, PLACEMENT_ACCESS_DML);
/*
* For all the primary (i.e., writable) nodes, reserve a shared connection.
* We do this upfront because we cannot know which nodes are going to be
* accessed. Since the order of the reservation is important, we need to
* do it right here. For the details on why the order is important, see
* EnsureConnectionPossibilityForNodeList().
*/
EnsureConnectionPossibilityForPrimaryNodes();
}
@ -2938,6 +2955,12 @@ CitusCopyTo(CopyStmt *copyStatement, char *completionTag)
shardPlacement,
userName);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
if (placementIndex == list_length(shardPlacementList) - 1)
{
/* last chance for this shard */
@ -3286,10 +3309,10 @@ ConnectionStateList(HTAB *connectionStateHash)
/*
* ConnectionStateListToNode returns all CopyConnectionState structures in
* the given hash.
* the given hash for the given hostname and port.
*/
static List *
ConnectionStateListToNode(HTAB *connectionStateHash, char *hostname, int port)
ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 port)
{
List *connectionStateList = NIL;
HASH_SEQ_STATUS status;
@ -3533,18 +3556,37 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
ConnectionStateListToNode(connectionStateHash, nodeName, nodePort);
if (HasReachedAdaptiveExecutorPoolSize(copyConnectionStateList))
{
connection =
GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, nodePort);
/*
* If we've already reached the executor pool size, there should be at
* least one connection to any given node.
*
* Note that we don't need to mark the connection as critical, since the
* connection was already returned by this function before.
*/
Assert(connection != NULL);
connection = GetLeastUtilisedCopyConnection(copyConnectionStateList,
nodeName,
nodePort);
return connection;
}
if (IsReservationPossible())
{
/*
* Enforce the requirements for adaptive connection management
* (a.k.a., throttle connections if citus.max_shared_pool_size
* reached).
*
* Given that we have done reservations per node, we never need to
* pass WAIT_FOR_CONNECTION; we are sure that there is a connection
* either reserved for this backend or already established by previous
* commands in the same transaction block.
*/
int adaptiveConnectionManagementFlag = OPTIONAL_CONNECTION;
connectionFlags |= adaptiveConnectionManagementFlag;
}
/*
* For placements that haven't been assigned a connection by a previous command
* in the current transaction, we use a separate connection per placement for
@ -3571,6 +3613,52 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
}
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
if (connection == NULL)
{
if (list_length(copyConnectionStateList) > 0)
{
/*
* The connection manager throttled the new connection request, so pick an
* existing connection with the least utilization.
*
* Note that we don't need to mark the connection as critical, since the
* connection was already returned by this function before.
*/
connection =
GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName,
nodePort);
}
else
{
/*
* For this COPY command, we have not established any connections
* and adaptive connection management throttled the new connection
* request. This could only happen if this COPY command is the
* second (or later) COPY command in a transaction block as the
* first COPY command always gets a connection per node thanks to
* the connection reservation.
*
* As we know that at least one COPY command happened earlier, we
* need to find the connection to that node and use it.
*/
connection =
ConnectionAvailableToNode(nodeName, nodePort, CurrentUserName(),
CurrentDatabaseName());
/*
* We do not expect this to happen, but instead of an assert we
* prefer an explicit error message.
*/
if (connection == NULL)
{
ereport(ERROR, (errmsg("could not find an available connection"),
errhint("Set citus.max_shared_pool_size TO -1 to let "
"COPY command finish")));
}
}
return connection;
}
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
@ -3628,6 +3716,8 @@ HasReachedAdaptiveExecutorPoolSize(List *connectionStateList)
/*
* GetLeastUtilisedCopyConnection returns a MultiConnection to the given node
* with the least number of placements assigned to it.
*
* It is assumed that there exists at least one connection to the node.
*/
static MultiConnection *
GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName,
@ -3637,6 +3727,14 @@ GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName,
int minPlacementCount = PG_INT32_MAX;
ListCell *connectionStateCell = NULL;
/*
* We only pick the least utilised connection when some connection limits are
* reached such as max_shared_pool_size or max_adaptive_executor_pool_size.
*
* Therefore there should be some connections to choose from.
*/
Assert(list_length(connectionStateList) > 0);
foreach(connectionStateCell, connectionStateList)
{
CopyConnectionState *connectionState = lfirst(connectionStateCell);


@ -46,18 +46,19 @@ int MaxCachedConnectionsPerWorker = 1;
HTAB *ConnectionHash = NULL;
HTAB *ConnParamsHash = NULL;
MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount);
static void ResetConnection(MultiConnection *connection);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
@ -497,6 +498,38 @@ CloseAllConnectionsAfterTransaction(void)
}
/*
* ConnectionAvailableToNode returns a MultiConnection if the session has at least
* one connection established and available to use to the given node. Otherwise,
* returns NULL.
*/
MultiConnection *
ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
const char *database)
{
ConnectionHashKey key;
bool found = false;
strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
key.port = nodePort;
strlcpy(key.user, userName, NAMEDATALEN);
strlcpy(key.database, database, NAMEDATALEN);
ConnectionHashEntry *entry =
(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
if (!found)
{
return NULL;
}
int flags = 0;
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
return connection;
}
/*
* CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
* to a particular node as true such that the connections are no longer cached. This


@ -0,0 +1,511 @@
/*-------------------------------------------------------------------------
*
* locally_reserved_shared_connections.c
*
* Keeps track of the number of reserved connections to remote nodes
* for this backend. The primary goal is to complement the logic
* implemented in shared_connection_stats.c, which aims to prevent an excessive
* number of connections (typically > max_connections) to any worker node. With
* these locally reserved connection stats, we enforce the same constraints
* while also counting the locally reserved shared connections.
*
* To be more precise, shared connection stats are incremented only with two
* operations: (a) Establishing a connection to a remote node
* (b) Reserving connections, the logic that this
* file implements.
*
* Finally, as the name already implies, once a backend has reserved a shared
* connection, it is guaranteed to have the right to establish a connection
* to the given remote node when needed.
*
* For the COPY command, we use this fact to reserve connections to the remote nodes
* in the same order as the adaptive executor in order to prevent any resource
* starvation. We need to do this because COPY establishes connections when it
* receives a tuple that targets a remote node. This is a valuable optimization
* to prevent unnecessary connection establishments, which are pretty expensive.
* Instead, COPY command can reserve connections upfront, and utilize them when
* they are actually needed.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "commands/dbcommands.h"
#include "distributed/listutils.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_manager.h"
#include "utils/hashutils.h"
#include "utils/builtins.h"
#define RESERVED_CONNECTION_COLUMNS 4
/* session-specific hash map */
static HTAB *SessionLocalReservedConnections = NULL;
/*
* Hash key for connection reservations
*/
typedef struct ReservedConnectionHashKey
{
char hostname[MAX_NODE_LENGTH];
int32 port;
Oid databaseOid;
Oid userId;
} ReservedConnectionHashKey;
/*
* Hash entry for per-worker reservation information. The rules are as follows:
* - If there is no entry in the hash, we can make a reservation.
* - If usedReservation is false, we have a reservation that we can use.
* - If usedReservation is true, we used the reservation and cannot make more reservations.
*/
typedef struct ReservedConnectionHashEntry
{
ReservedConnectionHashKey key;
bool usedReservation;
} ReservedConnectionHashEntry;
static void StoreAllReservedConnections(Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName,
int nodePort, Oid
userId, Oid
databaseOid,
bool *found);
static void EnsureConnectionPossibilityForNodeList(List *nodeList);
static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);
PG_FUNCTION_INFO_V1(citus_reserved_connection_stats);
/*
* citus_reserved_connection_stats returns all the available information about all
* the reserved connections. This function is used mostly for testing.
*/
Datum
citus_reserved_connection_stats(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
CheckCitusVersion(ERROR);
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllReservedConnections(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID();
}
/*
* StoreAllReservedConnections walks over the reserved connections of the current
* session and inserts them into the given tuplestore.
*
* We don't need to enforce any access privileges as the number of backends
* on any node is already visible on pg_stat_activity to all users.
*/
static void
StoreAllReservedConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
Datum values[RESERVED_CONNECTION_COLUMNS];
bool isNulls[RESERVED_CONNECTION_COLUMNS];
HASH_SEQ_STATUS status;
ReservedConnectionHashEntry *connectionEntry = NULL;
hash_seq_init(&status, SessionLocalReservedConnections);
while ((connectionEntry =
(ReservedConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
/* get ready for the next tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
char *databaseName = get_database_name(connectionEntry->key.databaseOid);
if (databaseName == NULL)
{
/* database might have been dropped */
continue;
}
values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname));
values[1] = Int32GetDatum(connectionEntry->key.port);
values[2] = PointerGetDatum(cstring_to_text(databaseName));
values[3] = BoolGetDatum(connectionEntry->usedReservation);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
}
/*
* InitializeLocallyReservedSharedConnections initializes the hashmap in
* ConnectionContext.
*/
void
InitializeLocallyReservedSharedConnections(void)
{
HASHCTL reservedConnectionInfo;
memset(&reservedConnectionInfo, 0, sizeof(reservedConnectionInfo));
reservedConnectionInfo.keysize = sizeof(ReservedConnectionHashKey);
reservedConnectionInfo.entrysize = sizeof(ReservedConnectionHashEntry);
/*
* ConnectionContext is the session local memory context that is used for
* tracking remote connections.
*/
reservedConnectionInfo.hcxt = ConnectionContext;
reservedConnectionInfo.hash = LocalConnectionReserveHashHash;
reservedConnectionInfo.match = LocalConnectionReserveHashCompare;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
SessionLocalReservedConnections =
hash_create("citus session level reserved connections (host,port,database,user)",
64, &reservedConnectionInfo, hashFlags);
}
/*
* CanUseReservedConnection returns true if we have already reserved at least
* one shared connection in this session that is not used.
*/
bool
CanUseReservedConnection(const char *hostName, int nodePort, Oid userId,
Oid databaseOid)
{
ReservedConnectionHashKey key;
strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
key.userId = userId;
key.port = nodePort;
key.databaseOid = databaseOid;
bool found = false;
ReservedConnectionHashEntry *entry =
(ReservedConnectionHashEntry *) hash_search(SessionLocalReservedConnections, &key,
HASH_FIND, &found);
if (!found || !entry)
{
return false;
}
return !entry->usedReservation;
}
/*
* DeallocateReservedConnections is responsible for two things. First, if the operation
* has reserved a connection but not used it, it gives the connection back to the
* shared memory pool. Second, in all cases, it deallocates the session-local entry from
* the hash.
*/
void
DeallocateReservedConnections(void)
{
HASH_SEQ_STATUS status;
ReservedConnectionHashEntry *entry;
hash_seq_init(&status, SessionLocalReservedConnections);
while ((entry = (ReservedConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
if (!entry->usedReservation)
{
/*
* We have not used this reservation, make sure to clean-up from
* the shared memory as well.
*/
DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port);
/* for completeness, set it to true */
entry->usedReservation = true;
}
/*
* We clean up all the entries because we may not need reserved connections
* in the next iteration.
*/
bool found = false;
hash_search(SessionLocalReservedConnections, entry, HASH_REMOVE, &found);
Assert(found);
}
}
/*
* MarkReservedConnectionUsed sets the local hash that the reservation is used.
*/
void
MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId,
Oid databaseOid)
{
ReservedConnectionHashKey key;
strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
key.userId = userId;
key.port = nodePort;
key.databaseOid = databaseOid;
bool found = false;
ReservedConnectionHashEntry *entry =
(ReservedConnectionHashEntry *) hash_search(
SessionLocalReservedConnections, &key, HASH_FIND, &found);
if (!found)
{
ereport(ERROR, (errmsg("BUG: untracked reserved connection"),
errhint("Set citus.max_shared_pool_size TO -1 to "
"disable reserved connection counters")));
}
/* a reservation can only be used once */
Assert(!entry->usedReservation);
entry->usedReservation = true;
}
/*
* EnsureConnectionPossibilityForPrimaryNodes is a wrapper around
* EnsureConnectionPossibilityForNodeList.
*/
void
EnsureConnectionPossibilityForPrimaryNodes(void)
{
/*
* By using NoLock there is a tiny risk that we fail to reserve a
* connection for a concurrently added node. However, that doesn't
* seem to cause any problems as none of the placements that we are
* going to access would be on the new node.
*/
List *primaryNodeList = ActivePrimaryNodeList(NoLock);
EnsureConnectionPossibilityForNodeList(primaryNodeList);
}
/*
* EnsureConnectionPossibilityForNodeList reserves a shared connection
* counter per node in the nodeList unless:
* - reservation is not possible (see IsReservationPossible())
* - there is already at least one connection to the node, so that we are
*   guaranteed to get a connection
* - an earlier call already reserved a connection (we allow only a
*   single reservation per backend)
*/
static void
EnsureConnectionPossibilityForNodeList(List *nodeList)
{
if (!IsReservationPossible())
{
return;
}
/*
* We sort the workerList because adaptive connection management
* (e.g., OPTIONAL_CONNECTION) requires any concurrent executions
* to wait for the connections in the same order to prevent any
* starvation. If we don't sort, we might end up with:
* Execution 1: Get connection for worker 1, wait for worker 2
* Execution 2: Get connection for worker 2, wait for worker 1
*
* and neither could proceed. Instead, we enforce that every execution establishes
* the required connections to workers in the same order.
*/
nodeList = SortList(nodeList, CompareWorkerNodes);
char *databaseName = get_database_name(MyDatabaseId);
Oid userId = GetUserId();
char *userName = GetUserNameFromId(userId, false);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort,
userName, databaseName) != NULL)
{
/*
* The same user has already an active connection for the node. It
* means that the execution can use the same connection, so reservation
* is not necessary.
*/
continue;
}
/*
* We are trying to be defensive here by ensuring that the required hash
* table entry can be allocated. The main goal is that we don't want to be
* in a situation where shared connection counter is incremented but not
* the local reserved counter due to out-of-memory.
*
* Note that shared connection stats operate on the shared memory, and we
* pre-allocate all the necessary memory. In other words, it would never
* throw an out-of-memory error.
*/
bool found = false;
ReservedConnectionHashEntry *hashEntry =
AllocateOrGetReservedConectionEntry(workerNode->workerName,
workerNode->workerPort,
userId, MyDatabaseId, &found);
if (found)
{
/*
* We have already reserved a connection for this user and database
* on the worker. We only allow a single reservation per
* transaction block. The reason is that the earlier command (either in
* a transaction block or a function call triggered by a single command)
* was able to reserve or establish a connection. That connection is
* guaranteed to be available for us.
*/
continue;
}
/*
* Increment the shared counter; we may need to wait if there is
* no space left.
*/
WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort);
/* locally mark that we have one connection reserved */
hashEntry->usedReservation = false;
}
}
/*
* IsReservationPossible returns true if the state of the current
* session is eligible for shared connection reservation.
*/
bool
IsReservationPossible(void)
{
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return false;
}
if (UseConnectionPerPlacement())
{
/*
* For this case, we are not enforcing adaptive
* connection management anyway.
*/
return false;
}
if (SessionLocalReservedConnections == NULL)
{
/*
* This is unexpected as SessionLocalReservedConnections hash table is
* created at startup. Still, let's be defensive.
*/
return false;
}
return true;
}
/*
* AllocateOrGetReservedConectionEntry returns the entry for the given key from
* the hash map, allocating it with HASH_ENTER if it does not already exist. The
* function throws an error if it cannot allocate the entry.
*/
static ReservedConnectionHashEntry *
AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId,
Oid databaseOid, bool *found)
{
ReservedConnectionHashKey key;
*found = false;
strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
key.userId = userId;
key.port = nodePort;
key.databaseOid = databaseOid;
/*
* Entering a new entry with the HASH_ENTER flag is enough; it throws an
* out-of-memory error if needed, as it internally does palloc.
*/
ReservedConnectionHashEntry *entry =
(ReservedConnectionHashEntry *) hash_search(SessionLocalReservedConnections,
&key, HASH_ENTER, found);
if (!*found)
{
/*
* Until we reserve the connection in shared memory, we treat it
* as if we have used the reservation.
*/
entry->usedReservation = true;
}
return entry;
}
/*
* LocalConnectionReserveHashHash is a utility function to calculate the hash of
* ReservedConnectionHashKey.
*/
static uint32
LocalConnectionReserveHashHash(const void *key, Size keysize)
{
ReservedConnectionHashKey *entry = (ReservedConnectionHashKey *) key;
uint32 hash = string_hash(entry->hostname, MAX_NODE_LENGTH);
hash = hash_combine(hash, hash_uint32(entry->userId));
hash = hash_combine(hash, hash_uint32(entry->port));
hash = hash_combine(hash, hash_uint32(entry->databaseOid));
return hash;
}
/*
* LocalConnectionReserveHashCompare is a utility function to compare
* ReservedConnectionHashKeys.
*/
static int
LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize)
{
ReservedConnectionHashKey *ca = (ReservedConnectionHashKey *) a;
ReservedConnectionHashKey *cb = (ReservedConnectionHashKey *) b;
if (ca->port != cb->port ||
ca->databaseOid != cb->databaseOid ||
ca->userId != cb->userId ||
strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0)
{
return 1;
}
else
{
return 0;
}
}
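
As an editor-added illustration (not part of the patch), here is the intended lifecycle of a reservation, expressed with the functions declared in this file and the call sites this commit adds in multi_copy.c, shared_connection_stats.c and transaction_management.c; the wrapper function itself is hypothetical.

#include "postgres.h"
#include "miscadmin.h"

#include "distributed/locally_reserved_shared_connections.h"

/* hypothetical wrapper showing the reservation lifecycle end to end */
static void
ReservedConnectionLifecycleSketch(const char *hostName, int nodePort)
{
	/*
	 * 1. COPY startup: reserve one shared-pool slot per primary node, in
	 *    sorted node order, unless a usable connection already exists.
	 */
	EnsureConnectionPossibilityForPrimaryNodes();

	/*
	 * 2. When a tuple finally targets hostName:nodePort, the connection layer
	 *    consumes the reservation instead of touching the shared counters
	 *    again (see TryToIncrementSharedConnectionCounter in this commit).
	 */
	Oid userId = GetUserId();
	if (CanUseReservedConnection(hostName, nodePort, userId, MyDatabaseId))
	{
		MarkReservedConnectionUsed(hostName, nodePort, userId, MyDatabaseId);

		/* ... the shared counter already accounts for the new connection */
	}

	/*
	 * 3. Transaction end (commit, abort or backend exit): give any unused
	 *    reservations back to the shared pool and drop the local entries.
	 */
	DeallocateReservedConnections();
}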


@ -18,6 +18,7 @@
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/placement_connection.h"
@ -209,6 +210,14 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user
{
MultiConnection *connection = StartPlacementConnection(flags, placement, userName);
if (connection == NULL)
{
/* connection can only be NULL for optional connections */
Assert((flags & OPTIONAL_CONNECTION));
return NULL;
}
FinishConnectionEstablishment(connection);
return connection;
}
@ -293,25 +302,25 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
*/
chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort,
userName, NULL);
if (chosenConnection == NULL)
{
/* connection can only be NULL for optional connections */
Assert((flags & OPTIONAL_CONNECTION));
/*
* chosenConnection can only be NULL for optional connections, which we
* don't support in this codepath.
*/
Assert((flags & OPTIONAL_CONNECTION) == 0);
Assert(chosenConnection != NULL);
return NULL;
}
if ((flags & REQUIRE_CLEAN_CONNECTION) &&
ConnectionAccessedDifferentPlacement(chosenConnection, placement))
{
/*
* Cached connection accessed a non-co-located placement in the same
* table or co-location group, while the caller asked for a connection
* per placement. Open a new connection instead.
* table or co-location group, while the caller asked for a clean
* connection. Open a new connection instead.
*
* We use this for situations in which we want to use a different
* connection for every placement, such as COPY. If we blindly returned
* a cached conection that already modified a different, non-co-located
* a cached connection that already modified a different, non-co-located
* placement B in the same table or in a table with the same co-location
* ID as the current placement, then we'd no longer able to write to
* placement B later in the COPY.
@ -321,12 +330,13 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
nodeName, nodePort,
userName, NULL);
/*
* chosenConnection can only be NULL for optional connections,
* which we don't support in this codepath.
*/
Assert((flags & OPTIONAL_CONNECTION) == 0);
Assert(chosenConnection != NULL);
if (chosenConnection == NULL)
{
/* connection can only be NULL for optional connections */
Assert((flags & OPTIONAL_CONNECTION));
return NULL;
}
Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement));
}
@ -1175,6 +1185,19 @@ InitPlacementConnectionManagement(void)
}
/*
* UseConnectionPerPlacement returns whether we should use a separate connection
* per placement even if another connection is idle. We mostly use this in testing
* scenarios.
*/
bool
UseConnectionPerPlacement(void)
{
return ForceMaxQueryParallelization &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION;
}
static uint32
ColocatedPlacementsHashHash(const void *key, Size keysize)
{


@ -24,9 +24,12 @@
#include "distributed/cancel_utils.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/worker_manager.h"
#include "distributed/time_constants.h"
#include "distributed/tuplestore.h"
#include "utils/builtins.h"
@ -37,8 +40,6 @@
#define REMOTE_CONNECTION_STATS_COLUMNS 4
#define ADJUST_POOLSIZE_AUTOMATICALLY 0
#define DISABLE_CONNECTION_THROTTLING -1
/*
* The data structure used to store data in shared memory. This data structure is only
@ -55,6 +56,7 @@ typedef struct ConnectionStatsSharedData
ConditionVariable waitersConditionVariable;
} ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey
{
/*
@ -106,8 +108,8 @@ static void UnLockConnectionSharedMemory(void);
static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void);
static bool ShouldWaitForConnection(int currentConnectionCount);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
static uint32 SharedConnectionHashHash(const void *key, Size keysize);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
@ -247,6 +249,18 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
MAX_NODE_LENGTH)));
}
/*
* The local session might already have some reserved connections to the given
* node. In that case, we don't need to go through the shared memory.
*/
Oid userId = GetUserId();
if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId))
{
MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId);
return true;
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
@ -363,7 +377,7 @@ IncrementSharedConnectionCounter(const char *hostname, int port)
/*
* DecrementSharedConnectionCounter decrements the shared counter
* for the given hostname and port.
* for the given hostname and port for the given count.
*/
void
DecrementSharedConnectionCounter(const char *hostname, int port)
@ -634,19 +648,6 @@ AdaptiveConnectionManagementFlag(int activeConnectionCount)
}
/*
* UseConnectionPerPlacement returns whether we should use a separate connection
* per placement even if another connection is idle. We mostly use this in testing
* scenarios.
*/
bool
UseConnectionPerPlacement(void)
{
return ForceMaxQueryParallelization &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION;
}
/*
* ShouldWaitForConnection returns true if the workerPool should wait to
* get the next connection until one slot is empty within


@ -140,6 +140,7 @@
#include "distributed/connection_management.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
@ -1924,10 +1925,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
/*
* WorkerPoolCompare is based on WorkerNodeCompare function.
*
* The function compares two worker nodes by their host name and port
* number.
* WorkerPoolCompare is based on WorkerNodeCompare function. The function
* compares two worker nodes by their host name and port number.
*/
static int
WorkerPoolCompare(const void *lhsKey, const void *rhsKey)


@ -545,6 +545,12 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
shardPlacement,
NULL);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
RemoteTransactionBeginIfNecessary(connection);
if (PQstatus(connection->pgConn) != CONNECTION_OK)


@ -292,8 +292,16 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
MultiConnection *connection = GetPlacementConnection(FOR_DML, shardPlacement,
NULL);
int connectionFlags = FOR_DML;
MultiConnection *connection =
GetPlacementConnection(connectionFlags, shardPlacement, NULL);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
PGresult *queryResult = NULL;
StringInfo workerAppendQuery = makeStringInfo();
@ -862,6 +870,12 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
MultiConnection *connection = GetPlacementConnection(connectionFlags, placement,
NULL);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
*shardSize = 0;
*shardMinValue = NULL;
*shardMaxValue = NULL;


@ -625,6 +625,12 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
MultiConnection *connection = GetPlacementConnection(connectionFlags,
taskPlacement, NULL);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
/* try other placements if we fail to connect this one */
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{


@ -38,6 +38,7 @@
#include "distributed/insert_select_executor.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/local_executor.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
@ -284,6 +285,7 @@ _PG_init(void)
InitPlacementConnectionManagement();
InitializeCitusQueryStats();
InitializeSharedConnectionStats();
InitializeLocallyReservedSharedConnections();
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
@ -426,6 +428,13 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
{
/* properly close all the cached connections */
ShutdownAllConnections();
/*
* Make sure that we give the shared connections back to the shared
* pool if any. This operation is a no-op if the reserved connections
* are already given away.
*/
DeallocateReservedConnections();
}


@ -27,6 +27,7 @@
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
@ -263,6 +264,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
/*
* Make sure that we give the shared connections back to the shared
* pool if any. This operation is a no-op if the reserved connections
* are already given away.
*/
DeallocateReservedConnections();
UnSetDistributedTransactionId();
/* empty the CommitContext to ensure we're not leaking memory */
@ -315,6 +323,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
/*
* Make sure that we give the shared connections back to the shared
* pool if any. This operation is a no-op if the reserved connections
* are already given away.
*/
DeallocateReservedConnections();
/*
* We reset these mainly for posterity. The only way we would normally
* get here with ExecutorLevel or PlannerLevel > 0 is during a fatal
@ -459,7 +474,6 @@ ResetGlobalVariables()
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
TransactionModifiedNodeMetadata = false;
ResetWorkerErrorIndication();
}


@ -46,7 +46,11 @@ enum MultiConnectionMode
FOR_DML = 1 << 2,
/* connection must not have accessed any non-co-located placements */
/*
* During COPY we do not want to use a connection that accessed non-co-located
* placements. If there is a connection that did not access another placement,
* then use it. Otherwise open a new clean connection.
*/
REQUIRE_CLEAN_CONNECTION = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4,
@ -195,7 +199,7 @@ extern int MaxCachedConnectionsPerWorker;
/* parameters used for outbound connections */
extern char *NodeConninfo;
/* the hash table */
/* the hash tables are externally accessible */
extern HTAB *ConnectionHash;
extern HTAB *ConnParamsHash;
@ -234,6 +238,9 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *database);
extern void CloseAllConnectionsAfterTransaction(void);
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
const char *userName,
const char *database);
extern void CloseConnection(MultiConnection *connection);
extern void ShutdownAllConnections(void);
extern void ShutdownConnection(MultiConnection *connection);


@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
*
* locally_reserved_shared_connections.h
*   Management of connection reservations in the shared memory pool
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef LOCALLY_RESERVED_SHARED_CONNECTIONS_H_
#define LOCALLY_RESERVED_SHARED_CONNECTIONS_H_
#include "distributed/connection_management.h"
extern void InitializeLocallyReservedSharedConnections(void);
extern bool CanUseReservedConnection(const char *hostName, int nodePort,
Oid userId, Oid databaseOid);
extern void MarkReservedConnectionUsed(const char *hostName, int nodePort,
Oid userId, Oid databaseOid);
extern void DeallocateReservedConnections(void);
extern void EnsureConnectionPossibilityForPrimaryNodes(void);
extern bool IsReservationPossible(void);
#endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */


@ -76,6 +76,7 @@ extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint
extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
/*
* ExecutionParams contains parameters that are used during the execution.
* Some of these can be the zero value if it is not needed during the execution.


@ -42,5 +42,6 @@ extern void InitPlacementConnectionManagement(void);
extern bool ConnectionModifiedPlacement(MultiConnection *connection);
extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection);
extern bool UseConnectionPerPlacement(void);
#endif /* PLACEMENT_CONNECTION_H */


@ -11,6 +11,10 @@
#ifndef SHARED_CONNECTION_STATS_H
#define SHARED_CONNECTION_STATS_H
#define ADJUST_POOLSIZE_AUTOMATICALLY 0
#define DISABLE_CONNECTION_THROTTLING -1
extern int MaxSharedPoolSize;
@ -23,6 +27,5 @@ extern void WaitLoopForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
extern int AdaptiveConnectionManagementFlag(int activeConnectionCount);
extern bool UseConnectionPerPlacement(void);
#endif /* SHARED_CONNECTION_STATS_H */


@ -77,6 +77,17 @@ SET search_path TO set_role_in_transaction;
INSERT INTO t values (2);
ERROR: cannot perform query on placements that were modified in this transaction by a different user
ROLLBACK;
-- we cannot change role in between COPY commands as well
SET ROLE user1;
SET search_path TO set_role_in_transaction;
BEGIN;
COPY t FROM STDIN;
SET ROLE user2;
SET search_path TO set_role_in_transaction;
COPY t FROM STDIN;
ERROR: cannot perform query on placements that were modified in this transaction by a different user
CONTEXT: COPY t, line 1: "1"
ROLLBACK;
RESET ROLE;
REVOKE ALL ON SCHEMA set_role_in_transaction FROM user1;
REVOKE ALL ON SCHEMA set_role_in_transaction FROM user2;


@ -1,5 +1,18 @@
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
SET citus.next_shard_id TO 14000000;
-- returns the reserved connections per backend
-- given that the code aggressively cleans up reserved connections
-- this function returns an empty set in all the tests
-- In fact, we're testing that no reserved connections remain
CREATE OR REPLACE FUNCTION citus_reserved_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT used_reserved_connection bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'citus', $$citus_reserved_connection_stats$$;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
@ -323,7 +336,7 @@ COMMIT;
BEGIN;
-- now allow at most 2 connections for COPY
SET LOCAL citus.max_adaptive_executor_pool_size TO 2;
COPY test FROM STDIN;
COPY test FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
@ -340,6 +353,270 @@ COPY test FROM STDIN;
(2 rows)
ROLLBACK;
-- now, show that COPY doesn't open more connections than the shared_pool_size
-- now, decrease the shared pool size, and show that COPY doesn't exceed that
ALTER SYSTEM SET citus.max_shared_pool_size TO 3;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
BEGIN;
COPY test FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
3
3
(2 rows)
ROLLBACK;
BEGIN;
-- in this test, we trigger touching only one of the workers
-- the first copy touches 3 shards
COPY test FROM STDIN;
-- we see that one worker has 3 connections, the other has 1, which is not
-- an already established connection, but a reservation
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
3
1
(2 rows)
-- in this second COPY, we access the same node but different shards
-- so we test the case where the second COPY cannot get any new connections
-- due to adaptive connection management, and can still continue
COPY test FROM STDIN;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
3
1
(2 rows)
ROLLBACK;
BEGIN;
-- in this test, we trigger touching only one of the workers
-- the first copy touches 3 shards
SELECT count(*) FROM test WHERE a IN (2,4,5);
count
---------------------------------------------------------------------
3
(1 row)
-- we see that one worker has 3 connections, the other has 1, which is not
-- an already established connection, but a reservation
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
3
(1 row)
-- in this second COPY, we access the same node but different shards
-- so we test the case where the second COPY cannot get any new connections
-- due to adaptive connection management, and can still continue
COPY test FROM STDIN;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
3
1
(2 rows)
ROLLBACK;
BEGIN;
-- when COPY is used with _max_query_parallelization
-- it ignores the shared pool size
SET LOCAL citus.force_max_query_parallelization TO ON;
COPY test FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
10
11
(2 rows)
ROLLBACK;
-- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
-- in underlying COPY commands
BEGIN;
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
a
---------------------------------------------------------------------
0
1
2
3
4
5
6
7
8
9
10
(11 rows)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
3
3
(2 rows)
ROLLBACK;
-- COPY operations to range partitioned tables will honor max_shared_pool_size
-- as we use a single connection to each worker
CREATE TABLE range_table(a int);
SELECT create_distributed_table('range_table', 'a', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range_table',
'{0,25,50,76}',
'{24,49,75,200}');
BEGIN;
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
COPY range_table FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
ROLLBACK;
-- COPY operations to reference tables will use one connection per worker
-- so we will always honor max_shared_pool_size.
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
BEGIN;
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
COPY ref_table FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
ROLLBACK;
-- reset max_shared_pool_size to default
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- now show that when max_cached_conns_per_worker > 1
-- Citus forces the first execution to open at least 2
-- connections that are cached. Later, that 2 cached
@ -388,6 +665,149 @@ BEGIN;
t
(2 rows)
COMMIT;
-- we should not have any reserved connection
-- as all of them have already been either used
-- or cleaned up
SELECT * FROM citus_reserved_connection_stats();
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
(0 rows)
-- reconnect to get rid of cached connections
\c - - - :master_port
SET search_path TO shared_connection_stats;
BEGIN;
INSERT INTO test SELECT i FROM generate_series(0,10)i;
-- after COPY finishes, citus should see the used
-- reserved connections
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
localhost | 57637 | regression | t
localhost | 57638 | regression | t
(2 rows)
ROLLBACK;
BEGIN;
-- even if we hit a single shard, all the other reserved
-- connections should be cleaned-up because we do not
-- reserve for the second call as we have the cached
-- connections
INSERT INTO test SELECT 1 FROM generate_series(0,100)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
localhost | 57637 | regression | f
localhost | 57638 | regression | t
(2 rows)
ROLLBACK;
BEGIN;
TRUNCATE test;
CREATE UNIQUE INDEX test_unique_index ON test(a);
-- even if we hit a single shard and later fail, all the
-- other reserved connections should be cleaned-up
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
ERROR: duplicate key value violates unique constraint "test_unique_index_14000001"
DETAIL: Key (a)=(1) already exists.
ROLLBACK;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
(0 rows)
BEGIN;
-- hits a single shard
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
-- if COPY hits a single shard, we should have reserved connections
-- to the other nodes
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
localhost | 57637 | regression | f
localhost | 57638 | regression | t
(2 rows)
-- we should be able to see this again if the query hits
-- the same shard
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
localhost | 57637 | regression | f
localhost | 57638 | regression | t
(2 rows)
-- but when the query hits the other shard(s), we should
-- see that all the reserved connections are used
INSERT INTO test SELECT i FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
localhost | 57637 | regression | t
localhost | 57638 | regression | t
(2 rows)
ROLLBACK;
-- at the end of the transaction, all should be cleared
SELECT * FROM citus_reserved_connection_stats();
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
(0 rows)
BEGIN;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
-- the above command used at least one connection per node
-- so the next commands would not need any reserved connections
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
(0 rows)
INSERT INTO test SELECT i FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
(0 rows)
COMMIT;
-- check the reserved connections when there are cached connections
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 1;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- cache connections to the nodes
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
123
(1 row)
BEGIN;
-- we should not have any reserved connections
-- because we already have available connections
COPY test FROM PROGRAM 'seq 32';
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
hostname | port | database_name | used_reserved_connection
---------------------------------------------------------------------
(0 rows)
COMMIT;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
@ -399,5 +819,7 @@ SELECT pg_reload_conf();
t
(1 row)
BEGIN;
SET LOCAL client_min_messages TO WARNING;
DROP SCHEMA shared_connection_stats CASCADE;
NOTICE: drop cascades to table test
COMMIT;


@ -48,6 +48,24 @@ SET search_path TO set_role_in_transaction;
INSERT INTO t values (2);
ROLLBACK;
-- we cannot change role in between COPY commands as well
SET ROLE user1;
SET search_path TO set_role_in_transaction;
BEGIN;
COPY t FROM STDIN;
1
2
3
\.
SET ROLE user2;
SET search_path TO set_role_in_transaction;
COPY t FROM STDIN;
1
2
3
\.
ROLLBACK;
RESET ROLE;
REVOKE ALL ON SCHEMA set_role_in_transaction FROM user1;


@ -1,6 +1,21 @@
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
SET citus.next_shard_id TO 14000000;
-- returns the reserved connections per backend
-- given that the code aggressively cleans up reserved connections
-- this function returns an empty set in all the tests
-- In fact, we're testing that no reserved connections remain
CREATE OR REPLACE FUNCTION citus_reserved_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT used_reserved_connection bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'citus', $$citus_reserved_connection_stats$$;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
@ -193,41 +208,7 @@ COMMIT;
BEGIN;
-- now allow at most 2 connections for COPY
SET LOCAL citus.max_adaptive_executor_pool_size TO 2;
COPY test FROM STDIN;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
\.
COPY test FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
@ -240,6 +221,188 @@ COPY test FROM STDIN;
hostname, port;
ROLLBACK;
-- now, show that COPY doesn't open more connections than the shared_pool_size
-- now, decrease the shared pool size, and show that COPY doesn't exceed that
ALTER SYSTEM SET citus.max_shared_pool_size TO 3;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
BEGIN;
COPY test FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
BEGIN;
-- in this test, we trigger touching only one of the workers
-- the first copy touches 3 shards
COPY test FROM STDIN;
2
4
5
\.
-- we see that one worker has 3 connections, the other has 1, which is not
-- an already established connection, but a reservation
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- in this second COPY, we access the same node but different shards
-- so we test the case where the second COPY cannot get any new connections
-- due to adaptive connection management, and can still continue
COPY test FROM STDIN;
6
8
9
\.
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
BEGIN;
-- in this test, we trigger touching only one of the workers
-- the first copy touches 3 shards
SELECT count(*) FROM test WHERE a IN (2,4,5);
-- we see that one worker has 3 connections, the other has 1, which is not
-- an already established connection, but a reservation
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- in this second COPY, we access the same node but different shards
-- so we test the case where the second COPY cannot get any new connections
-- due to adaptive connection management, and can still continue
COPY test FROM STDIN;
6
8
9
\.
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
BEGIN;
-- when COPY is used with _max_query_parallelization
-- it ignores the shared pool size
SET LOCAL citus.force_max_query_parallelization TO ON;
COPY test FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
-- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
-- in underlying COPY commands
BEGIN;
SELECT pg_sleep(0.1);
INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
-- COPY operations to range partitioned tables will honor max_shared_pool_size
-- as we use a single connection to each worker
CREATE TABLE range_table(a int);
SELECT create_distributed_table('range_table', 'a', 'range');
CALL public.create_range_partitioned_shards('range_table',
'{0,25,50,76}',
'{24,49,75,200}');
BEGIN;
SELECT pg_sleep(0.1);
COPY range_table FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
-- COPY operations to reference tables will use one connection per worker
-- so we will always honor max_shared_pool_size.
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
BEGIN;
SELECT pg_sleep(0.1);
COPY ref_table FROM PROGRAM 'seq 32';
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
-- reset max_shared_pool_size to default
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- now show that when max_cached_conns_per_worker > 1
-- Citus forces the first execution to open at least 2
-- connections that are cached. Later, that 2 cached
@ -268,10 +431,97 @@ BEGIN;
hostname, port;
COMMIT;
-- we should not have any reserved connection
-- as all of them have already been either used
-- or cleaned up
SELECT * FROM citus_reserved_connection_stats();
-- reconnect to get rid of cached connections
\c - - - :master_port
SET search_path TO shared_connection_stats;
BEGIN;
INSERT INTO test SELECT i FROM generate_series(0,10)i;
-- after COPY finishes, citus should see the used
-- reserved connections
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
ROLLBACK;
BEGIN;
-- even if we hit a single shard, all the other reserved
-- connections should be cleaned-up because we do not
-- reserve for the second call as we have the cached
-- connections
INSERT INTO test SELECT 1 FROM generate_series(0,100)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
ROLLBACK;
BEGIN;
TRUNCATE test;
CREATE UNIQUE INDEX test_unique_index ON test(a);
-- even if we hit a single shard and later fail, all the
-- other reserved connections should be cleaned-up
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
ROLLBACK;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
BEGIN;
-- hits a single shard
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
-- if COPY hits a single shard, we should have reserved connections
-- to the other nodes
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
-- we should be able to see this again if the query hits
-- the same shard
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
-- but when the query hits the other shard(s), we should
-- see that all the reserved connections are used
INSERT INTO test SELECT i FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
ROLLBACK;
-- at the end of the transaction, all should be cleared
SELECT * FROM citus_reserved_connection_stats();
BEGIN;
SELECT count(*) FROM test;
-- the above command used at least one connection per node
-- so the next commands would not need any reserved connections
INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
INSERT INTO test SELECT i FROM generate_series(0,10)i;
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
COMMIT;
-- check the reserved connections when there are cached connections
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 1;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- cache connections to the nodes
SELECT count(*) FROM test;
BEGIN;
-- we should not have any reserved connections
-- because we already have available connections
COPY test FROM PROGRAM 'seq 32';
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
COMMIT;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
BEGIN;
SET LOCAL client_min_messages TO WARNING;
DROP SCHEMA shared_connection_stats CASCADE;
COMMIT;