Throttle connections to the worker nodes

With this commit, we're introducing a new infrastructure to throttle
connections to the worker nodes. This infrastructure is useful for
multi-shard queries; router queries are not affected by this change.

The goal is to prevent establishing more than citus.max_shared_pool_size
number of connections per worker node in total, across sessions.

To do that, we've introduced a new connection flag OPTIONAL_CONNECTION.
The idea is that some connections are optional, such as the second
(and further) connections for the adaptive executor. A single connection
is enough to finish the distributed execution; the others are only useful
to execute the query faster. Thus, they can be considered optional
connections. When an optional connection is not allowed, the adaptive
executor simply skips it and continues the execution with the already
established connections. However, it keeps retrying to establish optional
connections, in case some slots open up again.
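
As a usage sketch (the value 50 below is illustrative, not part of this
commit), the new GUC is registered as PGC_SIGHUP and can be adjusted at
runtime:

    -- cap the total number of connections per worker node, across backends
    ALTER SYSTEM SET citus.max_shared_pool_size TO 50;
    SELECT pg_reload_conf();
    -- SHOW reports the effective value: 0 (the default) auto-adjusts to
    -- max_connections, -1 disables the throttling
    SHOW citus.max_shared_pool_size;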
pull/3692/head
Onder Kalaci 2020-03-24 13:20:40 +01:00
parent 38b8a9ad62
commit aa6b641828
26 changed files with 1815 additions and 112 deletions

View File

@ -64,7 +64,6 @@
#include "utils/lsyscache.h"
#include "utils/syscache.h"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
static bool shouldInvalidateForeignKeyGraph = false;

View File

@ -28,11 +28,13 @@
#include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h"
#include "distributed/run_from_same_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/cancel_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "mb/pg_wchar.h"
#include "portability/instr_time.h"
#include "storage/ipc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@ -46,7 +48,8 @@ MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
static void StartConnectionEstablishment(MultiConnection *connection,
ConnectionHashKey *key);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -78,7 +81,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
int *waitCount);
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection);
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
@ -257,7 +260,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
const char *user, const char *database)
{
ConnectionHashKey key;
MultiConnection *connection;
bool found;
/* do some minimal input checks */
@ -310,24 +312,75 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
if (!(flags & FORCE_NEW_CONNECTION))
{
/* check connection cache for a connection that's not already in use */
connection = FindAvailableConnection(entry->connections, flags);
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection)
{
return connection;
}
}
/*
* Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment.
*/
connection = StartConnectionEstablishment(&key);
MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
sizeof(MultiConnection));
connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
dlist_push_tail(entry->connections, &connection->connectionNode);
/* by nature, these two flags cannot be set at the same time */
Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));
if (flags & WAIT_FOR_CONNECTION)
{
WaitLoopForSharedConnection(hostname, port);
}
else if (flags & OPTIONAL_CONNECTION)
{
/*
* We can afford to skip establishing an optional connection. For
* non-optional connections, we first retry for some time. If we still
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
if (!TryToIncrementSharedConnectionCounter(hostname, port))
{
/* do not track the connection anymore */
dlist_delete(&connection->connectionNode);
pfree(connection);
return NULL;
}
}
else
{
/*
* The caller doesn't want the connection manager to wait
* until a connection slot is available on the remote node.
* In the end, we might fail to establish connection to the
* remote node as it might not have any space in
* max_connections for this connection establishment.
*
* Still, we keep track of the connection counter.
*/
IncrementSharedConnectionCounter(hostname, port);
}
/*
* We've already incremented the counter above, so we should decrement
* when we're done with the connection.
*/
connection->initilizationState = POOL_STATE_COUNTER_INCREMENTED;
StartConnectionEstablishment(connection, &key);
ResetShardPlacementAssociation(connection);
/* the connection is now fully initialized, record it */
connection->initilizationState = POOL_STATE_INITIALIZED;
return connection;
}
@ -456,8 +509,7 @@ CloseConnection(MultiConnection *connection)
bool found;
/* close connection */
PQfinish(connection->pgConn);
connection->pgConn = NULL;
CitusPQFinish(connection);
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
key.port = connection->port;
@ -537,8 +589,7 @@ ShutdownConnection(MultiConnection *connection)
{
SendCancelationRequest(connection);
}
PQfinish(connection->pgConn);
connection->pgConn = NULL;
CitusPQFinish(connection);
}
@ -903,9 +954,30 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
}
/* close connection, otherwise we take up resource on the other side */
CitusPQFinish(connection);
}
}
/*
* CitusPQFinish is a wrapper around PQfinish and does bookkeeping on shared connection
* counters.
*/
static void
CitusPQFinish(MultiConnection *connection)
{
if (connection->pgConn != NULL)
{
PQfinish(connection->pgConn);
connection->pgConn = NULL;
}
/* behave idempotently, there is no guarantee that CitusPQFinish() is called only once */
if (connection->initilizationState >= POOL_STATE_COUNTER_INCREMENTED)
{
DecrementSharedConnectionCounter(connection->hostname, connection->port);
connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
}
}
@ -985,8 +1057,8 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
* Asynchronously establish connection to a remote node, but don't wait for
* that to finish. DNS lookups etc. are performed synchronously though.
*/
static MultiConnection *
StartConnectionEstablishment(ConnectionHashKey *key)
static void
StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
{
bool found = false;
static uint64 connectionId = 1;
@ -1018,9 +1090,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
entry->isValid = true;
}
MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
sizeof(MultiConnection));
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
connection->port = key->port;
strlcpy(connection->database, key->database, NAMEDATALEN);
@ -1040,8 +1109,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
PQsetnonblocking(connection->pgConn, true);
SetCitusNoticeProcessor(connection);
return connection;
}
@ -1166,6 +1233,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
}
return isCitusInitiatedBackend ||
connection->initilizationState != POOL_STATE_INITIALIZED ||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd ||
PQstatus(connection->pgConn) != CONNECTION_OK ||

View File

@ -20,20 +20,27 @@
#include "access/hash.h"
#include "access/htup_details.h"
#include "catalog/pg_authid.h"
#include "commands/dbcommands.h"
#include "distributed/cancel_utils.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/time_constants.h"
#include "distributed/tuplestore.h"
#include "utils/builtins.h"
#include "utils/hashutils.h"
#include "utils/hsearch.h"
#include "storage/ipc.h"
#define REMOTE_CONNECTION_STATS_COLUMNS 6
#define REMOTE_CONNECTION_STATS_COLUMNS 4
#define ADJUST_POOLSIZE_AUTOMATICALLY 0
#define DISABLE_CONNECTION_THROTTLING -1
/*
* The data structure used to store data in shared memory. This data structure only
* The data structure used to store data in shared memory. This data structure is only
* used for storing the lock. The actual statistics about the connections are stored
* in the hashmap, which is allocated separately, as Postgres provides different APIs
* for allocating hashmaps in the shared memory.
@ -42,22 +49,26 @@ typedef struct ConnectionStatsSharedData
{
int sharedConnectionHashTrancheId;
char *sharedConnectionHashTrancheName;
LWLock sharedConnectionHashLock;
ConditionVariable waitersConditionVariable;
} ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey
{
/*
* Using nodeId (over hostname/hostport) make the tracking resiliant to
* master_update_node(). Plus, requires a little less memory.
* We keep the entries in the shared memory even after master_update_node()
* as there might be some cached connections to the old node.
* That's why, we prefer to use "hostname/port" over nodeId.
*/
uint32 nodeId;
char hostname[MAX_NODE_LENGTH];
int32 port;
/*
* Given that citus.max_shared_pool_size can be defined per database, we
* should keep track of shared connections per database.
*/
char database[NAMEDATALEN];
Oid databaseOid;
} SharedConnStatsHashKey;
/* hash entry for per worker stats */
@ -70,16 +81,15 @@ typedef struct SharedConnStatsHashEntry
/*
* Controlled via a GUC.
*
* By default, Citus tracks 1024 worker nodes, which is already
* very unlikely number of worker nodes. Given that the shared
* memory required per worker is pretty small (~120 Bytes), we think it
* is a good default that wouldn't hurt any users in any dimension.
* Controlled via a GUC, never access directly, use GetMaxSharedPoolSize().
* "0" means adjust MaxSharedPoolSize automatically by using MaxConnections.
* "-1" means do not apply connection throttling
* Anything else means use that number
*/
int MaxTrackedWorkerNodes = 1024;
int MaxSharedPoolSize = 0;
/* the following two structs used for accessing shared memory */
/* the following two structs are used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL;
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@ -88,9 +98,10 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* local function declarations */
static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor);
static void UnLockConnectionSharedMemory(void);
static void StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
static void LockConnectionSharedMemory(LWLockMode lockMode);
static void UnLockConnectionSharedMemory(void);
static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
@ -112,7 +123,7 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllConnections(tupleStore, tupleDescriptor);
StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
@ -122,19 +133,19 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS)
/*
* StoreAllConnections gets connections established from the current node
* StoreAllRemoteConnectionStats gets connections established from the current node
* and inserts them into the given tuplestore.
*
* We don't need to enforce any access privileges as the number of backends
* on any node is already visible on pg_stat_activity to all users.
*/
static void
StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
Datum values[REMOTE_CONNECTION_STATS_COLUMNS];
bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS];
/* we're reading all distributed transactions, prevent new backends */
/* we're reading all shared connections, prevent any changes */
LockConnectionSharedMemory(LW_SHARED);
HASH_SEQ_STATUS status;
@ -147,9 +158,17 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[0] = Int32GetDatum(connectionEntry->key.nodeId);
values[1] = PointerGetDatum(connectionEntry->key.database);
values[2] = Int32GetDatum(connectionEntry->connectionCount);
char *databaseName = get_database_name(connectionEntry->key.databaseOid);
if (databaseName == NULL)
{
/* database might have been dropped */
continue;
}
values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname));
values[1] = Int32GetDatum(connectionEntry->key.port);
values[2] = PointerGetDatum(cstring_to_text(databaseName));
values[3] = Int32GetDatum(connectionEntry->connectionCount);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
@ -158,6 +177,313 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
}
/*
* RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
* and removes the inactive entries.
*/
void
RemoveInactiveNodesFromSharedConnections(void)
{
/* we're modifying connections, prevent any changes */
LockConnectionSharedMemory(LW_EXCLUSIVE);
HASH_SEQ_STATUS status;
SharedConnStatsHashEntry *connectionEntry = NULL;
/*
* In the first iteration, try to remove worker nodes that don't have any active
* connections and do not exist in the metadata anymore.
*/
hash_seq_init(&status, SharedConnStatsHash);
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
{
SharedConnStatsHashKey connectionKey = connectionEntry->key;
WorkerNode *workerNode =
FindWorkerNode(connectionKey.hostname, connectionKey.port);
if (connectionEntry->connectionCount == 0 &&
(workerNode == NULL || !workerNode->isActive))
{
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
}
}
int entryCount = hash_get_num_entries(SharedConnStatsHash);
if (entryCount + 1 < MaxWorkerNodesTracked)
{
/* we're good, we have at least one more space for a new worker */
UnLockConnectionSharedMemory();
return;
}
/*
* We aimed to remove nodes that don't have any open connections. If we
* failed to find one, we have to be more aggressive and remove at least
* one of the inactive ones.
*/
hash_seq_init(&status, SharedConnStatsHash);
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
{
SharedConnStatsHashKey connectionKey = connectionEntry->key;
WorkerNode *workerNode =
FindWorkerNode(connectionKey.hostname, connectionKey.port);
if (workerNode == NULL || !workerNode->isActive)
{
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
hash_seq_term(&status);
break;
}
}
UnLockConnectionSharedMemory();
}
/*
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
* via a GUC.
* "0" means adjust MaxSharedPoolSize automatically by using MaxConnections
* "-1" means do not apply connection throttling
* Anything else means use that number
*/
int
GetMaxSharedPoolSize(void)
{
if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
{
return MaxConnections;
}
return MaxSharedPoolSize;
}
/*
* WaitLoopForSharedConnection tries to increment the shared connection
* counter for the given hostname/port and the current database in
* SharedConnStatsHash.
*
* The function implements a retry mechanism via a condition variable.
*/
void
WaitLoopForSharedConnection(const char *hostname, int port)
{
while (!TryToIncrementSharedConnectionCounter(hostname, port))
{
CHECK_FOR_INTERRUPTS();
WaitForSharedConnection();
}
ConditionVariableCancelSleep();
}
/*
* TryToIncrementSharedConnectionCounter tries to increment the shared
* connection counter for the given hostname/port and the current database in
* SharedConnStatsHash.
*
* If the function returns true, the caller is allowed (and expected)
* to establish a new connection to the given node. Else, the caller
* is not allowed to establish a new connection.
*/
bool
TryToIncrementSharedConnectionCounter(const char *hostname, int port)
{
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return true;
}
bool counterIncremented = false;
SharedConnStatsHashKey connKey;
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
/*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no
* space in the shared memory. That's why we prefer continuing the execution
* instead of throwing an error.
*/
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
/*
* It is possible to throw an error at this point, but that doesn't help us in any way.
* Instead, we try our best and let the connection establishment continue, bypassing the
* connection throttling.
*/
if (!connectionEntry)
{
UnLockConnectionSharedMemory();
return true;
}
if (!entryFound)
{
/* we successfully allocated the entry for the first time, so initialize it */
connectionEntry->connectionCount = 1;
counterIncremented = true;
}
else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
{
/* there is no space left for this connection */
counterIncremented = false;
}
else
{
connectionEntry->connectionCount++;
counterIncremented = true;
}
UnLockConnectionSharedMemory();
return counterIncremented;
}
/*
* IncrementSharedConnectionCounter increments the shared counter
* for the given hostname and port.
*/
void
IncrementSharedConnectionCounter(const char *hostname, int port)
{
SharedConnStatsHashKey connKey;
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return;
}
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
/*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer
* continuing the execution instead of throwing an error.
*/
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
/*
* It is possible to throw an error at this point, but that doesn't help us in any way.
* Instead, we try our best and let the connection establishment continue, bypassing the
* connection throttling.
*/
if (!connectionEntry)
{
UnLockConnectionSharedMemory();
ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing "
"connection counter", hostname, port)));
return;
}
if (!entryFound)
{
/* we successfully allocated the entry for the first time, so initialize it */
connectionEntry->connectionCount = 0;
}
connectionEntry->connectionCount += 1;
UnLockConnectionSharedMemory();
}
/*
* DecrementSharedConnectionCounter decrements the shared counter
* for the given hostname and port.
*/
void
DecrementSharedConnectionCounter(const char *hostname, int port)
{
SharedConnStatsHashKey connKey;
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return;
}
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
/* this worker node is removed or updated, no need to care */
if (!entryFound)
{
UnLockConnectionSharedMemory();
/* wake up any waiters in case any backend is waiting for this node */
WakeupWaiterBackendsForSharedConnection();
ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing "
"connection counter", hostname, port)));
return;
}
/* we should never go below 0 */
Assert(connectionEntry->connectionCount > 0);
connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection();
}
/*
* LockConnectionSharedMemory is a utility function that should be used when
* accessing the SharedConnStatsHash, which is in the shared memory.
@ -180,6 +506,43 @@ UnLockConnectionSharedMemory(void)
}
/*
* WakeupWaiterBackendsForSharedConnection is a wrapper around the condition variable
* broadcast operation.
*
* We use a single condition variable, for all worker nodes, to implement the connection
* throttling mechanism. All the backends combined are allowed to establish at most
* MaxSharedPoolSize connections per worker node. If a backend requires a
* non-optional connection (see WAIT_FOR_CONNECTION for details), it is not allowed
* to establish it immediately if the total connections are equal to MaxSharedPoolSize.
* Instead, the backend waits on the condition variable. When any other backend
* terminates an existing connection to any remote node, this function is called.
* The main goal is to trigger all waiting backends to try getting a connection slot
* within MaxSharedPoolSize. The ones that can get a connection slot are allowed to
* continue with their connection establishments. Others should wait for another
* backend to call this function.
*/
void
WakeupWaiterBackendsForSharedConnection(void)
{
ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
}
/*
* WaitForSharedConnection is a wrapper around the condition variable sleep operation.
*
* For the details of the use of the condition variable, see
* WakeupWaiterBackendsForSharedConnection().
*/
void
WaitForSharedConnection(void)
{
ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable,
PG_WAIT_EXTENSION);
}
/*
* InitializeSharedConnectionStats requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook.
@ -208,9 +571,8 @@ SharedConnectionStatsShmemSize(void)
Size size = 0;
size = add_size(size, sizeof(ConnectionStatsSharedData));
size = add_size(size, mul_size(sizeof(LWLock), MaxTrackedWorkerNodes));
Size hashSize = hash_estimate_size(MaxTrackedWorkerNodes,
Size hashSize = hash_estimate_size(MaxWorkerNodesTracked,
sizeof(SharedConnStatsHashEntry));
size = add_size(size, hashSize);
@ -229,7 +591,7 @@ SharedConnectionStatsShmemInit(void)
bool alreadyInitialized = false;
HASHCTL info;
/* create (nodeId,database) -> [counter] */
/* create (hostname, port, database) -> [counter] */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(SharedConnStatsHashKey);
info.entrysize = sizeof(SharedConnStatsHashEntry);
@ -260,12 +622,14 @@ SharedConnectionStatsShmemInit(void)
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
}
/* allocate hash table */
SharedConnStatsHash =
ShmemInitHash("Shared Conn. Stats Hash", MaxTrackedWorkerNodes,
MaxTrackedWorkerNodes, &info, hashFlags);
ShmemInitHash("Shared Conn. Stats Hash", MaxWorkerNodesTracked,
MaxWorkerNodesTracked, &info, hashFlags);
LWLockRelease(AddinShmemInitLock);
@ -284,8 +648,9 @@ SharedConnectionHashHash(const void *key, Size keysize)
{
SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key;
uint32 hash = hash_uint32(entry->nodeId);
hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
hash = hash_combine(hash, hash_uint32(entry->port));
hash = hash_combine(hash, hash_uint32(entry->databaseOid));
return hash;
}
@ -297,8 +662,9 @@ SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a;
SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b;
if (ca->nodeId != cb->nodeId ||
strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
ca->port != cb->port ||
ca->databaseOid != cb->databaseOid)
{
return 1;
}

View File

@ -589,6 +589,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection);
static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForConnection(WorkerPool *workerPool);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
@ -1944,9 +1945,6 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort;
INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
workerPool->distributedExecution = execution;
/* "open" connections aggressively when there are cached connections */
int nodeConnectionCount = MaxCachedConnectionsPerWorker;
workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);
@ -1954,6 +1952,8 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
dlist_init(&workerPool->pendingTaskQueue);
dlist_init(&workerPool->readyTaskQueue);
workerPool->distributedExecution = execution;
execution->workerList = lappend(execution->workerList, workerPool);
return workerPool;
@ -2385,11 +2385,48 @@ ManageWorkerPool(WorkerPool *workerPool)
connectionFlags |= OUTSIDE_TRANSACTION;
}
if (UseConnectionPerPlacement())
{
/*
* User wants one connection per placement, so no throttling is desired
* and we do not set any flags.
*
* The primary reason for this is that allowing multiple backends to use a
* connection per placement could lead to unresolved self-deadlocks. In other
* words, each backend may get stuck waiting for other backends while trying to
* get a slot in the shared connection counters.
*/
}
else if (ShouldWaitForConnection(workerPool))
{
/*
* We need this connection to finish the execution. If it is not
* available based on the current number of connections to the worker
* then wait for it.
*/
connectionFlags |= WAIT_FOR_CONNECTION;
}
else
{
/*
* The executor can finish the execution with a single connection,
* remaining are optional. If the executor can get more connections,
* it can increase the parallelism.
*/
connectionFlags |= OPTIONAL_CONNECTION;
}
/* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
workerPool->nodeName,
workerPool->nodePort,
NULL, NULL);
if (!connection)
{
/* connection can only be NULL for optional connections */
Assert((connectionFlags & OPTIONAL_CONNECTION));
continue;
}
/*
* Assign the initial state in the connection state machine. The connection
@ -2404,6 +2441,17 @@ ManageWorkerPool(WorkerPool *workerPool)
*/
connection->claimedExclusively = true;
if (list_length(workerPool->sessionList) == 0)
{
/*
* The worker pool has just started to establish connections. We need to
* defer this initialization until after StartNodeUserDatabaseConnection()
* because for non-optional connections, we have some logic to wait
* until a connection is allowed to be established.
*/
INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
}
/* create a session for the connection */
WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection);
@ -2416,6 +2464,42 @@ ManageWorkerPool(WorkerPool *workerPool)
}
/*
* ShouldWaitForConnection returns true if the workerPool should wait to
* get the next connection until one slot is empty within
* citus.max_shared_pool_size on the worker. Note that, if there is an
* empty slot, the connection will not wait anyway.
*/
static bool
ShouldWaitForConnection(WorkerPool *workerPool)
{
if (list_length(workerPool->sessionList) == 0)
{
/*
* We definitely need at least 1 connection to finish the execution.
* All single shard queries hit here with the default settings.
*/
return true;
}
if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker)
{
/*
* Until this session caches MaxCachedConnectionsPerWorker connections,
* this might cause some optional connections to be considered non-optional
* when MaxCachedConnectionsPerWorker > 1.
*
* However, once the session has cached MaxCachedConnectionsPerWorker connections
* (which happens by the second transaction executed in the session), Citus
* utilizes the cached connections as much as possible.
*/
return true;
}
return false;
}
/*
* CheckConnectionTimeout makes sure that the execution enforces the connection
* establishment timeout defined by the user (NodeConnectionTimeout).

View File

@ -39,6 +39,7 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
@ -293,6 +294,9 @@ master_disable_node(PG_FUNCTION_ARGS)
bool onlyConsiderActivePlacements = false;
MemoryContext savedContext = CurrentMemoryContext;
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
PG_TRY();
{
if (NodeIsPrimary(workerNode))
@ -604,6 +608,9 @@ ActivateNode(char *nodeName, int nodePort)
{
bool isActive = true;
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
@ -646,6 +653,9 @@ master_update_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
newNodePort);
if (workerNodeWithSameAddress != NULL)
@ -672,6 +682,7 @@ master_update_node(PG_FUNCTION_ARGS)
errmsg("node %u not found", nodeId)));
}
/*
* If the node is a primary node we block reads and writes.
*
@ -715,8 +726,9 @@ master_update_node(PG_FUNCTION_ARGS)
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH);
workerNode->workerPort = newNodePort;
/* we should be able to find the new node from the metadata */
workerNode = FindWorkerNode(newNodeNameString, newNodePort);
Assert(workerNode->nodeId == nodeId);
/*
* Propagate the updated pg_dist_node entry to all metadata workers.
@ -1012,8 +1024,10 @@ ReadDistNode(bool includeNodesFromOtherClusters)
static void
RemoveNodeFromCluster(char *nodeName, int32 nodePort)
{
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
if (NodeIsPrimary(workerNode))
{
bool onlyConsiderActivePlacements = false;

View File

@ -73,6 +73,7 @@
#include "distributed/adaptive_executor.h"
#include "port/atomics.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "optimizer/planner.h"
#include "optimizer/paths.h"
#include "tcop/tcopprot.h"
@ -89,9 +90,10 @@ static char *CitusVersion = CITUS_VERSION;
void _PG_init(void);
static void CitusBackendAtExit(void);
static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata);
static void RegisterConnectionCleanup(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
@ -99,6 +101,7 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
@ -274,8 +277,6 @@ _PG_init(void)
InitializeCitusQueryStats();
InitializeSharedConnectionStats();
atexit(CitusBackendAtExit);
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
{
@ -285,18 +286,6 @@ _PG_init(void)
}
/*
* CitusBackendAtExit is called atexit of the backend for the purposes of
* any clean-up needed.
*/
static void
CitusBackendAtExit(void)
{
/* properly close all the cached connections */
ShutdownAllConnections();
}
/*
* Stack size increase during high memory load may cause unexpected crashes.
* With this alloca call, we are increasing stack size explicitly, so that if
@ -382,6 +371,37 @@ StartupCitusBackend(void)
{
InitializeMaintenanceDaemonBackend();
InitializeBackendData();
RegisterConnectionCleanup();
}
/*
* RegisterConnectionCleanup registers a callback that cleans up any resources left
* at the end of the session. We prefer to clean up before shared memory exit to make
* sure that this session properly releases anything it holds in the shared memory.
*/
static void
RegisterConnectionCleanup(void)
{
static bool registeredCleanup = false;
if (registeredCleanup == false)
{
before_shmem_exit(CitusCleanupConnectionsAtExit, 0);
registeredCleanup = true;
}
}
/*
* CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
* backend for the purposes of any clean-up needed.
*/
static void
CitusCleanupConnectionsAtExit(int code, Datum arg)
{
/* properly close all the cached connections */
ShutdownAllConnections();
}
@ -930,6 +950,20 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_shared_pool_size",
gettext_noop("Sets the maximum number of connections allowed per worker node "
"across all the backends from this node. Setting to -1 disables "
"connections throttling. Setting to 0 makes it auto-adjust, meaning "
"equal to max_connections on the coordinator."),
gettext_noop("As a rule of thumb, the value should be at most equal to the "
"max_connections on the remote nodes."),
&MaxSharedPoolSize,
0, -1, INT_MAX,
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, MaxSharedPoolSizeGucShowHook);
DefineCustomIntVariable(
"citus.max_worker_nodes_tracked",
gettext_noop("Sets the maximum number of worker nodes that are tracked."),
@ -937,9 +971,14 @@ RegisterCitusConfigVariables(void)
"health status are tracked in a shared hash table on "
"the master node. This configuration value limits the "
"size of the hash table, and consequently the maximum "
"number of worker nodes that can be tracked."),
"number of worker nodes that can be tracked."
"Citus keeps some information about the worker nodes "
"in the shared memory for certain optimizations. The "
"optimizations are enforced up to this number of worker "
"nodes. Any additional worker nodes may not benefit from"
"the optimizations."),
&MaxWorkerNodesTracked,
2048, 8, INT_MAX,
2048, 1024, INT_MAX,
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);
@ -1013,23 +1052,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_tracked_worker_nodes",
gettext_noop("Sets the maximum number of worker tracked."),
gettext_noop("Citus doesn't have any limitations in terms of the "
"number of worker nodes allowed in the cluster. But, "
"Citus keeps some information about the worker nodes "
"in the shared memory for certain optimizations. The "
"optimizations are enforced up to this number of worker "
"nodes. Any additional worker nodes may not benefit from"
"the optimizations."),
&MaxTrackedWorkerNodes,
1024, 256, INT_MAX,
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_running_tasks_per_node",
gettext_noop("Sets the maximum number of tasks to run concurrently per node."),
@ -1539,6 +1561,28 @@ NodeConninfoGucAssignHook(const char *newval, void *extra)
}
/*
* MaxSharedPoolSizeGucShowHook overrides the value that is shown to the
* user when the GUC is left at its default value (0).
*/
static const char *
MaxSharedPoolSizeGucShowHook(void)
{
StringInfo newvalue = makeStringInfo();
if (MaxSharedPoolSize == 0)
{
appendStringInfo(newvalue, "%d", GetMaxSharedPoolSize());
}
else
{
appendStringInfo(newvalue, "%d", MaxSharedPoolSize);
}
return (const char *) newvalue->data;
}
static bool
StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
{

View File

@ -5,3 +5,4 @@
#include "udfs/citus_extradata_container/9.3-2.sql"
#include "udfs/update_distributed_table_colocation/9.3-2.sql"
#include "udfs/replicate_reference_tables/9.3-2.sql"
#include "udfs/citus_remote_connection_stats/9.3-2.sql"

View File

@ -1 +0,0 @@
#include "udfs/citus_remote_connection_stats/9.3-2.sql"

View File

@ -1,6 +1,22 @@
CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
IS 'returns statistics about remote connections';
REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
FROM PUBLIC;

View File

@ -1,6 +1,22 @@
CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
IS 'returns statistics about remote connections';
REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
FROM PUBLIC;

View File

@ -0,0 +1,66 @@
/*-------------------------------------------------------------------------
*
* test/src/sequential_execution.c
*
* This file contains UDFs used to test the shared connection counters and
* the citus.max_shared_pool_size GUC.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "fmgr.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/listutils.h"
#include "nodes/parsenodes.h"
#include "utils/guc.h"
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(wake_up_connection_pool_waiters);
PG_FUNCTION_INFO_V1(set_max_shared_pool_size);
/*
* wake_up_connection_pool_waiters is a SQL
* interface for testing WakeupWaiterBackendsForSharedConnection().
*/
Datum
wake_up_connection_pool_waiters(PG_FUNCTION_ARGS)
{
WakeupWaiterBackendsForSharedConnection();
PG_RETURN_VOID();
}
/*
* set_max_shared_pool_size is a SQL
* interface for setting MaxSharedPoolSize. We use this function in the isolation
* tester, where ALTER SYSTEM is not allowed.
*/
Datum
set_max_shared_pool_size(PG_FUNCTION_ARGS)
{
int value = PG_GETARG_INT32(0);
AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt));
A_Const *aConstValue = makeNode(A_Const);
aConstValue->val = *makeInteger(value);
alterSystemStmt->setstmt = makeNode(VariableSetStmt);
alterSystemStmt->setstmt->name = "citus.max_shared_pool_size";
alterSystemStmt->setstmt->is_local = false;
alterSystemStmt->setstmt->kind = VAR_SET_VALUE;
alterSystemStmt->setstmt->args = list_make1(aConstValue);
AlterSystemSetConfigFile(alterSystemStmt);
kill(PostmasterPid, SIGHUP);
PG_RETURN_VOID();
}

View File

@ -31,6 +31,7 @@
#include "distributed/repartition_join_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/version_compat.h"
#include "utils/hsearch.h"

View File

@ -52,10 +52,29 @@ enum MultiConnectionMode
/* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4
OUTSIDE_TRANSACTION = 1 << 4,
/*
* Some connections are optional, such as when the adaptive executor is executing
* a multi-shard command and requires the second (or further) connections
* per node. In that case, the connection manager may decide not to allow the
* connection.
*/
OPTIONAL_CONNECTION = 1 << 5,
/*
* When this flag is passed, connection throttling may suspend the connection
* establishment until a connection slot becomes available for
* the remote host.
*/
WAIT_FOR_CONNECTION = 1 << 6
};
/*
* This state is used for keeping track of the initialization
* of the underlying pg_conn struct.
*/
typedef enum MultiConnectionState
{
MULTI_CONNECTION_INITIAL,
@ -65,6 +84,21 @@ typedef enum MultiConnectionState
MULTI_CONNECTION_LOST
} MultiConnectionState;
/*
* This state is used for keeping track of the initialization
* of the MultiConnection struct, not specifically the underlying
* pg_conn. The state is useful to determine the action during
* clean-up of connections.
*/
typedef enum MultiConnectionStructInitializationState
{
POOL_STATE_NOT_INITIALIZED,
POOL_STATE_COUNTER_INCREMENTED,
POOL_STATE_INITIALIZED
} MultiConnectionStructInitializationState;
/* declaring this directly above makes uncrustify go crazy */
typedef enum MultiConnectionMode MultiConnectionMode;
@ -114,6 +148,8 @@ typedef struct MultiConnection
/* number of bytes sent to PQputCopyData() since last flush */
uint64 copyBytesWrittenSinceLastFlush;
MultiConnectionStructInitializationState initilizationState;
} MultiConnection;

View File

@ -11,8 +11,17 @@
#ifndef SHARED_CONNECTION_STATS_H
#define SHARED_CONNECTION_STATS_H
extern int MaxTrackedWorkerNodes;
extern int MaxSharedPoolSize;
extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void);
extern void RemoveInactiveNodesFromSharedConnections(void);
extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitLoopForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
#endif /* SHARED_CONNECTION_STATS_H */

View File

@ -0,0 +1,160 @@
-- this test file is intended to be called at the end
-- of any test schedule, ensuring that there is no
-- leak/wrong calculation of the connection stats
-- in the shared memory
CREATE SCHEMA ensure_no_shared_connection_leak;
SET search_path TO ensure_no_shared_connection_leak;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- now that last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
0
(1 row)
-- in case of MX, we should prevent deadlock detection and
-- 2PC recover from the workers as well
\c - - - :worker_1_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
SET search_path TO ensure_no_shared_connection_leak;
-- ensure that we only have at most citus.max_cached_conns_per_worker
-- connections per node
select
(connection_count_to_node = 0) as no_connection_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY 1;
no_connection_to_node
---------------------------------------------------------------------
t
t
(2 rows)
-- now, ensure this from the workers perspective
-- we should only see the connection/backend that is running the command below
SELECT
result, success
FROM
run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$)
ORDER BY 1, 2;
result | success
---------------------------------------------------------------------
1 | t
1 | t
(2 rows)
-- in case other tests relies on these setting, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
DROP SCHEMA ensure_no_shared_connection_leak CASCADE;
NOTICE: drop cascades to table test

View File

@ -0,0 +1,391 @@
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- now that last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test SELECT i FROM generate_series(0,100)i;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- single shard queries require single connection per node
BEGIN;
SELECT count(*) FROM test WHERE a = 1;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM test WHERE a = 2;
count
---------------------------------------------------------------------
1
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- executor is only allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- sequential mode is allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- now, decrease the shared pool size, and still force
-- one connection per placement
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
COMMIT;
-- pg_sleep forces almost 1 connection per placement
-- now, some of the optional connections would be skipped,
-- and only 5 connections are used per node
BEGIN;
SELECT count(*), pg_sleep(0.1) FROM test;
count | pg_sleep
---------------------------------------------------------------------
101 |
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
5
5
(2 rows)
COMMIT;
SHOW citus.max_shared_pool_size;
citus.max_shared_pool_size
---------------------------------------------------------------------
5
(1 row)
-- by default max_shared_pool_size equals to max_connections;
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SHOW citus.max_shared_pool_size;
citus.max_shared_pool_size
---------------------------------------------------------------------
100
(1 row)
SHOW max_connections;
max_connections
---------------------------------------------------------------------
100
(1 row)
-- now, each node gets 16 connections as we force 1 connection per placement
BEGIN;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
16
16
(2 rows)
COMMIT;
BEGIN;
-- now allow at most 1 connection, and ensure that intermediate
-- results don't require any extra connections
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SET LOCAL citus.task_assignment_policy TO "round-robin";
SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo;
cnt
---------------------------------------------------------------------
101
(1 row)
-- queries with intermediate results don't use any extra connections
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- now show that when max_cached_conns_per_worker > 1
-- Citus forces the first execution to open at least 2
-- connections that are cached. Later, those 2 cached
-- connections are used.
BEGIN;
SET LOCAL citus.max_cached_conns_per_worker TO 2;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
?column?
---------------------------------------------------------------------
t
t
(2 rows)
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
?column?
---------------------------------------------------------------------
t
t
(2 rows)
COMMIT;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
DROP SCHEMA shared_connection_stats CASCADE;
NOTICE: drop cascades to table test
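For quick manual checks outside the regression suite, a query along these lines (a sketch that assumes only the citus_remote_connection_stats() columns already used above plus the standard current_setting() function) puts the per-node counts next to the configured cap:

-- sketch: per-node connection usage versus the configured cap
SELECT
    hostname,
    port,
    connection_count_to_node,
    current_setting('citus.max_shared_pool_size')::int AS max_shared_pool_size
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = current_database()
ORDER BY
    hostname, port;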

@ -0,0 +1,33 @@
Parsed test spec with 3 sessions
starting permutation: s3-lower-pool-size s1-begin s1-count-slow s3-increase-pool-size s2-select s1-commit
step s3-lower-pool-size:
SELECT set_max_shared_pool_size(5);
set_max_shared_pool_size
step s1-begin:
BEGIN;
step s1-count-slow:
SELECT pg_sleep(0.1), count(*) FROM test;
pg_sleep count
101
step s3-increase-pool-size:
SELECT set_max_shared_pool_size(100);
set_max_shared_pool_size
step s2-select:
SELECT count(*) FROM test;
count
101
step s1-commit:
COMMIT;

@ -37,3 +37,9 @@ test: failure_connection_establishment
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# --------
test: ensure_no_shared_connection_leak

@ -62,6 +62,7 @@ test: isolation_validate_vs_insert
test: isolation_insert_select_conflict
test: isolation_ref2ref_foreign_keys
test: isolation_multiuser_locking
test: shared_connection_waits
# MX tests
test: isolation_reference_on_mx

@ -50,3 +50,9 @@ test: locally_execute_intermediate_results
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# --------
test: ensure_no_shared_connection_leak

@ -341,7 +341,19 @@ test: distributed_procedure
# ---------
test: multi_deparse_function multi_deparse_procedure
# --------
# cannot be run in parallel with any other tests as it checks
# statistics across sessions
# --------
test: shared_connection_stats
# ---------
# test that no tests leaked intermediate results. This should always be last
# ---------
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# --------
test: ensure_no_shared_connection_leak

@ -115,3 +115,9 @@ test: multi_schema_support
# test that no tests leaked intermediate results. This should always be last
# ----------
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# --------
test: ensure_no_shared_connection_leak

@ -0,0 +1,65 @@
setup
{
CREATE OR REPLACE FUNCTION wake_up_connection_pool_waiters()
RETURNS void
LANGUAGE C STABLE STRICT
AS 'citus', $$wake_up_connection_pool_waiters$$;
CREATE OR REPLACE FUNCTION set_max_shared_pool_size(int)
RETURNS void
LANGUAGE C STABLE STRICT
AS 'citus', $$set_max_shared_pool_size$$;
CREATE TABLE test (a int, b int);
SET citus.shard_count TO 32;
SELECT create_distributed_table('test', 'a');
INSERT INTO test SELECT i, i FROM generate_series(0,100)i;
}
teardown
{
SELECT set_max_shared_pool_size(100);
DROP FUNCTION wake_up_connection_pool_waiters();
DROP FUNCTION set_max_shared_pool_size(int);
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-count-slow"
{
SELECT pg_sleep(0.1), count(*) FROM test;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-select"
{
SELECT count(*) FROM test;
}
session "s3"
step "s3-lower-pool-size"
{
SELECT set_max_shared_pool_size(5);
}
step "s3-increase-pool-size"
{
SELECT set_max_shared_pool_size(100);
}
permutation "s3-lower-pool-size" "s1-begin" "s1-count-slow" "s3-increase-pool-size" "s2-select" "s1-commit"

@ -0,0 +1,83 @@
-- this test file is intended to be called at the end
-- of any test schedule, ensuring that there is no
-- leak or miscalculation of the connection stats
-- in the shared memory
CREATE SCHEMA ensure_no_shared_connection_leak;
SET search_path TO ensure_no_shared_connection_leak;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
SELECT count(*) FROM test;
-- in case of MX, we should also prevent deadlock detection and
-- 2PC recovery on the workers
\c - - - :worker_1_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
\c - - - :worker_2_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
\c - - - :master_port
SET search_path TO ensure_no_shared_connection_leak;
-- ensure that we have at most citus.max_cached_conns_per_worker
-- (zero, in this case) connections per node
select
(connection_count_to_node = 0) as no_connection_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY 1;
-- now, ensure this from the workers' perspective:
-- we should only see the connection/backend that is running the command below
SELECT
result, success
FROM
run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$)
ORDER BY 1, 2;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
DROP SCHEMA ensure_no_shared_connection_leak CASCADE;
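For an ad-hoc leak check on the coordinator, the per-node query above can be collapsed into a single boolean; a sketch using the same stats function:

-- sketch: true only when every tracked worker reports zero remote connections
SELECT
    coalesce(bool_and(connection_count_to_node = 0), true) AS no_leaked_connections
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = current_database();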

@ -10,7 +10,6 @@ select create_distributed_table('test', 'a');
-- Make sure a connection is opened and cached
select count(*) from test where a = 0;
show citus.node_conninfo;
-- Set sslmode to something that does not work when connecting
@ -30,8 +29,8 @@ show citus.node_conninfo;
-- Should work again
select count(*) from test where a = 0;
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
BEGIN;
-- Should still work (no SIGHUP yet);
select count(*) from test where a = 0;
@ -43,19 +42,15 @@ show citus.node_conninfo;
-- query
select count(*) from test where a = 0;
COMMIT;
-- Should fail now with connection error, when transaction is finished
select count(*) from test where a = 0;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
select pg_sleep(0.1); -- wait for config reload to apply
show citus.node_conninfo;
-- Should work again
select count(*) from test where a = 0;
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
BEGIN;
-- Should still work (no SIGHUP yet);
@ -68,10 +63,8 @@ show citus.node_conninfo;
-- query
select count(*) from test where a = 0;
COMMIT;
-- Should fail now, when transaction is finished
select count(*) from test where a = 0;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();

@ -0,0 +1,228 @@
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
SELECT pg_reload_conf();
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
INSERT INTO test SELECT i FROM generate_series(0,100)i;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- single shard queries require a single connection per node
BEGIN;
SELECT count(*) FROM test WHERE a = 1;
SELECT count(*) FROM test WHERE a = 2;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- executor is only allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SELECT count(*) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- sequential mode is allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- now, decrease the shared pool size, and still force
-- one connection per placement
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
COMMIT;
-- pg_sleep slows down the tasks, so the executor tries to open almost 1 connection per placement
-- now, some of the optional connections are skipped,
-- and only 5 connections are used per node
BEGIN;
SELECT count(*), pg_sleep(0.1) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
SHOW citus.max_shared_pool_size;
-- by default, max_shared_pool_size equals max_connections
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
SHOW citus.max_shared_pool_size;
SHOW max_connections;
-- now, each node gets 16 connections as we force 1 connection per placement (32 shards across 2 workers = 16 placements per node)
BEGIN;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
BEGIN;
-- now allow at most 1 connection, and ensure that intermediate
-- results don't require any extra connections
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SET LOCAL citus.task_assignment_policy TO "round-robin";
SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo;
-- queries with intermediate results don't use any extra connections
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- now show that when max_cached_conns_per_worker > 1
-- Citus forces the first execution to open at least 2
-- connections that are cached. Later, those 2 cached
-- connections are used.
BEGIN;
SET LOCAL citus.max_cached_conns_per_worker TO 2;
SELECT count(*) FROM test;
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
SELECT count(*) FROM test;
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
DROP SCHEMA shared_connection_stats CASCADE;