Merge pull request #3692 from citusdata/shared_connection_counter

Throttle connections to the worker nodes
Önder Kalacı 2020-04-14 10:37:57 +02:00 committed by GitHub
commit 9229db2081
25 changed files with 2113 additions and 51 deletions

View File

@ -64,7 +64,6 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */ bool EnableDDLPropagation = true; /* ddl propagation is enabled */
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
static bool shouldInvalidateForeignKeyGraph = false; static bool shouldInvalidateForeignKeyGraph = false;

View File

@ -28,11 +28,13 @@
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/run_from_same_connection.h" #include "distributed/run_from_same_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/cancel_utils.h" #include "distributed/cancel_utils.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "portability/instr_time.h" #include "portability/instr_time.h"
#include "storage/ipc.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -46,7 +48,8 @@ MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -78,7 +81,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
int *waitCount);
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection);
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
@ -257,7 +260,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
const char *user, const char *database)
{
ConnectionHashKey key;
MultiConnection *connection;
bool found;
/* do some minimal input checks */
@ -310,24 +312,75 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
if (!(flags & FORCE_NEW_CONNECTION)) if (!(flags & FORCE_NEW_CONNECTION))
{ {
/* check connection cache for a connection that's not already in use */ /* check connection cache for a connection that's not already in use */
connection = FindAvailableConnection(entry->connections, flags); MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection)
{
return connection;
}
}
/*
* Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment.
*/
MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
connection = StartConnectionEstablishment(&key); sizeof(MultiConnection));
connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
dlist_push_tail(entry->connections, &connection->connectionNode);
/* by their nature, these two flags cannot be set at the same time */
Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));
if (flags & WAIT_FOR_CONNECTION)
{
WaitLoopForSharedConnection(hostname, port);
}
else if (flags & OPTIONAL_CONNECTION)
{
/*
* We can afford to skip establishing an optional connection. For
* non-optional connections, we first retry for some time. If we still
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
if (!TryToIncrementSharedConnectionCounter(hostname, port))
{
/* do not track the connection anymore */
dlist_delete(&connection->connectionNode);
pfree(connection);
return NULL;
}
}
else
{
/*
* The caller doesn't want the connection manager to wait
* until a connection slot is available on the remote node.
* In the end, we might fail to establish a connection to the
* remote node as it might not have any space left in
* max_connections for this connection establishment.
*
* Still, we keep track of the connection counter.
*/
IncrementSharedConnectionCounter(hostname, port);
}
/*
* We've already incremented the counter above, so we should decrement
* when we're done with the connection.
*/
connection->initilizationState = POOL_STATE_COUNTER_INCREMENTED;
StartConnectionEstablishment(connection, &key);
ResetShardPlacementAssociation(connection);
/* fully initialized the connection, record it */
connection->initilizationState = POOL_STATE_INITIALIZED;
return connection;
}
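For reference, a minimal sketch (not part of this diff) of how a caller is expected to use the two connection flags introduced above: a required connection passes WAIT_FOR_CONNECTION and may block until citus.max_shared_pool_size has a free slot, whereas an extra connection passes OPTIONAL_CONNECTION and must tolerate a NULL return. GetConnectionForTask is a hypothetical helper, assuming the Citus headers used in this file.

/*
 * Hypothetical caller of StartNodeUserDatabaseConnection(): required
 * connections wait for a slot, optional ones may be skipped entirely.
 */
#include "distributed/connection_management.h"

static MultiConnection *
GetConnectionForTask(const char *hostname, int32 port, bool required)
{
	uint32 flags = required ? WAIT_FOR_CONNECTION : OPTIONAL_CONNECTION;

	MultiConnection *connection =
		StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);

	if (connection == NULL)
	{
		/* only possible for OPTIONAL_CONNECTION: skip the extra connection */
		return NULL;
	}

	return connection;
}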
@ -456,8 +509,7 @@ CloseConnection(MultiConnection *connection)
bool found;
/* close connection */
PQfinish(connection->pgConn); CitusPQFinish(connection);
connection->pgConn = NULL;
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
key.port = connection->port;
@ -537,8 +589,7 @@ ShutdownConnection(MultiConnection *connection)
{
SendCancelationRequest(connection);
}
PQfinish(connection->pgConn); CitusPQFinish(connection);
connection->pgConn = NULL;
}
@ -903,9 +954,30 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
}
/* close connection, otherwise we take up resource on the other side */ /* close connection, otherwise we take up resource on the other side */
CitusPQFinish(connection);
}
}
/*
* CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection
* counters.
*/
static void
CitusPQFinish(MultiConnection *connection)
{
if (connection->pgConn != NULL)
{
PQfinish(connection->pgConn);
connection->pgConn = NULL;
}
/* behave idempotently, there is no guarantee that CitusPQFinish() is called only once */
if (connection->initilizationState >= POOL_STATE_COUNTER_INCREMENTED)
{
DecrementSharedConnectionCounter(connection->hostname, connection->port);
connection->initilizationState = POOL_STATE_NOT_INITIALIZED;
}
}
@ -985,8 +1057,8 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
* Asynchronously establish connection to a remote node, but don't wait for
* that to finish. DNS lookups etc. are performed synchronously though.
*/
static MultiConnection * static void
StartConnectionEstablishment(ConnectionHashKey *key) StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
{
bool found = false;
static uint64 connectionId = 1;
@ -1018,9 +1090,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
entry->isValid = true;
}
MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
sizeof(MultiConnection));
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
connection->port = key->port;
strlcpy(connection->database, key->database, NAMEDATALEN);
@ -1040,8 +1109,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
PQsetnonblocking(connection->pgConn, true);
SetCitusNoticeProcessor(connection);
return connection;
}
@ -1166,6 +1233,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
}
return isCitusInitiatedBackend ||
connection->initilizationState != POOL_STATE_INITIALIZED ||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd ||
PQstatus(connection->pgConn) != CONNECTION_OK ||

View File

@ -0,0 +1,675 @@
/*-------------------------------------------------------------------------
*
* shared_connection_stats.c
* Keeps track of the number of connections to remote nodes across
* backends. The primary goal is to prevent an excessive number of
* connections (typically > max_connections) to any worker node.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "pgstat.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "access/htup_details.h"
#include "catalog/pg_authid.h"
#include "commands/dbcommands.h"
#include "distributed/cancel_utils.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/time_constants.h"
#include "distributed/tuplestore.h"
#include "utils/builtins.h"
#include "utils/hashutils.h"
#include "utils/hsearch.h"
#include "storage/ipc.h"
#define REMOTE_CONNECTION_STATS_COLUMNS 4
#define ADJUST_POOLSIZE_AUTOMATICALLY 0
#define DISABLE_CONNECTION_THROTTLING -1
/*
* The data structure used to store data in shared memory. This data structure is only
* used for storing the lock. The actual statistics about the connections are stored
* in the hashmap, which is allocated separately, as Postgres provides different APIs
* for allocating hashmaps in the shared memory.
*/
typedef struct ConnectionStatsSharedData
{
int sharedConnectionHashTrancheId;
char *sharedConnectionHashTrancheName;
LWLock sharedConnectionHashLock;
ConditionVariable waitersConditionVariable;
} ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey
{
/*
* We keep the entries in the shared memory even after master_update_node()
* as there might be some cached connections to the old node.
* That's why, we prefer to use "hostname/port" over nodeId.
*/
char hostname[MAX_NODE_LENGTH];
int32 port;
/*
* Given that citus.max_shared_pool_size can be defined per database, we
* should keep track of shared connections per database.
*/
Oid databaseOid;
} SharedConnStatsHashKey;
/* hash entry for per worker stats */
typedef struct SharedConnStatsHashEntry
{
SharedConnStatsHashKey key;
int connectionCount;
} SharedConnStatsHashEntry;
/*
* Controlled via a GUC, never access directly, use GetMaxSharedPoolSize().
* "0" means adjust MaxSharedPoolSize automatically by using MaxConnections.
* "-1" means do not apply connection throttling
* Anything else means use that number
*/
int MaxSharedPoolSize = 0;
/* the following two structs are used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL;
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* local function declarations */
static void StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
static void LockConnectionSharedMemory(LWLockMode lockMode);
static void UnLockConnectionSharedMemory(void);
static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
static uint32 SharedConnectionHashHash(const void *key, Size keysize);
PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
/*
* citus_remote_connection_stats returns all the available information about all
* the remote connections (a.k.a., connections to remote nodes).
*/
Datum
citus_remote_connection_stats(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
CheckCitusVersion(ERROR);
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID();
}
/*
* StoreAllRemoteConnectionStats gets connections established from the current node
* and inserts them into the given tuplestore.
*
* We don't need to enforce any access privileges as the number of backends
* on any node is already visible on pg_stat_activity to all users.
*/
static void
StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
Datum values[REMOTE_CONNECTION_STATS_COLUMNS];
bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS];
/* we're reading all shared connections, prevent any changes */
LockConnectionSharedMemory(LW_SHARED);
HASH_SEQ_STATUS status;
SharedConnStatsHashEntry *connectionEntry = NULL;
hash_seq_init(&status, SharedConnStatsHash);
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
{
/* get ready for the next tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
char *databaseName = get_database_name(connectionEntry->key.databaseOid);
if (databaseName == NULL)
{
/* database might have been dropped */
continue;
}
values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname));
values[1] = Int32GetDatum(connectionEntry->key.port);
values[2] = PointerGetDatum(cstring_to_text(databaseName));
values[3] = Int32GetDatum(connectionEntry->connectionCount);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
UnLockConnectionSharedMemory();
}
/*
* RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
* and removes the inactive entries.
*/
void
RemoveInactiveNodesFromSharedConnections(void)
{
/* we're modifying connections, prevent any changes */
LockConnectionSharedMemory(LW_EXCLUSIVE);
HASH_SEQ_STATUS status;
SharedConnStatsHashEntry *connectionEntry = NULL;
/*
* In the first iteration, try to remove worker nodes that don't have any active
* connections and that no longer exist in the metadata.
*/
hash_seq_init(&status, SharedConnStatsHash);
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
{
SharedConnStatsHashKey connectionKey = connectionEntry->key;
WorkerNode *workerNode =
FindWorkerNode(connectionKey.hostname, connectionKey.port);
if (connectionEntry->connectionCount == 0 &&
(workerNode == NULL || !workerNode->isActive))
{
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
}
}
int entryCount = hash_get_num_entries(SharedConnStatsHash);
if (entryCount + 1 < MaxWorkerNodesTracked)
{
/* we're good, we have at least one more space for a new worker */
UnLockConnectionSharedMemory();
return;
}
/*
* We aimed to remove nodes that don't have any open connections. If we
* failed to find one, we have to be more aggressive and remove at least
* one of the inactive ones.
*/
hash_seq_init(&status, SharedConnStatsHash);
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
{
SharedConnStatsHashKey connectionKey = connectionEntry->key;
WorkerNode *workerNode =
FindWorkerNode(connectionKey.hostname, connectionKey.port);
if (workerNode == NULL || !workerNode->isActive)
{
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
hash_seq_term(&status);
break;
}
}
UnLockConnectionSharedMemory();
}
/*
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
* via a GUC.
* "0" means adjust MaxSharedPoolSize automatically by using MaxConnections
* "-1" means do not apply connection throttling
* Anything else means use that number
*/
int
GetMaxSharedPoolSize(void)
{
if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
{
return MaxConnections;
}
return MaxSharedPoolSize;
}
/*
* WaitLoopForSharedConnection tries to increment the shared connection
* counter for the given hostname/port and the current database in
* SharedConnStatsHash.
*
* The function implements a retry mechanism via a condition variable.
*/
void
WaitLoopForSharedConnection(const char *hostname, int port)
{
while (!TryToIncrementSharedConnectionCounter(hostname, port))
{
CHECK_FOR_INTERRUPTS();
WaitForSharedConnection();
}
ConditionVariableCancelSleep();
}
/*
* TryToIncrementSharedConnectionCounter tries to increment the shared
* connection counter for the given nodeId and the current database in
* SharedConnStatsHash.
*
* If the function returns true, the caller is allowed (and expected)
* to establish a new connection to the given node. Else, the caller
* is not allowed to establish a new connection.
*/
bool
TryToIncrementSharedConnectionCounter(const char *hostname, int port)
{
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return true;
}
bool counterIncremented = false;
SharedConnStatsHashKey connKey;
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
/*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no
* space in the shared memory. That's why we prefer continuing the execution
* instead of throwing an error.
*/
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
/*
* It is possible to throw an error at this point, but that doesn't help us in any way.
* Instead, we try our best and let the connection establishment continue, bypassing the
* connection throttling.
*/
if (!connectionEntry)
{
UnLockConnectionSharedMemory();
return true;
}
if (!entryFound)
{
/* we successfully allocated the entry for the first time, so initialize it */
connectionEntry->connectionCount = 1;
counterIncremented = true;
}
else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
{
/* there is no space left for this connection */
counterIncremented = false;
}
else
{
connectionEntry->connectionCount++;
counterIncremented = true;
}
UnLockConnectionSharedMemory();
return counterIncremented;
}
/*
* IncrementSharedConnectionCounter increments the shared counter
* for the given hostname and port.
*/
void
IncrementSharedConnectionCounter(const char *hostname, int port)
{
SharedConnStatsHashKey connKey;
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return;
}
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
/*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer
* continuing the execution instead of throwing an error.
*/
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
/*
* It is possible to throw an error at this point, but that doesn't help us in any way.
* Instead, we try our best and let the connection establishment continue, bypassing the
* connection throttling.
*/
if (!connectionEntry)
{
UnLockConnectionSharedMemory();
ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing "
"connection counter", hostname, port)));
return;
}
if (!entryFound)
{
/* we successfully allocated the entry for the first time, so initialize it */
connectionEntry->connectionCount = 0;
}
connectionEntry->connectionCount += 1;
UnLockConnectionSharedMemory();
}
/*
* DecrementSharedConnectionCounter decrements the shared counter
* for the given hostname and port.
*/
void
DecrementSharedConnectionCounter(const char *hostname, int port)
{
SharedConnStatsHashKey connKey;
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{
/* connection throttling disabled */
return;
}
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
/* this worker node is removed or updated, no need to care */
if (!entryFound)
{
UnLockConnectionSharedMemory();
/* wake up any waiters in case any backend is waiting for this node */
WakeupWaiterBackendsForSharedConnection();
ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing "
"connection counter", hostname, port)));
return;
}
/* we should never go below 0 */
Assert(connectionEntry->connectionCount > 0);
connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection();
}
/*
* LockConnectionSharedMemory is a utility function that should be used when
* accessing the SharedConnStatsHash, which is in the shared memory.
*/
static void
LockConnectionSharedMemory(LWLockMode lockMode)
{
LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode);
}
/*
* UnLockConnectionSharedMemory is a utility function that should be used after
* LockConnectionSharedMemory().
*/
static void
UnLockConnectionSharedMemory(void)
{
LWLockRelease(&ConnectionStatsSharedState->sharedConnectionHashLock);
}
/*
* WakeupWaiterBackendsForSharedConnection is a wrapper around the condition variable
* broadcast operation.
*
* We use a single condition variable, for all worker nodes, to implement the connection
* throttling mechanism. All the backends combined are allowed to establish at most
* MaxSharedPoolSize connections per worker node. If a backend requires a
* non-optional connection (see WAIT_FOR_CONNECTION for details), it is not allowed
* to establish it immediately if the total connection count is already at MaxSharedPoolSize.
* Instead, the backend waits on the condition variable. When any other backend
* terminates an existing connection to any remote node, this function is called.
* The main goal is to trigger all waiting backends to try getting a connection slot
* in MaxSharedPoolSize. The ones which can get a connection slot are allowed to continue
* with the connection establishments. Others should wait for another backend to call
* this function.
*/
void
WakeupWaiterBackendsForSharedConnection(void)
{
ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
}
/*
* WaitForSharedConnection is a wrapper around the condition variable sleep operation.
*
* For the details of the use of the condition variable, see
* WakeupWaiterBackendsForSharedConnection().
*/
void
WaitForSharedConnection(void)
{
ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable,
PG_WAIT_EXTENSION);
}
/*
* InitializeSharedConnectionStats requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook.
*/
void
InitializeSharedConnectionStats(void)
{
/* allocate shared memory */
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
}
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = SharedConnectionStatsShmemInit;
}
/*
* SharedConnectionStatsShmemSize returns the size that should be allocated
* on the shared memory for shared connection stats.
*/
static size_t
SharedConnectionStatsShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(ConnectionStatsSharedData));
Size hashSize = hash_estimate_size(MaxWorkerNodesTracked,
sizeof(SharedConnStatsHashEntry));
size = add_size(size, hashSize);
return size;
}
/*
* SharedConnectionStatsShmemInit initializes the shared memory used
* for keeping track of connection stats across backends.
*/
static void
SharedConnectionStatsShmemInit(void)
{
bool alreadyInitialized = false;
HASHCTL info;
/* create (hostname, port, database) -> [counter] */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(SharedConnStatsHashKey);
info.entrysize = sizeof(SharedConnStatsHashEntry);
info.hash = SharedConnectionHashHash;
info.match = SharedConnectionHashCompare;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
/*
* Currently the lock isn't required because allocation only happens at
* startup in postmaster, but it doesn't hurt, and makes things more
* consistent with other extensions.
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
ConnectionStatsSharedState =
(ConnectionStatsSharedData *) ShmemInitStruct(
"Shared Connection Stats Data",
sizeof(ConnectionStatsSharedData),
&alreadyInitialized);
if (!alreadyInitialized)
{
ConnectionStatsSharedState->sharedConnectionHashTrancheId = LWLockNewTrancheId();
ConnectionStatsSharedState->sharedConnectionHashTrancheName =
"Shared Connection Tracking Hash Tranche";
LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId,
ConnectionStatsSharedState->sharedConnectionHashTrancheName);
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
}
/* allocate hash table */
SharedConnStatsHash =
ShmemInitHash("Shared Conn. Stats Hash", MaxWorkerNodesTracked,
MaxWorkerNodesTracked, &info, hashFlags);
LWLockRelease(AddinShmemInitLock);
Assert(SharedConnStatsHash != NULL);
Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
static uint32
SharedConnectionHashHash(const void *key, Size keysize)
{
SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key;
uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
hash = hash_combine(hash, hash_uint32(entry->port));
hash = hash_combine(hash, hash_uint32(entry->databaseOid));
return hash;
}
static int
SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
{
SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a;
SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b;
if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
ca->port != cb->port ||
ca->databaseOid != cb->databaseOid)
{
return 1;
}
else
{
return 0;
}
}

View File

@ -589,6 +589,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection);
static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForConnection(WorkerPool *workerPool);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
@ -1944,9 +1945,6 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort;
INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
workerPool->distributedExecution = execution;
/* "open" connections aggressively when there are cached connections */ /* "open" connections aggressively when there are cached connections */
int nodeConnectionCount = MaxCachedConnectionsPerWorker; int nodeConnectionCount = MaxCachedConnectionsPerWorker;
workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);
@ -1954,6 +1952,8 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
dlist_init(&workerPool->pendingTaskQueue); dlist_init(&workerPool->pendingTaskQueue);
dlist_init(&workerPool->readyTaskQueue); dlist_init(&workerPool->readyTaskQueue);
workerPool->distributedExecution = execution;
execution->workerList = lappend(execution->workerList, workerPool); execution->workerList = lappend(execution->workerList, workerPool);
return workerPool; return workerPool;
@ -2385,11 +2385,48 @@ ManageWorkerPool(WorkerPool *workerPool)
connectionFlags |= OUTSIDE_TRANSACTION;
}
if (UseConnectionPerPlacement())
{
/*
* User wants one connection per placement, so no throttling is desired
* and we do not set any flags.
*
* The primary reason for this is that allowing multiple backends to use
* a connection per placement could lead to unresolved self deadlocks. In other
* words, each backend may get stuck waiting for other backends to get a slot
* in the shared connection counters.
*/
}
else if (ShouldWaitForConnection(workerPool))
{
/*
* We need this connection to finish the execution. If it is not
* available based on the current number of connections to the worker
* then wait for it.
*/
connectionFlags |= WAIT_FOR_CONNECTION;
}
else
{
/*
* The executor can finish the execution with a single connection; the
* remaining ones are optional. If the executor can get more connections,
* it can increase the parallelism.
*/
connectionFlags |= OPTIONAL_CONNECTION;
}
/* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
workerPool->nodeName,
workerPool->nodePort,
NULL, NULL);
if (!connection)
{
/* connection can only be NULL for optional connections */
Assert((connectionFlags & OPTIONAL_CONNECTION));
continue;
}
/*
* Assign the initial state in the connection state machine. The connection
@ -2404,6 +2441,17 @@ ManageWorkerPool(WorkerPool *workerPool)
*/
connection->claimedExclusively = true;
if (list_length(workerPool->sessionList) == 0)
{
/*
* The worker pool has just started to establish connections. We need to
* defer this initialization until after StartNodeUserDatabaseConnection()
* because for non-optional connections, we have some logic to wait
* until a connection is allowed to be established.
*/
INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
}
/* create a session for the connection */
WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection);
@ -2416,6 +2464,42 @@ ManageWorkerPool(WorkerPool *workerPool)
}
/*
* ShouldWaitForConnection returns true if the workerPool should wait to
* get the next connection until one slot is empty within
* citus.max_shared_pool_size on the worker. Note that, if there is an
* empty slot, the connection will not wait anyway.
*/
static bool
ShouldWaitForConnection(WorkerPool *workerPool)
{
if (list_length(workerPool->sessionList) == 0)
{
/*
* We definitely need at least 1 connection to finish the execution.
* All single shard queries hit here with the default settings.
*/
return true;
}
if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker)
{
/*
* Until this session caches MaxCachedConnectionsPerWorker connections,
* this might lead to some optional connections being considered non-optional
* when MaxCachedConnectionsPerWorker > 1.
*
* However, once the session caches MaxCachedConnectionsPerWorker (which is
* the second transaction executed in the session), Citus would utilize the
* cached connections as much as possible.
*/
return true;
}
return false;
}
/*
* CheckConnectionTimeout makes sure that the execution enforces the connection
* establishment timeout defined by the user (NodeConnectionTimeout).

View File

@ -39,6 +39,7 @@
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -293,6 +294,9 @@ master_disable_node(PG_FUNCTION_ARGS)
bool onlyConsiderActivePlacements = false;
MemoryContext savedContext = CurrentMemoryContext;
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
PG_TRY();
{
if (NodeIsPrimary(workerNode))
@ -604,6 +608,9 @@ ActivateNode(char *nodeName, int nodePort)
{
bool isActive = true;
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
@ -646,6 +653,9 @@ master_update_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
newNodePort);
if (workerNodeWithSameAddress != NULL)
@ -672,6 +682,7 @@ master_update_node(PG_FUNCTION_ARGS)
errmsg("node %u not found", nodeId))); errmsg("node %u not found", nodeId)));
} }
/* /*
* If the node is a primary node we block reads and writes. * If the node is a primary node we block reads and writes.
* *
@ -715,8 +726,9 @@ master_update_node(PG_FUNCTION_ARGS)
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH); /* we should be able to find the new node from the metadata */
workerNode->workerPort = newNodePort; workerNode = FindWorkerNode(newNodeNameString, newNodePort);
Assert(workerNode->nodeId == nodeId);
/*
* Propagate the updated pg_dist_node entry to all metadata workers.
@ -1012,8 +1024,10 @@ ReadDistNode(bool includeNodesFromOtherClusters)
static void
RemoveNodeFromCluster(char *nodeName, int32 nodePort)
{
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); /* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
if (NodeIsPrimary(workerNode))
{
bool onlyConsiderActivePlacements = false;

View File

@ -56,6 +56,7 @@
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/run_from_same_connection.h" #include "distributed/run_from_same_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/query_pushdown_planning.h" #include "distributed/query_pushdown_planning.h"
#include "distributed/time_constants.h" #include "distributed/time_constants.h"
#include "distributed/query_stats.h" #include "distributed/query_stats.h"
@ -72,6 +73,7 @@
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "port/atomics.h" #include "port/atomics.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "optimizer/planner.h" #include "optimizer/planner.h"
#include "optimizer/paths.h" #include "optimizer/paths.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
@ -88,9 +90,10 @@ static char *CitusVersion = CITUS_VERSION;
void _PG_init(void);
static void CitusBackendAtExit(void);
static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata);
static void RegisterConnectionCleanup(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
@ -98,6 +101,7 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
@ -271,8 +275,7 @@ _PG_init(void)
InitializeConnectionManagement();
InitPlacementConnectionManagement();
InitializeCitusQueryStats();
InitializeSharedConnectionStats();
atexit(CitusBackendAtExit);
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
@ -283,18 +286,6 @@ _PG_init(void)
}
/*
* CitusBackendAtExit is called atexit of the backend for the purposes of
* any clean-up needed.
*/
static void
CitusBackendAtExit(void)
{
/* properly close all the cached connections */
ShutdownAllConnections();
}
/*
* Stack size increase during high memory load may cause unexpected crashes.
* With this alloca call, we are increasing stack size explicitly, so that if
@ -380,6 +371,37 @@ StartupCitusBackend(void)
{
InitializeMaintenanceDaemonBackend();
InitializeBackendData();
RegisterConnectionCleanup();
}
/*
* RegisterConnectionCleanup cleans up any resources left at the end of the
* session. We prefer to clean up before shared memory exit to make sure that
* this session properly releases anything held in the shared memory.
*/
static void
RegisterConnectionCleanup(void)
{
static bool registeredCleanup = false;
if (registeredCleanup == false)
{
before_shmem_exit(CitusCleanupConnectionsAtExit, 0);
registeredCleanup = true;
}
}
/*
* CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
* backend for the purposes of any clean-up needed.
*/
static void
CitusCleanupConnectionsAtExit(int code, Datum arg)
{
/* properly close all the cached connections */
ShutdownAllConnections();
}
@ -928,6 +950,20 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_shared_pool_size",
gettext_noop("Sets the maximum number of connections allowed per worker node "
"across all the backends from this node. Setting to -1 disables "
"connections throttling. Setting to 0 makes it auto-adjust, meaning "
"equal to max_connections on the coordinator."),
gettext_noop("As a rule of thumb, the value should be at most equal to the "
"max_connections on the remote nodes."),
&MaxSharedPoolSize,
0, -1, INT_MAX,
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, MaxSharedPoolSizeGucShowHook);
DefineCustomIntVariable(
"citus.max_worker_nodes_tracked",
gettext_noop("Sets the maximum number of worker nodes that are tracked."),
@ -935,9 +971,14 @@ RegisterCitusConfigVariables(void)
"health status are tracked in a shared hash table on " "health status are tracked in a shared hash table on "
"the master node. This configuration value limits the " "the master node. This configuration value limits the "
"size of the hash table, and consequently the maximum " "size of the hash table, and consequently the maximum "
"number of worker nodes that can be tracked."), "number of worker nodes that can be tracked."
"Citus keeps some information about the worker nodes "
"in the shared memory for certain optimizations. The "
"optimizations are enforced up to this number of worker "
"nodes. Any additional worker nodes may not benefit from"
"the optimizations."),
&MaxWorkerNodesTracked, &MaxWorkerNodesTracked,
2048, 8, INT_MAX, 2048, 1024, INT_MAX,
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);
@ -1520,6 +1561,28 @@ NodeConninfoGucAssignHook(const char *newval, void *extra)
}
/*
* MaxSharedPoolSizeGucShowHook overrides the value that is shown to the
* user when the default value has not been set.
*/
static const char *
MaxSharedPoolSizeGucShowHook(void)
{
StringInfo newvalue = makeStringInfo();
if (MaxSharedPoolSize == 0)
{
appendStringInfo(newvalue, "%d", GetMaxSharedPoolSize());
}
else
{
appendStringInfo(newvalue, "%d", MaxSharedPoolSize);
}
return (const char *) newvalue->data;
}
static bool
StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
{

View File

@ -5,3 +5,4 @@
#include "udfs/citus_extradata_container/9.3-2.sql" #include "udfs/citus_extradata_container/9.3-2.sql"
#include "udfs/update_distributed_table_colocation/9.3-2.sql" #include "udfs/update_distributed_table_colocation/9.3-2.sql"
#include "udfs/replicate_reference_tables/9.3-2.sql" #include "udfs/replicate_reference_tables/9.3-2.sql"
#include "udfs/citus_remote_connection_stats/9.3-2.sql"

View File

@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
IS 'returns statistics about remote connections';
REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
FROM PUBLIC;

View File

@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
IS 'returns statistics about remote connections';
REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats(
OUT hostname text,
OUT port int,
OUT database_name text,
OUT connection_count_to_node int)
FROM PUBLIC;

View File

@ -0,0 +1,66 @@
/*-------------------------------------------------------------------------
*
* test/src/sequential_execution.c
*
* This file contains functions to test the shared connection counters and
* the citus.max_shared_pool_size GUC.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "fmgr.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/listutils.h"
#include "nodes/parsenodes.h"
#include "utils/guc.h"
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(wake_up_connection_pool_waiters);
PG_FUNCTION_INFO_V1(set_max_shared_pool_size);
/*
* wake_up_connection_pool_waiters is a SQL
* interface for testing WakeupWaiterBackendsForSharedConnection().
*/
Datum
wake_up_connection_pool_waiters(PG_FUNCTION_ARGS)
{
WakeupWaiterBackendsForSharedConnection();
PG_RETURN_VOID();
}
/*
* set_max_shared_pool_size is a SQL
* interface for setting MaxSharedPoolSize. We use this function in isolation
* tester where ALTER SYSTEM is not allowed.
*/
Datum
set_max_shared_pool_size(PG_FUNCTION_ARGS)
{
int value = PG_GETARG_INT32(0);
AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt));
A_Const *aConstValue = makeNode(A_Const);
aConstValue->val = *makeInteger(value);
alterSystemStmt->setstmt = makeNode(VariableSetStmt);
alterSystemStmt->setstmt->name = "citus.max_shared_pool_size";
alterSystemStmt->setstmt->is_local = false;
alterSystemStmt->setstmt->kind = VAR_SET_VALUE;
alterSystemStmt->setstmt->args = list_make1(aConstValue);
AlterSystemSetConfigFile(alterSystemStmt);
kill(PostmasterPid, SIGHUP);
PG_RETURN_VOID();
}

View File

@ -31,6 +31,7 @@
#include "distributed/repartition_join_execution.h" #include "distributed/repartition_join_execution.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"

View File

@ -52,10 +52,29 @@ enum MultiConnectionMode
/* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4 OUTSIDE_TRANSACTION = 1 << 4,
/*
* Some connections are optional, such as when the adaptive executor is executing
* a multi-shard command and requires a second (or further) connection
* per node. In that case, the connection manager may decide not to allow the
* connection.
*/
OPTIONAL_CONNECTION = 1 << 5,
/*
* When this flag is passed, via connection throttling, the connection
* establishments may be suspended until a connection slot is available to
* the remote host.
*/
WAIT_FOR_CONNECTION = 1 << 6
};
/*
* This state is used for keeping track of the initialization
* of the underlying pg_conn struct.
*/
typedef enum MultiConnectionState
{
MULTI_CONNECTION_INITIAL,
@ -65,6 +84,21 @@ typedef enum MultiConnectionState
MULTI_CONNECTION_LOST
} MultiConnectionState;
/*
* This state is used for keeping track of the initialization
* of the MultiConnection struct, not specifically the underlying
* pg_conn. The state is useful to determine the action during
* clean-up of connections.
*/
typedef enum MultiConnectionStructInitializationState
{
POOL_STATE_NOT_INITIALIZED,
POOL_STATE_COUNTER_INCREMENTED,
POOL_STATE_INITIALIZED
} MultiConnectionStructInitializationState;
/* declaring this directly above makes uncrustify go crazy */
typedef enum MultiConnectionMode MultiConnectionMode;
@ -114,6 +148,8 @@ typedef struct MultiConnection
/* number of bytes sent to PQputCopyData() since last flush */
uint64 copyBytesWrittenSinceLastFlush;
MultiConnectionStructInitializationState initilizationState;
} MultiConnection;

View File

@ -0,0 +1,27 @@
/*-------------------------------------------------------------------------
*
* shared_connection_stats.h
* Central management of connections and their life-cycle
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARED_CONNECTION_STATS_H
#define SHARED_CONNECTION_STATS_H
extern int MaxSharedPoolSize;
extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void);
extern void RemoveInactiveNodesFromSharedConnections(void);
extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitLoopForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
#endif /* SHARED_CONNECTION_STATS_H */

View File

@ -0,0 +1,160 @@
-- this test file is intended to be called at the end
-- of any test schedule, ensuring that there is no
-- leak or wrong calculation of the connection stats
-- in the shared memory
CREATE SCHEMA ensure_no_shared_connection_leak;
SET search_path TO ensure_no_shared_connection_leak;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- now that last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
0
(1 row)
-- in case of MX, we should prevent deadlock detection and
-- 2PC recovery from the workers as well
\c - - - :worker_1_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
SET search_path TO ensure_no_shared_connection_leak;
-- ensure that we only have at most citus.max_cached_conns_per_worker
-- connections per node
select
(connection_count_to_node = 0) as no_connection_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY 1;
no_connection_to_node
---------------------------------------------------------------------
t
t
(2 rows)
-- now, ensure this from the workers perspective
-- we should only see the connection/backend that is running the command below
SELECT
result, success
FROM
run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$)
ORDER BY 1, 2;
result | success
---------------------------------------------------------------------
1 | t
1 | t
(2 rows)
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
DROP SCHEMA ensure_no_shared_connection_leak CASCADE;
NOTICE: drop cascades to table test

View File

@ -0,0 +1,391 @@
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for the
-- maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test SELECT i FROM generate_series(0,100)i;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- single-shard queries require a single connection per node
BEGIN;
SELECT count(*) FROM test WHERE a = 1;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM test WHERE a = 2;
count
---------------------------------------------------------------------
1
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- the executor is only allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- sequential mode is allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
0
0
(2 rows)
-- now, decrease the shared pool size, and still force
-- one connection per placement
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
COMMIT;
-- pg_sleep forces the executor to open close to one connection per placement;
-- with the shared pool capped at 5, the optional connections beyond the cap
-- are skipped, so only 5 connections are used per node
BEGIN;
SELECT count(*), pg_sleep(0.1) FROM test;
count | pg_sleep
---------------------------------------------------------------------
101 |
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
5
5
(2 rows)
COMMIT;
SHOW citus.max_shared_pool_size;
citus.max_shared_pool_size
---------------------------------------------------------------------
5
(1 row)
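As an aside, the same statistics function can be used to watch the throttle outside the regression harness. A minimal monitoring sketch (not part of the test), assuming only the citus_remote_connection_stats() columns already used above:
-- hedged sketch: compare live per-node usage against the configured cap
SELECT hostname, port, connection_count_to_node,
       current_setting('citus.max_shared_pool_size') AS pool_cap
FROM citus_remote_connection_stats()
WHERE database_name = current_database()
ORDER BY hostname, port;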
-- by default, max_shared_pool_size equals max_connections
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SHOW citus.max_shared_pool_size;
citus.max_shared_pool_size
---------------------------------------------------------------------
100
(1 row)
SHOW max_connections;
max_connections
---------------------------------------------------------------------
100
(1 row)
-- now, each node gets 16 connections as we force 1 connection per placement
-- (32 shards with replication factor 1 split over two workers is 16 placements per node)
BEGIN;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
16
16
(2 rows)
COMMIT;
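The figure of 16 per node follows from the shard layout rather than from any pool setting. A hedged sanity check (not part of the test), using the standard Citus metadata tables, would be:
-- with shard_count 32, replication factor 1 and two workers, each node
-- hosts 16 placements of 'test', matching the 16 connections shown above
SELECT n.nodename, n.nodeport, count(*) AS placement_count
FROM pg_dist_shard s
JOIN pg_dist_placement p USING (shardid)
JOIN pg_dist_node n USING (groupid)
WHERE s.logicalrelid = 'test'::regclass
GROUP BY n.nodename, n.nodeport
ORDER BY n.nodename, n.nodeport;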
BEGIN;
-- now allow at most 1 connection, and ensure that intermediate
-- results don't require any extra connections
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SET LOCAL citus.task_assignment_policy TO "round-robin";
SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo;
cnt
---------------------------------------------------------------------
101
(1 row)
-- queries with intermediate results don't use any extra connections
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
1
1
(2 rows)
COMMIT;
-- now show that when max_cached_conns_per_worker > 1,
-- Citus forces the first execution to open at least 2
-- connections that are cached. Later, those 2 cached
-- connections are reused
BEGIN;
SET LOCAL citus.max_cached_conns_per_worker TO 2;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
?column?
---------------------------------------------------------------------
t
t
(2 rows)
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
?column?
---------------------------------------------------------------------
t
t
(2 rows)
COMMIT;
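For a worker-side view of the cached connections, the pg_stat_activity check from the leak test could be reused. This is a diagnostic sketch only; the exact count depends on how many cached connections survive the commit and includes the backend running the check itself:
-- hedged sketch: count client backends on each worker
SELECT result, success
FROM run_command_on_workers(
  $$SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'client backend';$$)
ORDER BY 1, 2;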
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
DROP SCHEMA shared_connection_stats CASCADE;
NOTICE: drop cascades to table test

View File

@ -0,0 +1,33 @@
Parsed test spec with 3 sessions
starting permutation: s3-lower-pool-size s1-begin s1-count-slow s3-increase-pool-size s2-select s1-commit
step s3-lower-pool-size:
SELECT set_max_shared_pool_size(5);
set_max_shared_pool_size
step s1-begin:
BEGIN;
step s1-count-slow:
SELECT pg_sleep(0.1), count(*) FROM test;
pg_sleep count
101
step s3-increase-pool-size:
SELECT set_max_shared_pool_size(100);
set_max_shared_pool_size
step s2-select:
SELECT count(*) FROM test;
count
101
step s1-commit:
COMMIT;

View File

@ -37,3 +37,9 @@ test: failure_connection_establishment
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# ---------
test: ensure_no_shared_connection_leak

View File

@ -62,6 +62,7 @@ test: isolation_validate_vs_insert
test: isolation_insert_select_conflict
test: isolation_ref2ref_foreign_keys
test: isolation_multiuser_locking
test: shared_connection_waits
# MX tests
test: isolation_reference_on_mx

View File

@ -50,3 +50,9 @@ test: locally_execute_intermediate_results
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# ---------
test: ensure_no_shared_connection_leak

View File

@ -341,7 +341,19 @@ test: distributed_procedure
# ---------
test: multi_deparse_function multi_deparse_procedure
# ---------
# cannot be run in parallel with any other tests as it checks
# statistics across sessions
# ---------
test: shared_connection_stats
# ---------
# test that no tests leaked intermediate results. This should always be last
# ---------
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# ---------
test: ensure_no_shared_connection_leak

View File

@ -115,3 +115,9 @@ test: multi_schema_support
# test that no tests leaked intermediate results. This should always be last
# ----------
test: ensure_no_intermediate_data_leak
# ---------
# ensures that we never leak any connection counts
# in the shared memory
# ---------
test: ensure_no_shared_connection_leak

View File

@ -0,0 +1,65 @@
setup
{
CREATE OR REPLACE FUNCTION wake_up_connection_pool_waiters()
RETURNS void
LANGUAGE C STABLE STRICT
AS 'citus', $$wake_up_connection_pool_waiters$$;
CREATE OR REPLACE FUNCTION set_max_shared_pool_size(int)
RETURNS void
LANGUAGE C STABLE STRICT
AS 'citus', $$set_max_shared_pool_size$$;
CREATE TABLE test (a int, b int);
SET citus.shard_count TO 32;
SELECT create_distributed_table('test', 'a');
INSERT INTO test SELECT i, i FROM generate_series(0,100)i;
}
teardown
{
SELECT set_max_shared_pool_size(100);
DROP FUNCTION wake_up_connection_pool_waiters();
DROP FUNCTION set_max_shared_pool_size(int);
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-count-slow"
{
SELECT pg_sleep(0.1), count(*) FROM test;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-select"
{
SELECT count(*) FROM test;
}
session "s3"
step "s3-lower-pool-size"
{
SELECT set_max_shared_pool_size(5);
}
step "s3-increase-pool-size"
{
SELECT set_max_shared_pool_size(100);
}
permutation "s3-lower-pool-size" "s1-begin" "s1-count-slow" "s3-increase-pool-size" "s2-select" "s1-commit"

View File

@ -0,0 +1,83 @@
-- this test file is intended to be called at the end
-- of any test schedule, ensuring that there is no
-- leak or miscalculation of the connection stats
-- in shared memory
CREATE SCHEMA ensure_no_shared_connection_leak;
SET search_path TO ensure_no_shared_connection_leak;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for the
-- maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
SELECT count(*) FROM test;
-- in case of MX, we should disable deadlock detection and
-- 2PC recovery on the workers as well
\c - - - :worker_1_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
\c - - - :worker_2_port
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
\c - - - :master_port
SET search_path TO ensure_no_shared_connection_leak;
-- ensure that we only have at most citus.max_cached_conns_per_worker
-- connections per node
select
(connection_count_to_node = 0) as no_connection_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY 1;
-- now, ensure this from the workers' perspective
-- we should only see the connection/backend that is running the command below
SELECT
result, success
FROM
run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$)
ORDER BY 1, 2;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
DROP SCHEMA ensure_no_shared_connection_leak CASCADE;
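If other schedules ever need the same assertion, the check above could be wrapped in a helper. The function below is hypothetical (it is not part of this patch) and only reuses calls already exercised in this file:
-- hypothetical helper: true when no worker connection slots are still
-- accounted for in shared memory
CREATE OR REPLACE FUNCTION no_shared_connection_leak()
RETURNS boolean AS $$
    SELECT coalesce(bool_and(connection_count_to_node = 0), true)
    FROM citus_remote_connection_stats()
    WHERE port IN (SELECT node_port FROM master_get_active_worker_nodes())
      AND database_name = current_database();
$$ LANGUAGE sql;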

View File

@ -10,7 +10,6 @@ select create_distributed_table('test', 'a');
-- Make sure a connection is opened and cached
select count(*) from test where a = 0;
show citus.node_conninfo;
-- Set sslmode to something that does not work when connecting
@ -30,8 +29,8 @@ show citus.node_conninfo;
-- Should work again
select count(*) from test where a = 0;
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
BEGIN;
-- Should still work (no SIGHUP yet);
select count(*) from test where a = 0;
@ -43,19 +42,15 @@ show citus.node_conninfo;
-- query
select count(*) from test where a = 0;
COMMIT;
-- Should fail now with connection error, when transaction is finished
select count(*) from test where a = 0;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
select pg_sleep(0.1); -- wait for config reload to apply
show citus.node_conninfo;
-- Should work again
select count(*) from test where a = 0;
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
BEGIN;
-- Should still work (no SIGHUP yet);
@ -68,10 +63,8 @@ show citus.node_conninfo;
-- query
select count(*) from test where a = 0;
COMMIT;
-- Should fail now, when transaction is finished
select count(*) from test where a = 0;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();

View File

@ -0,0 +1,228 @@
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for the
-- maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
SELECT pg_reload_conf();
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
INSERT INTO test SELECT i FROM generate_series(0,100)i;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- single-shard queries require a single connection per node
BEGIN;
SELECT count(*) FROM test WHERE a = 1;
SELECT count(*) FROM test WHERE a = 2;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- the executor is only allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SELECT count(*) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- sequential mode is allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- show that no connections are cached
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
-- now, decrease the shared pool size, and still force
-- one connection per placement
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
COMMIT;
-- pg_sleep forces the executor to open close to one connection per placement;
-- with the shared pool capped at 5, the optional connections beyond the cap
-- are skipped, so only 5 connections are used per node
BEGIN;
SELECT count(*), pg_sleep(0.1) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
SHOW citus.max_shared_pool_size;
-- by default, max_shared_pool_size equals max_connections
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
SHOW citus.max_shared_pool_size;
SHOW max_connections;
-- now, each node gets 16 connections as we force 1 connection per placement
-- (32 shards with replication factor 1 split over two workers is 16 placements per node)
BEGIN;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
BEGIN;
-- now allow at most 1 connection, and ensure that intermediate
-- results don't require any extra connections
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SET LOCAL citus.task_assignment_policy TO "round-robin";
SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo;
-- queries with intermediate results don't use any extra connections
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- now show that when max_cached_conns_per_worker > 1,
-- Citus forces the first execution to open at least 2
-- connections that are cached. Later, those 2 cached
-- connections are reused
BEGIN;
SET LOCAL citus.max_cached_conns_per_worker TO 2;
SELECT count(*) FROM test;
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
SELECT count(*) FROM test;
SELECT
connection_count_to_node >= 2
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
COMMIT;
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
DROP SCHEMA shared_connection_stats CASCADE;
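Outside the regression suite, the same knobs give an operator a simple way to throttle a temporarily connection-hungry workload. A hedged operational sketch, using only the settings exercised above:
-- cap the shared pool for a busy window, then return to the default
-- (which follows max_connections)
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
-- ... run the connection-heavy workload here ...
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();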