diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index bd25d14ef..d51de7202 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -74,6 +74,7 @@
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_shard_visibility.h"
 #include "distributed/adaptive_executor.h"
+#include "libpq/auth.h"
 #include "port/atomics.h"
 #include "postmaster/postmaster.h"
 #include "storage/ipc.h"
@@ -100,7 +101,9 @@ static void DoInitialCleanup(void);
 static void ResizeStackToMaximumDepth(void);
 static void multi_log_hook(ErrorData *edata);
 static void RegisterConnectionCleanup(void);
+static void RegisterClientBackendCounterDecrement(void);
 static void CitusCleanupConnectionsAtExit(int code, Datum arg);
+static void DecrementClientBackendCounterAtExit(int code, Datum arg);
 static void CreateRequiredDirectories(void);
 static void RegisterCitusConfigVariables(void);
 static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
@@ -112,11 +115,15 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
 static const char * MaxSharedPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra,
                                              GucSource source);
+static void CitusAuthHook(Port *port, int status);
+
 
 /* static variable to hold value of deprecated GUC variable */
 static bool DeprecatedBool = false;
 static int DeprecatedInt = 0;
 
+static ClientAuthentication_hook_type original_client_auth_hook = NULL;
+
 /* *INDENT-OFF* */
 /* GUC enum definitions */
@@ -286,6 +293,14 @@ _PG_init(void)
 	/* register hook for error messages */
 	emit_log_hook = multi_log_hook;
 
+
+	/*
+	 * Register hook for counting client backends that
+	 * are successfully authenticated.
+	 */
+	original_client_auth_hook = ClientAuthentication_hook;
+	ClientAuthentication_hook = CitusAuthHook;
+
 	InitializeMaintenanceDaemon();
 
 	/* initialize coordinated transaction management */
@@ -447,6 +462,23 @@ RegisterConnectionCleanup(void)
 }
 
 
+/*
+ * RegisterClientBackendCounterDecrement is called when the backend terminates.
+ * For all client backends, we register a callback that will undo the counter increment.
+ */
+static void
+RegisterClientBackendCounterDecrement(void)
+{
+	static bool registeredCleanup = false;
+	if (registeredCleanup == false)
+	{
+		before_shmem_exit(DecrementClientBackendCounterAtExit, 0);
+
+		registeredCleanup = true;
+	}
+}
+
+
 /*
  * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
  * backend for the purposes of any clean-up needed.
@@ -466,6 +498,17 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
 }
 
 
+/*
+ * DecrementClientBackendCounterAtExit is called before_shmem_exit() of the
+ * backend for the purpose of decrementing the active client backend counter.
+ */
+static void
+DecrementClientBackendCounterAtExit(int code, Datum arg)
+{
+	DecrementClientBackendCounter();
+}
+
+
 /*
  * CreateRequiredDirectories - Create directories required for Citus to
  * function.
@@ -1741,3 +1784,21 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
 	}
 #endif
 }
+
+
+/*
+ * CitusAuthHook is a callback for client authentication that Postgres provides.
+ * Citus uses this hook to count the number of active client backends.
+ */
+static void
+CitusAuthHook(Port *port, int status)
+{
+	/* let other authentication hooks kick in first */
+	if (original_client_auth_hook)
+	{
+		original_client_auth_hook(port, status);
+	}
+
+	RegisterClientBackendCounterDecrement();
+	IncrementClientBackendCounter();
+}
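Note for reviewers: the hook chaining and exit-callback registration above follow the usual PostgreSQL extension pattern. The standalone sketch below (not part of the patch; every name in it is made up for illustration) shows that pattern in isolation: save the previously installed ClientAuthentication_hook, call it first, and register a before_shmem_exit() callback once per backend for the matching cleanup.

#include "postgres.h"
#include "fmgr.h"
#include "libpq/auth.h"
#include "storage/ipc.h"

PG_MODULE_MAGIC;

void _PG_init(void);

static ClientAuthentication_hook_type prev_client_auth_hook = NULL;

static void example_auth_hook(Port *port, int status);
static void example_backend_exit(int code, Datum arg);

void
_PG_init(void)
{
	/* remember whichever hook was installed before us so we can chain to it */
	prev_client_auth_hook = ClientAuthentication_hook;
	ClientAuthentication_hook = example_auth_hook;
}

static void
example_auth_hook(Port *port, int status)
{
	/* give previously registered authentication hooks a chance to run first */
	if (prev_client_auth_hook)
	{
		prev_client_auth_hook(port, status);
	}

	/*
	 * The auth hook runs once per backend, so this is a convenient place to
	 * register the cleanup that undoes whatever bookkeeping we do here.
	 */
	before_shmem_exit(example_backend_exit, (Datum) 0);
}

static void
example_backend_exit(int code, Datum arg)
{
	/* undo the per-backend bookkeeping done at authentication time */
}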
diff --git a/src/backend/distributed/test/backend_counter.c b/src/backend/distributed/test/backend_counter.c
new file mode 100644
index 000000000..c63a45543
--- /dev/null
+++ b/src/backend/distributed/test/backend_counter.c
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * test/src/backend_counter.c
+ *
+ * This file contains functions to test the active backend counter
+ * within Citus.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+
+#include "distributed/backend_data.h"
+
+
+/* declarations for dynamic loading */
+PG_FUNCTION_INFO_V1(get_all_active_client_backend_count);
+
+
+/*
+ * get_all_active_client_backend_count returns the active
+ * client backend count.
+ */
+Datum
+get_all_active_client_backend_count(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_UINT32(GetAllActiveClientBackendCount());
+}
diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c
index e40fdf999..88dbff271 100644
--- a/src/backend/distributed/transaction/backend_data.c
+++ b/src/backend/distributed/transaction/backend_data.c
@@ -27,6 +27,7 @@
 #include "distributed/lock_graph.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/remote_commands.h"
+#include "distributed/shared_connection_stats.h"
 #include "distributed/transaction_identifier.h"
 #include "distributed/tuplestore.h"
 #include "nodes/execnodes.h"
@@ -63,6 +64,16 @@ typedef struct BackendManagementShmemData
 	 */
 	pg_atomic_uint64 nextTransactionNumber;
 
+	/*
+	 * Total number of client backends that are authenticated.
+	 * We only care about activeClientBackendCounter when adaptive
+	 * connection management is enabled; otherwise it is always zero.
+	 *
+	 * Note that the counter does not include background workers
+	 * or other auxiliary processes; it only counts client backends.
+	 */
+	pg_atomic_uint32 activeClientBackendCounter;
+
 	BackendData backends[FLEXIBLE_ARRAY_MEMBER];
 } BackendManagementShmemData;
@@ -496,6 +507,9 @@ BackendManagementShmemInit(void)
 		/* start the distributed transaction ids from 1 */
 		pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
 
+		/* there are no active backends yet, so start with zero */
+		pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);
+
 		/*
 		 * We need to init per backend's spinlock before any backend
 		 * starts its execution. Note that we initialize TotalProcs (e.g., not
@@ -945,3 +959,39 @@ GetMyProcLocalTransactionId(void)
 {
 	return MyProc->lxid;
 }
+
+
+/*
+ * GetAllActiveClientBackendCount returns activeClientBackendCounter in
+ * the shared memory.
+ */
+int
+GetAllActiveClientBackendCount(void)
+{
+	uint32 activeBackendCount =
+		pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);
+
+	return activeBackendCount;
+}
+
+
+/*
+ * IncrementClientBackendCounter increments activeClientBackendCounter in
+ * the shared memory by one.
+ */
+void
+IncrementClientBackendCounter(void)
+{
+	pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
+}
+
+
+/*
+ * DecrementClientBackendCounter decrements activeClientBackendCounter in
+ * the shared memory by one.
+ */
+void
+DecrementClientBackendCounter(void)
+{
+	pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
+}
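The new counter is maintained with PostgreSQL's lock-free atomics rather than a spinlock. The short sketch below (illustrative only, not part of the patch; the struct and function names are invented) isolates the port/atomics.h calls the patch relies on: initialize once during shared memory setup, then increment, decrement, and read without taking a lock.

#include "postgres.h"
#include "port/atomics.h"

/* a stand-in for a struct placed in shared memory */
typedef struct ExampleShmemData
{
	pg_atomic_uint32 activeCounter;
} ExampleShmemData;

static ExampleShmemData *exampleShmem = NULL;

/* called once while the shared memory segment is being set up */
static void
ExampleShmemInit(ExampleShmemData *shmem)
{
	exampleShmem = shmem;
	pg_atomic_init_u32(&exampleShmem->activeCounter, 0);
}

/* lock-free updates, safe to call from any backend */
static void
ExampleCounterIncrement(void)
{
	pg_atomic_add_fetch_u32(&exampleShmem->activeCounter, 1);
}

static void
ExampleCounterDecrement(void)
{
	pg_atomic_sub_fetch_u32(&exampleShmem->activeCounter, 1);
}

static uint32
ExampleCounterRead(void)
{
	return pg_atomic_read_u32(&exampleShmem->activeCounter);
}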
diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h
index ff2f926f7..3ab12d234 100644
--- a/src/include/distributed/backend_data.h
+++ b/src/include/distributed/backend_data.h
@@ -67,6 +67,9 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
 extern void CancelTransactionDueToDeadlock(PGPROC *proc);
 extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
 extern List * ActiveDistributedTransactionNumbers(void);
-LocalTransactionId GetMyProcLocalTransactionId(void);
+extern LocalTransactionId GetMyProcLocalTransactionId(void);
+extern int GetAllActiveClientBackendCount(void);
+extern void IncrementClientBackendCounter(void);
+extern void DecrementClientBackendCounter(void);
 
 #endif   /* BACKEND_DATA_H */
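To show how the new declarations in backend_data.h might be consumed, here is a hypothetical caller (not from the patch; the limit variable is invented for the example) that gates some action on the number of authenticated client backends:

#include "postgres.h"
#include "distributed/backend_data.h"

/* purely illustrative threshold, not an actual Citus GUC */
static int exampleMaxClientBackends = 100;

static bool
ExampleTooManyClientBackends(void)
{
	return GetAllActiveClientBackendCount() >= exampleMaxClientBackends;
}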
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 0f68947c0..5edbdbc17 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -965,6 +965,113 @@ DEBUG: not pushing down function to the same node
 SET client_min_messages TO WARNING;
 DROP TABLE test CASCADE;
+CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'citus', $$get_all_active_client_backend_count$$;
+-- set the cached connections to zero
+-- and execute a distributed query so that
+-- we end up with zero cached connections afterwards
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- disable deadlock detection and re-trigger 2PC recovery
+-- once more when citus.max_cached_conns_per_worker is zero
+-- so that we can be sure that the connections established for
+-- the maintenance daemon are closed properly.
+-- this is to prevent random failures in the tests (otherwise, we
+-- might see connections established for these operations)
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+-- now that last 2PC recovery is done, we're good to disable it
+ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+\c - - - :master_port
+-- sometimes Postgres is a little slow to terminate the backends
+-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+-- since max_cached_conns_per_worker == 0 at this point, the
+-- backend(s) that execute on the shards will be terminated
+-- so show that there is only a single client backend,
+-- which is actually the backend that executes here
+SET search_path TO single_node;
+SELECT count(*) from should_commit;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+SELECT pg_catalog.get_all_active_client_backend_count();
+ get_all_active_client_backend_count
+---------------------------------------------------------------------
+                                    1
+(1 row)
+
+BEGIN;
+  SET citus.shard_count TO 32;
+  SET citus.force_max_query_parallelization TO ON;
+  SET citus.enable_local_execution TO false;
+  CREATE TABLE test (a int);
+  SET citus.shard_replication_factor TO 1;
+  SELECT create_distributed_table('test', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+  SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+  -- now, we should have additional 32 connections
+  SELECT pg_catalog.get_all_active_client_backend_count();
+ get_all_active_client_backend_count
+---------------------------------------------------------------------
+                                   33
+(1 row)
+
+ROLLBACK;
+-- set the values back to their originals
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.recover_2pc_interval;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- suppress notices
+SET client_min_messages TO error;
 -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
 SELECT 1 FROM master_remove_node('localhost', :master_port);
 ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
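Outside the regression suite, a rough sanity check is to compare the new counter with pg_stat_activity on the same node. The query below is illustrative and not part of the test; it assumes the helper function created above exists, requires PostgreSQL 10+ for the backend_type column, and the two numbers can differ transiently while backends start or exit.

SELECT pg_catalog.get_all_active_client_backend_count() AS citus_counter,
       count(*) AS client_backends_in_pg_stat_activity
FROM pg_stat_activity
WHERE backend_type = 'client backend';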
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index e977bf06c..1edea90ba 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -512,6 +512,69 @@ SELECT function_delegation(1);
 
 SET client_min_messages TO WARNING;
 DROP TABLE test CASCADE;
+
+CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'citus', $$get_all_active_client_backend_count$$;
+
+-- set the cached connections to zero
+-- and execute a distributed query so that
+-- we end up with zero cached connections afterwards
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+SELECT pg_reload_conf();
+
+-- disable deadlock detection and re-trigger 2PC recovery
+-- once more when citus.max_cached_conns_per_worker is zero
+-- so that we can be sure that the connections established for
+-- the maintenance daemon are closed properly.
+-- this is to prevent random failures in the tests (otherwise, we
+-- might see connections established for these operations)
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+-- now that last 2PC recovery is done, we're good to disable it
+ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+SELECT pg_reload_conf();
+
+
+\c - - - :master_port
+-- sometimes Postgres is a little slow to terminate the backends
+-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
+SELECT pg_sleep(0.1);
+-- since max_cached_conns_per_worker == 0 at this point, the
+-- backend(s) that execute on the shards will be terminated
+-- so show that there is only a single client backend,
+-- which is actually the backend that executes here
+SET search_path TO single_node;
+SELECT count(*) from should_commit;
+SELECT pg_catalog.get_all_active_client_backend_count();
+BEGIN;
+  SET citus.shard_count TO 32;
+  SET citus.force_max_query_parallelization TO ON;
+  SET citus.enable_local_execution TO false;
+
+  CREATE TABLE test (a int);
+  SET citus.shard_replication_factor TO 1;
+  SELECT create_distributed_table('test', 'a');
+  SELECT count(*) FROM test;
+
+  -- now, we should have additional 32 connections
+  SELECT pg_catalog.get_all_active_client_backend_count();
+ROLLBACK;
+
+-- set the values back to their originals
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.recover_2pc_interval;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+SELECT pg_reload_conf();
+
+-- suppress notices
+SET client_min_messages TO error;
+
 -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
 SELECT 1 FROM master_remove_node('localhost', :master_port);
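As a quick interactive check (not part of the test), the helper function can be queried while extra psql sessions are opened and closed; the reported value should follow the number of connected client backends:

-- in an existing session
SELECT pg_catalog.get_all_active_client_backend_count();
-- open two more psql sessions in other terminals, then re-run:
SELECT pg_catalog.get_all_active_client_backend_count();  -- typically higher by two
-- the test leaves the function in pg_catalog; it can be removed manually:
-- DROP FUNCTION pg_catalog.get_all_active_client_backend_count();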