Add the infrastructure to count the number of client backends

Considering the adaptive connection management
improvements that we plan to roll out soon, it is
very helpful to know the number of active client
backends.

We are making this addition to simplify the adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus' parallel queries compete for slots
within Postgres' `max_connections` on the same Citus database.

With adaptive connection management, we already have counters for
Citus' parallel queries. They help us adaptively decide on the
remote execution pool size (e.g., throttle connections if
necessary).
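
As a rough illustration (not part of this patch), the sketch below caps a
desired pool size by the `max_connections` slots that are not already taken
by client backends. `GetAllActiveClientBackendCount()` is the function this
commit adds; the helper itself, its parameters, and the head-room constant
are hypothetical:

```c
#include "postgres.h"

#include "distributed/backend_data.h"	/* GetAllActiveClientBackendCount() */

/*
 * Hypothetical helper: bound the remote execution pool size by the
 * max_connections slots not already taken by client backends.
 */
static int
SuggestedRemoteExecutionPoolSize(int maxConnections, int desiredPoolSize)
{
	int clientBackendCount = GetAllActiveClientBackendCount();

	/* keep a few slots as head room for superuser/maintenance connections */
	int availableSlots = maxConnections - clientBackendCount - 3;

	if (availableSlots < 1)
	{
		/* always allow at least one remote connection */
		availableSlots = 1;
	}

	return Min(desiredPoolSize, availableSlots);
}
```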

However, we do not have any counter for the total number of
client backends on the database. For single node Citus, we
should consider all the client backends, not only the remote
connections that Citus opens.

Of course, Postgres internally knows how many client
backends are active. However, to get that number, Postgres
iterates over all the backends; for example, see
[pg_stat_get_db_numbackends](8e90ec5580/src/backend/utils/adt/pgstatfuncs.c (L1240)).
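
To see the cost, here is a hedged paraphrase of the iteration behind that
function (based on PG 13-era `pgstatfuncs.c`; a sketch, not an exact copy):

```c
#include "postgres.h"

#include "pgstat.h"

/*
 * Sketch of how Postgres counts backends for a database: walk every
 * backend status entry and compare its database OID.
 */
static int
CountBackendsByIteration(Oid databaseId)
{
	int totalBackends = pgstat_fetch_stat_numbackends();
	int matchingBackends = 0;

	for (int backendIndex = 1; backendIndex <= totalBackends; backendIndex++)
	{
		PgBackendStatus *beentry = pgstat_fetch_stat_beentry(backendIndex);

		if (beentry != NULL && beentry->st_databaseid == databaseId)
		{
			matchingBackends++;
		}
	}

	return matchingBackends;
}
```

With the counter introduced here, the same number becomes a single atomic
read from shared memory (see `GetAllActiveClientBackendCount()` in the diff
below).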

For our purposes, we need this information on every connection
establishment. That's why we cannot afford to do this kind of
iteration.
pull/4312/head
Onder Kalaci 2020-11-12 09:15:21 +01:00
parent 180195b445
commit 629ecc3dee
6 changed files with 316 additions and 1 deletions


@ -74,6 +74,7 @@
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/adaptive_executor.h"
#include "libpq/auth.h"
#include "port/atomics.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
@ -100,7 +101,9 @@ static void DoInitialCleanup(void);
static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata);
static void RegisterConnectionCleanup(void);
static void RegisterClientBackendCounterDecrement(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static void DecrementClientBackendCounterAtExit(int code, Datum arg);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
@ -112,11 +115,15 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
static void CitusAuthHook(Port *port, int status);
/* static variable to hold value of deprecated GUC variable */
static bool DeprecatedBool = false;
static int DeprecatedInt = 0;
static ClientAuthentication_hook_type original_client_auth_hook = NULL;
/* *INDENT-OFF* */
/* GUC enum definitions */
@ -286,6 +293,14 @@ _PG_init(void)
/* register hook for error messages */
emit_log_hook = multi_log_hook;
/*
* Register hook for counting client backends that
* are successfully authenticated.
*/
original_client_auth_hook = ClientAuthentication_hook;
ClientAuthentication_hook = CitusAuthHook;
InitializeMaintenanceDaemon();
/* initialize coordinated transaction management */
@ -447,6 +462,23 @@ RegisterConnectionCleanup(void)
}
/*
* RegisterClientBackendCounterDecrement is called for all client backends.
* It registers a callback that will undo the counter increment (i.e.,
* decrement the counter) when the backend terminates.
*/
static void
RegisterClientBackendCounterDecrement(void)
{
static bool registeredCleanup = false;
if (registeredCleanup == false)
{
before_shmem_exit(DecrementClientBackendCounterAtExit, 0);
registeredCleanup = true;
}
}
/*
* CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
* backend for the purposes of any clean-up needed.
@ -466,6 +498,17 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
}
/*
* DecrementClientBackendCounterAtExit is called before_shmem_exit() of the
* backend for the purpose of decrementing the client backend counter.
*/
static void
DecrementClientBackendCounterAtExit(int code, Datum arg)
{
DecrementClientBackendCounter();
}
/*
* CreateRequiredDirectories - Create directories required for Citus to
* function.
@ -1741,3 +1784,21 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
}
#endif
}
/*
* CitusAuthHook is a callback for client authentication that Postgres provides.
* Citus uses this hook to count the number of active backends.
*/
static void
CitusAuthHook(Port *port, int status)
{
/* let other authentication hooks kick in first */
if (original_client_auth_hook)
{
original_client_auth_hook(port, status);
}
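/* make sure the counter gets decremented at exit, then count this backend as active */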
RegisterClientBackendCounterDecrement();
IncrementClientBackendCounter();
}


@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
*
* test/src/backend_counter.c
*
* This file contains functions to test the active backend counter
* within Citus.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "distributed/backend_data.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(get_all_active_client_backend_count);
/*
* get_all_active_client_backend_count returns the active
* client backend count.
*/
Datum
get_all_active_client_backend_count(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT32(GetAllActiveClientBackendCount());
}


@ -27,6 +27,7 @@
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "nodes/execnodes.h"
@ -63,6 +64,16 @@ typedef struct BackendManagementShmemData
*/
pg_atomic_uint64 nextTransactionNumber;
/*
* Total number of client backends that are authenticated.
* We only care about activeClientBackendCounter when adaptive
* connection management is enabled, otherwise always zero.
*
* Note that the counter does not consider any background workers
* or such, it only counts client_backends.
*/
pg_atomic_uint32 activeClientBackendCounter;
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
} BackendManagementShmemData;
@ -496,6 +507,9 @@ BackendManagementShmemInit(void)
/* start the distributed transaction ids from 1 */
pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
/* there are no active backends yet, so start with zero */
pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);
/*
* We need to init per backend's spinlock before any backend
* starts its execution. Note that we initialize TotalProcs (e.g., not
@ -945,3 +959,39 @@ GetMyProcLocalTransactionId(void)
{
return MyProc->lxid;
}
/*
* GetAllActiveClientBackendCount returns activeClientBackendCounter in
* the shared memory.
*/
int
GetAllActiveClientBackendCount(void)
{
uint32 activeBackendCount =
pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);
return activeBackendCount;
}
/*
* IncrementClientBackendCounter increments activeClientBackendCounter in
* the shared memory by one.
*/
void
IncrementClientBackendCounter(void)
{
pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
}
/*
* DecrementClientBackendCounter decrements activeClientBackendCounter in
* the shared memory by one.
*/
void
DecrementClientBackendCounter(void)
{
pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
}


@ -67,6 +67,9 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
extern List * ActiveDistributedTransactionNumbers(void);
LocalTransactionId GetMyProcLocalTransactionId(void);
extern LocalTransactionId GetMyProcLocalTransactionId(void);
extern int GetAllActiveClientBackendCount(void);
extern void IncrementClientBackendCounter(void);
extern void DecrementClientBackendCounter(void);
#endif /* BACKEND_DATA_H */


@ -965,6 +965,113 @@ DEBUG: not pushing down function to the same node
SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
RETURNS bigint
LANGUAGE C STRICT
AS 'citus', $$get_all_active_client_backend_count$$;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for the
-- maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- now that last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
-- sometimes Postgres is a little slow to terminate the backends
-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- since max_cached_conns_per_worker == 0 at this point, the
-- backend(s) that execute on the shards will be terminated
-- so show that there is only a single client backend,
-- which is actually the backend that executes here
SET search_path TO single_node;
SELECT count(*) from should_commit;
count
---------------------------------------------------------------------
0
(1 row)
SELECT pg_catalog.get_all_active_client_backend_count();
get_all_active_client_backend_count
---------------------------------------------------------------------
1
(1 row)
BEGIN;
SET citus.shard_count TO 32;
SET citus.force_max_query_parallelization TO ON;
SET citus.enable_local_execution TO false;
CREATE TABLE test (a int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
0
(1 row)
-- now, we should have 32 additional connections
SELECT pg_catalog.get_all_active_client_backend_count();
get_all_active_client_backend_count
---------------------------------------------------------------------
33
(1 row)
ROLLBACK;
-- set the values back to their originals
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- suppress notices
SET client_min_messages TO error;
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables


@ -512,6 +512,69 @@ SELECT function_delegation(1);
SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
RETURNS bigint
LANGUAGE C STRICT
AS 'citus', $$get_all_active_client_backend_count$$;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for the
-- maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- now that last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
\c - - - :master_port
-- sometimes Postgres is a little slow to terminate the backends
-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
SELECT pg_sleep(0.1);
-- since max_cached_conns_per_worker == 0 at this point, the
-- backend(s) that execute on the shards will be terminated
-- so show that there is only a single client backend,
-- which is actually the backend that executes here
SET search_path TO single_node;
SELECT count(*) from should_commit;
SELECT pg_catalog.get_all_active_client_backend_count();
BEGIN;
SET citus.shard_count TO 32;
SET citus.force_max_query_parallelization TO ON;
SET citus.enable_local_execution TO false;
CREATE TABLE test (a int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', 'a');
SELECT count(*) FROM test;
-- now, we should have 32 additional connections
SELECT pg_catalog.get_all_active_client_backend_count();
ROLLBACK;
-- set the values back to their originals
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
-- suppress notices
SET client_min_messages TO error;
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);