From 629ecc3dee7cdfd658313536dd8f68769e722d4b Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Thu, 12 Nov 2020 09:15:21 +0100
Subject: [PATCH] Add the infrastructure to count the number of client backends

Considering the adaptive connection management improvements that we
plan to roll out soon, it is very helpful to know the number of active
client backends.

We are doing this addition to simplify the adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus parallel queries would compete to get slots
on Postgres' `max_connections` on the same Citus database.

With adaptive connection management, we have the counters for Citus
parallel queries. That helps us to adaptively decide on the remote
execution pool size (e.g., throttle connections if necessary).
However, we do not have any counters for the total number of client
backends on the database. For single node Citus, we should consider
all the client backends, not only the remote connections that Citus
establishes.

Of course Postgres internally knows how many client backends are
active. However, to get that number Postgres iterates over all the
backends. For example, see
[pg_stat_get_db_numbackends](https://github.com/postgres/postgres/blob/8e90ec5580d5345fef31005d7cc2215ba2125070/src/backend/utils/adt/pgstatfuncs.c#L1240)
where Postgres iterates over all the backends.

For our purposes, we need this information on every connection
establishment. That's why we cannot afford to do this kind of
iteration.
---
 src/backend/distributed/shared_library_init.c |  61 ++++++++++
 .../distributed/test/backend_counter.c        |  31 +++++
 .../distributed/transaction/backend_data.c    |  50 ++++++++
 src/include/distributed/backend_data.h        |   5 +-
 src/test/regress/expected/single_node.out     | 107 ++++++++++++++++++
 src/test/regress/sql/single_node.sql          |  63 +++++++++++
 6 files changed, 316 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/distributed/test/backend_counter.c

diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index bd25d14ef..d51de7202 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -74,6 +74,7 @@
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_shard_visibility.h"
 #include "distributed/adaptive_executor.h"
+#include "libpq/auth.h"
 #include "port/atomics.h"
 #include "postmaster/postmaster.h"
 #include "storage/ipc.h"
@@ -100,7 +101,9 @@ static void DoInitialCleanup(void);
 static void ResizeStackToMaximumDepth(void);
 static void multi_log_hook(ErrorData *edata);
 static void RegisterConnectionCleanup(void);
+static void RegisterClientBackendCounterDecrement(void);
 static void CitusCleanupConnectionsAtExit(int code, Datum arg);
+static void DecrementClientBackendCounterAtExit(int code, Datum arg);
 static void CreateRequiredDirectories(void);
 static void RegisterCitusConfigVariables(void);
 static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
@@ -112,11 +115,15 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
 static const char * MaxSharedPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra,
                                              GucSource source);
+static void CitusAuthHook(Port *port, int status);
+

 /* static variable to hold value of deprecated GUC variable */
 static bool DeprecatedBool = false;
 static int DeprecatedInt = 0;

+static ClientAuthentication_hook_type original_client_auth_hook = NULL;
+
 /* *INDENT-OFF* */
 /* GUC enum definitions */
@@ -286,6 +293,14 @@ _PG_init(void)
 	/* register hook for error messages */
 	emit_log_hook = multi_log_hook;
+
+	/*
+	 * Register hook for counting client backends that
+	 * are successfully authenticated.
+	 */
+	original_client_auth_hook = ClientAuthentication_hook;
+	ClientAuthentication_hook = CitusAuthHook;
+
 	InitializeMaintenanceDaemon();

 	/* initialize coordinated transaction management */
@@ -447,6 +462,23 @@ RegisterConnectionCleanup(void)
 }


+/*
+ * RegisterClientBackendCounterDecrement registers a before_shmem_exit()
+ * callback that decrements the client backend counter when the backend exits.
+ */
+static void
+RegisterClientBackendCounterDecrement(void)
+{
+	static bool registeredCleanup = false;
+	if (registeredCleanup == false)
+	{
+		before_shmem_exit(DecrementClientBackendCounterAtExit, 0);
+
+		registeredCleanup = true;
+	}
+}
+
+
 /*
  * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
  * backend for the purposes of any clean-up needed.
@@ -466,6 +498,17 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
 }


+/*
+ * DecrementClientBackendCounterAtExit is called at before_shmem_exit() of the
+ * backend in order to decrement the active client backend counter.
+ */
+static void
+DecrementClientBackendCounterAtExit(int code, Datum arg)
+{
+	DecrementClientBackendCounter();
+}
+
+
 /*
  * CreateRequiredDirectories - Create directories required for Citus to
  * function.
@@ -1741,3 +1784,21 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
 	}
 #endif
 }
+
+
+/*
+ * CitusAuthHook is a callback for client authentication that Postgres provides.
+ * Citus uses this hook to count the number of active client backends.
+ */
+static void
+CitusAuthHook(Port *port, int status)
+{
+	/* let other authentication hooks kick in first */
+	if (original_client_auth_hook)
+	{
+		original_client_auth_hook(port, status);
+	}
+
+	RegisterClientBackendCounterDecrement();
+	IncrementClientBackendCounter();
+}
diff --git a/src/backend/distributed/test/backend_counter.c b/src/backend/distributed/test/backend_counter.c
new file mode 100644
index 000000000..c63a45543
--- /dev/null
+++ b/src/backend/distributed/test/backend_counter.c
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * test/src/backend_counter.c
+ *
+ * This file contains functions to test the active backend counter
+ * within Citus.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+
+#include "distributed/backend_data.h"
+
+
+/* declarations for dynamic loading */
+PG_FUNCTION_INFO_V1(get_all_active_client_backend_count);
+
+
+/*
+ * get_all_active_client_backend_count returns the active
+ * client backend count.
+ */
+Datum
+get_all_active_client_backend_count(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_UINT32(GetAllActiveClientBackendCount());
+}
diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c
index e40fdf999..88dbff271 100644
--- a/src/backend/distributed/transaction/backend_data.c
+++ b/src/backend/distributed/transaction/backend_data.c
@@ -27,6 +27,7 @@
 #include "distributed/lock_graph.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/remote_commands.h"
+#include "distributed/shared_connection_stats.h"
 #include "distributed/transaction_identifier.h"
 #include "distributed/tuplestore.h"
 #include "nodes/execnodes.h"
@@ -63,6 +64,16 @@ typedef struct BackendManagementShmemData
 	 */
 	pg_atomic_uint64 nextTransactionNumber;

+	/*
+	 * Total number of client backends that are authenticated.
+	 * We only care about activeClientBackendCounter when adaptive
+	 * connection management is enabled; otherwise it is not consulted.
+	 *
+	 * Note that the counter does not consider any background workers
+	 * or the like; it only counts client backends.
+	 */
+	pg_atomic_uint32 activeClientBackendCounter;
+
 	BackendData backends[FLEXIBLE_ARRAY_MEMBER];
 } BackendManagementShmemData;

@@ -496,6 +507,9 @@ BackendManagementShmemInit(void)
 		/* start the distributed transaction ids from 1 */
 		pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);

+		/* there are no active backends yet, so start with zero */
+		pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);
+
 		/*
 		 * We need to init per backend's spinlock before any backend
 		 * starts its execution. Note that we initialize TotalProcs (e.g., not
@@ -945,3 +959,39 @@ GetMyProcLocalTransactionId(void)
 {
 	return MyProc->lxid;
 }
+
+
+/*
+ * GetAllActiveClientBackendCount returns activeClientBackendCounter in
+ * the shared memory.
+ */
+int
+GetAllActiveClientBackendCount(void)
+{
+	uint32 activeBackendCount =
+		pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);
+
+	return activeBackendCount;
+}
+
+
+/*
+ * IncrementClientBackendCounter increments activeClientBackendCounter in
+ * the shared memory by one.
+ */
+void
+IncrementClientBackendCounter(void)
+{
+	pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
+}
+
+
+/*
+ * DecrementClientBackendCounter decrements activeClientBackendCounter in
+ * the shared memory by one.
+ */
+void
+DecrementClientBackendCounter(void)
+{
+	pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
+}
diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h
index ff2f926f7..3ab12d234 100644
--- a/src/include/distributed/backend_data.h
+++ b/src/include/distributed/backend_data.h
@@ -67,6 +67,9 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
 extern void CancelTransactionDueToDeadlock(PGPROC *proc);
 extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
 extern List * ActiveDistributedTransactionNumbers(void);
-LocalTransactionId GetMyProcLocalTransactionId(void);
+extern LocalTransactionId GetMyProcLocalTransactionId(void);
+extern int GetAllActiveClientBackendCount(void);
+extern void IncrementClientBackendCounter(void);
+extern void DecrementClientBackendCounter(void);

 #endif /* BACKEND_DATA_H */
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 0f68947c0..5edbdbc17 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -965,6 +965,113 @@ DEBUG: not pushing down function to the same node
 SET client_min_messages TO WARNING;
 DROP TABLE test CASCADE;
+CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'citus', $$get_all_active_client_backend_count$$;
+-- set the cached connections to zero
+-- and execute a distributed query so that
+-- we end up with zero cached connections afterwards
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- disable deadlock detection and re-trigger 2PC recovery
+-- once more when citus.max_cached_conns_per_worker is zero
+-- so that we can be sure that the connections established for
+-- the maintenance daemon are closed properly.
+-- this is to prevent random failures in the tests (otherwise, we
+-- might see connections established for these operations)
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+-- now that the last 2PC recovery is done, we're good to disable it
+ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+\c - - - :master_port
+-- sometimes Postgres is a little slow to terminate the backends
+-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+-- since max_cached_conns_per_worker == 0 at this point, the
+-- backend(s) that execute on the shards will be terminated
+-- so show that there is only a single client backend,
+-- which is actually the backend that executes here
+SET search_path TO single_node;
+SELECT count(*) from should_commit;
+ count
+---------------------------------------------------------------------
+ 0
+(1 row)
+
+SELECT pg_catalog.get_all_active_client_backend_count();
+ get_all_active_client_backend_count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+BEGIN;
+	SET citus.shard_count TO 32;
+	SET citus.force_max_query_parallelization TO ON;
+	SET citus.enable_local_execution TO false;
+	CREATE TABLE test (a int);
+	SET citus.shard_replication_factor TO 1;
+	SELECT create_distributed_table('test', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+	SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+ 0
+(1 row)
+
+	-- now, we should have 32 additional connections
+	SELECT pg_catalog.get_all_active_client_backend_count();
+ get_all_active_client_backend_count
+---------------------------------------------------------------------
+ 33
+(1 row)
+
+ROLLBACK;
+-- set the values back to the originals
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.recover_2pc_interval;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- suppress notices
+SET client_min_messages TO error;
 -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
 SELECT 1 FROM master_remove_node('localhost', :master_port);
 ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index e977bf06c..1edea90ba 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -512,6 +512,69 @@ SELECT function_delegation(1);
 SET client_min_messages TO WARNING;
 DROP TABLE test CASCADE;
+
+CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'citus', $$get_all_active_client_backend_count$$;
+
+-- set the cached connections to zero
+-- and execute a distributed query so that
+-- we end up with zero cached connections afterwards
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+SELECT pg_reload_conf();
+
+-- disable deadlock detection and re-trigger 2PC recovery
+-- once more when citus.max_cached_conns_per_worker is zero
+-- so that we can be sure that the connections established for
+-- the maintenance daemon are closed properly.
+-- this is to prevent random failures in the tests (otherwise, we
+-- might see connections established for these operations)
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+-- now that the last 2PC recovery is done, we're good to disable it
+ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+SELECT pg_reload_conf();
+
+
+\c - - - :master_port
+-- sometimes Postgres is a little slow to terminate the backends
+-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
+SELECT pg_sleep(0.1);
+-- since max_cached_conns_per_worker == 0 at this point, the
+-- backend(s) that execute on the shards will be terminated
+-- so show that there is only a single client backend,
+-- which is actually the backend that executes here
+SET search_path TO single_node;
+SELECT count(*) from should_commit;
+SELECT pg_catalog.get_all_active_client_backend_count();
+BEGIN;
+	SET citus.shard_count TO 32;
+	SET citus.force_max_query_parallelization TO ON;
+	SET citus.enable_local_execution TO false;
+
+	CREATE TABLE test (a int);
+	SET citus.shard_replication_factor TO 1;
+	SELECT create_distributed_table('test', 'a');
+	SELECT count(*) FROM test;
+
+	-- now, we should have 32 additional connections
+	SELECT pg_catalog.get_all_active_client_backend_count();
+ROLLBACK;
+
+-- set the values back to the originals
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.recover_2pc_interval;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+SELECT pg_reload_conf();
+
+-- suppress notices
+SET client_min_messages TO error;
+
 -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
 SELECT 1 FROM master_remove_node('localhost', :master_port);
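
Note (not part of the patch): the sketch below illustrates how the counter added here could feed the adaptive connection management that the commit message anticipates, by leaving room for already-connected client backends when sizing the remote execution pool on single-node Citus. Only GetAllActiveClientBackendCount() comes from this patch; the helper name CalculateRemoteExecutionPoolSize and the sizing formula are hypothetical assumptions for illustration.

/*
 * Hypothetical sketch: bound the remote execution pool by the connection
 * slots that are not already taken by client backends. The helper and the
 * formula are assumptions; GetAllActiveClientBackendCount() is the API
 * introduced by this patch and MaxConnections is Postgres' GUC-backed global.
 */
#include "postgres.h"

#include "miscadmin.h"					/* MaxConnections */

#include "distributed/backend_data.h"	/* GetAllActiveClientBackendCount() */

static int
CalculateRemoteExecutionPoolSize(void)
{
	/* client backends currently authenticated on this node */
	int activeClientBackendCount = GetAllActiveClientBackendCount();

	/* leave the slots occupied by client backends to the clients themselves */
	int availableSlots = MaxConnections - activeClientBackendCount;

	/* always allow at least one remote connection so queries can progress */
	return Max(availableSlots, 1);
}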