mirror of https://github.com/citusdata/citus.git
Merge pull request #4312 from citusdata/single_node_conn_mngmt_backend_counter
Add the infrastructure to count the number of client backends
commit 7539454ccb
@@ -74,6 +74,7 @@
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/adaptive_executor.h"
#include "libpq/auth.h"
#include "port/atomics.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
@@ -100,7 +101,9 @@ static void DoInitialCleanup(void);
static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata);
static void RegisterConnectionCleanup(void);
static void RegisterClientBackendCounterDecrement(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static void DecrementClientBackendCounterAtExit(int code, Datum arg);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
@@ -112,11 +115,15 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
                                             source);
static void CitusAuthHook(Port *port, int status);


/* static variable to hold value of deprecated GUC variable */
static bool DeprecatedBool = false;
static int DeprecatedInt = 0;

static ClientAuthentication_hook_type original_client_auth_hook = NULL;


/* *INDENT-OFF* */
/* GUC enum definitions */
@@ -286,6 +293,14 @@ _PG_init(void)
	/* register hook for error messages */
	emit_log_hook = multi_log_hook;

	/*
	 * Register hook for counting client backends that
	 * are successfully authenticated.
	 */
	original_client_auth_hook = ClientAuthentication_hook;
	ClientAuthentication_hook = CitusAuthHook;

	InitializeMaintenanceDaemon();

	/* initialize coordinated transaction management */
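The two assignments above follow PostgreSQL's standard save-and-chain pattern for
hooks, so that any previously installed ClientAuthentication_hook keeps working.
A minimal self-contained sketch of the same pattern (the names my_auth_hook and
prev_client_auth_hook are illustrative, not part of Citus):

#include "postgres.h"
#include "fmgr.h"
#include "libpq/auth.h"

PG_MODULE_MAGIC;

static ClientAuthentication_hook_type prev_client_auth_hook = NULL;

static void
my_auth_hook(Port *port, int status)
{
	/* chain to whichever hook was installed before ours */
	if (prev_client_auth_hook)
	{
		prev_client_auth_hook(port, status);
	}

	/* per-connection logic runs once authentication has completed */
	elog(LOG, "client authentication finished with status %d", status);
}

void
_PG_init(void)
{
	/* save the previous hook so it still runs, then install ours */
	prev_client_auth_hook = ClientAuthentication_hook;
	ClientAuthentication_hook = my_auth_hook;
}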
@@ -447,6 +462,23 @@ RegisterConnectionCleanup(void)
}

/*
 * RegisterClientBackendCounterDecrement registers a callback that runs
 * when the backend terminates. For all client backends, the callback
 * undoes the increment of the client backend counter, and it is
 * registered at most once per backend.
 */
static void
RegisterClientBackendCounterDecrement(void)
{
	static bool registeredCleanup = false;
	if (registeredCleanup == false)
	{
		before_shmem_exit(DecrementClientBackendCounterAtExit, 0);

		registeredCleanup = true;
	}
}

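For context: before_shmem_exit() (declared in storage/ipc.h) registers a callback
that runs while the backend shuts down, before it detaches from shared memory,
which is what makes it safe for the callback to touch the shared counter; the
callbacks run in reverse order of registration. A hedged sketch of the callback
contract (log_exit_callback is an illustrative name, not part of this change):

#include "postgres.h"
#include "storage/ipc.h"

/* the Datum argument is whatever was passed at registration time */
static void
log_exit_callback(int code, Datum arg)
{
	elog(LOG, "backend exiting, proc_exit code %d", code);
}

static void
register_exit_logging(void)
{
	/* typically guarded by a static flag, as in the function above */
	before_shmem_exit(log_exit_callback, (Datum) 0);
}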
/*
 * CitusCleanupConnectionsAtExit is called via before_shmem_exit() when the
 * backend exits, to perform any clean-up needed.

@@ -466,6 +498,17 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
}

/*
 * DecrementClientBackendCounterAtExit is called via before_shmem_exit() when
 * the backend exits, for the purpose of decrementing the client backend
 * counter.
 */
static void
DecrementClientBackendCounterAtExit(int code, Datum arg)
{
	DecrementClientBackendCounter();
}

/*
 * CreateRequiredDirectories - Create directories required for Citus to
 * function.

@@ -1741,3 +1784,21 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
}
#endif
}

/*
 * CitusAuthHook is a callback for client authentication that Postgres provides.
 * Citus uses this hook to count the number of active backends.
 */
static void
CitusAuthHook(Port *port, int status)
{
	/* let other authentication hooks kick in first */
	if (original_client_auth_hook)
	{
		original_client_auth_hook(port, status);
	}

	RegisterClientBackendCounterDecrement();
	IncrementClientBackendCounter();
}

@@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
 *
 * test/src/backend_counter.c
 *
 * This file contains functions to test the active backend counter
 * within Citus.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "fmgr.h"

#include "distributed/backend_data.h"


/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(get_all_active_client_backend_count);


/*
 * get_all_active_client_backend_count returns the active
 * client backend count.
 */
Datum
get_all_active_client_backend_count(PG_FUNCTION_ARGS)
{
	PG_RETURN_UINT32(GetAllActiveClientBackendCount());
}
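As an aside, this file uses PostgreSQL's fmgr V1 calling convention: each exported
function is declared with PG_FUNCTION_INFO_V1 and exchanges values as Datum. A
hedged sketch of a V1 function that also reads an argument (add_one is a
hypothetical example, not part of this commit):

#include "postgres.h"
#include "fmgr.h"

PG_FUNCTION_INFO_V1(add_one);

/* reads its int4 argument and returns that value plus one */
Datum
add_one(PG_FUNCTION_ARGS)
{
	int32 value = PG_GETARG_INT32(0);

	PG_RETURN_INT32(value + 1);
}

On the SQL side such a function is exposed with CREATE FUNCTION ... LANGUAGE C
STRICT, exactly as the regression test below does for
get_all_active_client_backend_count.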
@@ -27,6 +27,7 @@
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "nodes/execnodes.h"
@@ -63,6 +64,16 @@ typedef struct BackendManagementShmemData
	 */
	pg_atomic_uint64 nextTransactionNumber;

	/*
	 * Total number of client backends that are authenticated.
	 * We only care about activeClientBackendCounter when adaptive
	 * connection management is enabled, otherwise always zero.
	 *
	 * Note that the counter does not consider any background workers
	 * or such, it only counts client_backends.
	 */
	pg_atomic_uint32 activeClientBackendCounter;

	BackendData backends[FLEXIBLE_ARRAY_MEMBER];
} BackendManagementShmemData;

@@ -496,6 +507,9 @@ BackendManagementShmemInit(void)
	/* start the distributed transaction ids from 1 */
	pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);

	/* there are no active backends yet, so start with zero */
	pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);

	/*
	 * We need to init per backend's spinlock before any backend
	 * starts its execution. Note that we initialize TotalProcs (e.g., not
@@ -945,3 +959,39 @@ GetMyProcLocalTransactionId(void)
{
	return MyProc->lxid;
}


/*
 * GetAllActiveClientBackendCount returns activeClientBackendCounter in
 * the shared memory.
 */
int
GetAllActiveClientBackendCount(void)
{
	uint32 activeBackendCount =
		pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);

	return activeBackendCount;
}


/*
 * IncrementClientBackendCounter increments activeClientBackendCounter in
 * the shared memory by one.
 */
void
IncrementClientBackendCounter(void)
{
	pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
}


/*
 * DecrementClientBackendCounter decrements activeClientBackendCounter in
 * the shared memory by one.
 */
void
DecrementClientBackendCounter(void)
{
	pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
}
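The counter deliberately uses port/atomics.h instead of a lock:
pg_atomic_add_fetch_u32() and pg_atomic_sub_fetch_u32() are atomic
read-modify-write operations, so concurrent backends can update the shared value
without holding a spinlock, and pg_atomic_read_u32() returns a plain atomic
snapshot. A minimal sketch of the same pattern outside any shared-memory struct
(exampleCounter and example_counter_usage are illustrative names):

#include "postgres.h"
#include "port/atomics.h"

static pg_atomic_uint32 exampleCounter;

static void
example_counter_usage(void)
{
	/* must run exactly once before any concurrent access */
	pg_atomic_init_u32(&exampleCounter, 0);

	/* both return the value after the update */
	pg_atomic_add_fetch_u32(&exampleCounter, 1);
	pg_atomic_sub_fetch_u32(&exampleCounter, 1);

	/* atomic read; no ordering guarantees beyond atomicity itself */
	uint32 currentValue = pg_atomic_read_u32(&exampleCounter);

	elog(LOG, "counter is %u", currentValue);
}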
@@ -67,6 +67,9 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
extern List * ActiveDistributedTransactionNumbers(void);
-LocalTransactionId GetMyProcLocalTransactionId(void);
+extern LocalTransactionId GetMyProcLocalTransactionId(void);
extern int GetAllActiveClientBackendCount(void);
extern void IncrementClientBackendCounter(void);
extern void DecrementClientBackendCounter(void);

#endif   /* BACKEND_DATA_H */

@@ -965,6 +965,113 @@ DEBUG: not pushing down function to the same node

SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
RETURNS bigint
LANGUAGE C STRICT
AS 'citus', $$get_all_active_client_backend_count$$;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

SELECT pg_sleep(0.1);
 pg_sleep
---------------------------------------------------------------------

(1 row)

-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

\c - - - :master_port
-- sometimes Postgres is a little slow to terminate the backends
-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
SELECT pg_sleep(0.1);
 pg_sleep
---------------------------------------------------------------------

(1 row)

-- since max_cached_conns_per_worker == 0 at this point, the
-- backend(s) that execute on the shards will be terminated
-- so show that there is only a single client backend,
-- which is actually the backend that executes here
SET search_path TO single_node;
SELECT count(*) FROM should_commit;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT pg_catalog.get_all_active_client_backend_count();
 get_all_active_client_backend_count
---------------------------------------------------------------------
                                   1
(1 row)

BEGIN;
SET citus.shard_count TO 32;
SET citus.force_max_query_parallelization TO ON;
SET citus.enable_local_execution TO false;
CREATE TABLE test (a int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT count(*) FROM test;
 count
---------------------------------------------------------------------
     0
(1 row)

-- now we should have 32 additional connections, one per shard:
-- 32 worker backends plus this session makes 33
SELECT pg_catalog.get_all_active_client_backend_count();
 get_all_active_client_backend_count
---------------------------------------------------------------------
                                  33
(1 row)

ROLLBACK;
-- set the values back to the originals
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

-- suppress notices
SET client_min_messages TO error;
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
ERROR:  cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables

@@ -512,6 +512,69 @@ SELECT function_delegation(1);

SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;

CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
RETURNS bigint
LANGUAGE C STRICT
AS 'citus', $$get_all_active_client_backend_count$$;

-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();

-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);

-- now that the last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
SELECT pg_reload_conf();


\c - - - :master_port
-- sometimes Postgres is a little slow to terminate the backends
-- even if PQfinish is sent. So, to prevent any flaky tests, sleep
SELECT pg_sleep(0.1);
-- since max_cached_conns_per_worker == 0 at this point, the
-- backend(s) that execute on the shards will be terminated
-- so show that there is only a single client backend,
-- which is actually the backend that executes here
SET search_path TO single_node;
SELECT count(*) FROM should_commit;
SELECT pg_catalog.get_all_active_client_backend_count();
BEGIN;
SET citus.shard_count TO 32;
SET citus.force_max_query_parallelization TO ON;
SET citus.enable_local_execution TO false;

CREATE TABLE test (a int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', 'a');
SELECT count(*) FROM test;

-- now we should have 32 additional connections, one per shard:
-- 32 worker backends plus this session makes 33
SELECT pg_catalog.get_all_active_client_backend_count();
ROLLBACK;

-- set the values back to the originals
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();

-- suppress notices
SET client_min_messages TO error;

-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);