diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 7592a6feb..4cdd065d7 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -106,6 +106,9 @@ int MaxSharedPoolSize = 0; */ int LocalSharedPoolSize = 0; +/* number of connections reserved for Citus */ +int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS; + /* the following two structs are used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; @@ -192,6 +195,25 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri } +/* + * GetMaxClientConnections returns the value of citus.max_client_connections, + * or max_connections when it is -1 or when connecting as superuser. + * + * The latter is done because citus.max_client_connections does not apply to + * superuser. + */ +int +GetMaxClientConnections(void) +{ + if (MaxClientConnections == ALLOW_ALL_EXTERNAL_CONNECTIONS || superuser()) + { + return MaxConnections; + } + + return MaxClientConnections; +} + + /* * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled * via a GUC. @@ -204,7 +226,7 @@ GetMaxSharedPoolSize(void) { if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY) { - return MaxConnections; + return GetMaxClientConnections(); } return MaxSharedPoolSize; @@ -223,7 +245,7 @@ GetLocalSharedPoolSize(void) { if (LocalSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY) { - return MaxConnections * 0.5; + return GetMaxClientConnections() * 0.5; } return LocalSharedPoolSize; @@ -318,7 +340,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) return false; } - activeBackendCount = GetAllActiveClientBackendCount(); + activeBackendCount = GetExternalClientBackendCount(); } LockConnectionSharedMemory(LW_EXCLUSIVE); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7897cbefe..b5b8ac43f 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -23,6 +23,7 @@ #include "safe_lib.h" +#include "catalog/pg_authid.h" #include "citus_version.h" #include "commands/explain.h" #include "common/string.h" @@ -84,12 +85,14 @@ #include "libpq/auth.h" #include "port/atomics.h" #include "postmaster/postmaster.h" +#include "replication/walsender.h" #include "storage/ipc.h" #include "optimizer/planner.h" #include "optimizer/paths.h" #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/syscache.h" #include "utils/varlena.h" #include "columnar/mod.h" @@ -113,9 +116,9 @@ static void DoInitialCleanup(void); static void ResizeStackToMaximumDepth(void); static void multi_log_hook(ErrorData *edata); static void RegisterConnectionCleanup(void); -static void RegisterClientBackendCounterDecrement(void); +static void RegisterExternalClientBackendCounterDecrement(void); static void CitusCleanupConnectionsAtExit(int code, Datum arg); -static void DecrementClientBackendCounterAtExit(int code, Datum arg); +static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg); static void CreateRequiredDirectories(void); static void RegisterCitusConfigVariables(void); static void OverridePostgresConfigAssignHooks(void); @@ -135,6 +138,7 @@ static const char * LocalPoolSizeGucShowHook(void); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); static void CitusAuthHook(Port *port, int status); +static bool IsSuperuser(char *userName); static ClientAuthentication_hook_type original_client_auth_hook = NULL; @@ -488,16 +492,16 @@ RegisterConnectionCleanup(void) /* - * RegisterClientBackendCounterDecrement is called when the backend terminates. + * RegisterExternalClientBackendCounterDecrement is called when the backend terminates. * For all client backends, we register a callback that will undo */ static void -RegisterClientBackendCounterDecrement(void) +RegisterExternalClientBackendCounterDecrement(void) { static bool registeredCleanup = false; if (registeredCleanup == false) { - before_shmem_exit(DecrementClientBackendCounterAtExit, 0); + before_shmem_exit(DecrementExternalClientBackendCounterAtExit, 0); registeredCleanup = true; } @@ -527,13 +531,13 @@ CitusCleanupConnectionsAtExit(int code, Datum arg) /* - * DecrementClientBackendCounterAtExit is called before_shmem_exit() of the + * DecrementExternalClientBackendCounterAtExit is called before_shmem_exit() of the * backend for the purposes decrementing */ static void -DecrementClientBackendCounterAtExit(int code, Datum arg) +DecrementExternalClientBackendCounterAtExit(int code, Datum arg) { - DecrementClientBackendCounter(); + DecrementExternalClientBackendCounter(); } @@ -1340,6 +1344,23 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_client_connections", + gettext_noop("Sets the maximum number of connections regular clients can make"), + gettext_noop("To ensure that a Citus cluster has a sufficient number of " + "connection slots to serve queries internally, it can be " + "useful to reserve connection slots for Citus internal " + "connections. When max_client_connections is set to a value " + "below max_connections, the remaining connections are reserved " + "for connections between Citus nodes. This does not affect " + "superuser_reserved_connections. If set to -1, no connections " + "are reserved."), + &MaxClientConnections, + -1, -1, MaxConnections, + PGC_SUSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_intermediate_result_size", gettext_noop("Sets the maximum size of the intermediate results in KB for " @@ -2171,12 +2192,73 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source) static void CitusAuthHook(Port *port, int status) { + uint64 gpid = ExtractGlobalPID(port->application_name); + + /* external connections to not have a GPID immediately */ + if (gpid == INVALID_CITUS_INTERNAL_BACKEND_GPID) + { + /* + * We raise the shared connection counter pre-emptively. As a result, we may + * have scenarios in which a few simultaneous connection attempts prevent + * each other from succeeding, but we avoid scenarios where we oversubscribe + * the system. + * + * By also calling RegisterExternalClientBackendCounterDecrement here, we + * immediately lower the counter if we throw a FATAL error below. The client + * connection counter may temporarily exceed maxClientConnections in between. + */ + RegisterExternalClientBackendCounterDecrement(); + + uint32 externalClientCount = IncrementExternalClientBackendCounter(); + + /* + * Limit non-superuser client connections if citus.max_client_connections + * is set. + */ + if (MaxClientConnections >= 0 && + !IsSuperuser(port->user_name) && + externalClientCount > MaxClientConnections) + { + ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("remaining connection slots are reserved for " + "non-replication superuser connections"), + errdetail("the server is configured to accept up to %d " + "regular client connections", + MaxClientConnections))); + } + } + /* let other authentication hooks to kick in first */ if (original_client_auth_hook) { original_client_auth_hook(port, status); } - - RegisterClientBackendCounterDecrement(); - IncrementClientBackendCounter(); +} + + +/* + * IsSuperuser returns whether the role with the given name is superuser. + */ +static bool +IsSuperuser(char *roleName) +{ + if (roleName == NULL) + { + return false; + } + + HeapTuple roleTuple = SearchSysCache1(AUTHNAME, CStringGetDatum(roleName)); + if (!HeapTupleIsValid(roleTuple)) + { + ereport(FATAL, + (errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION), + errmsg("role \"%s\" does not exist", roleName))); + } + + Form_pg_authid rform = (Form_pg_authid) GETSTRUCT(roleTuple); + bool isSuperuser = rform->rolsuper; + + ReleaseSysCache(roleTuple); + + return isSuperuser; } diff --git a/src/backend/distributed/test/backend_counter.c b/src/backend/distributed/test/backend_counter.c index c63a45543..1b9984ac9 100644 --- a/src/backend/distributed/test/backend_counter.c +++ b/src/backend/distributed/test/backend_counter.c @@ -27,5 +27,5 @@ PG_FUNCTION_INFO_V1(get_all_active_client_backend_count); Datum get_all_active_client_backend_count(PG_FUNCTION_ARGS) { - PG_RETURN_UINT32(GetAllActiveClientBackendCount()); + PG_RETURN_UINT32(GetExternalClientBackendCount()); } diff --git a/src/backend/distributed/test/make_external_connection.c b/src/backend/distributed/test/make_external_connection.c new file mode 100644 index 000000000..424793dea --- /dev/null +++ b/src/backend/distributed/test/make_external_connection.c @@ -0,0 +1,67 @@ +/*------------------------------------------------------------------------- + * + * test/src/make_external_connection.c + * + * This file contains UDF to connect to a node without using the Citus + * internal application_name. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "libpq-fe.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/function_utils.h" +#include "distributed/intermediate_result_pruning.h" +#include "distributed/lock_graph.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/run_from_same_connection.h" + +#include "distributed/version_compat.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "postmaster/postmaster.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + + +PG_FUNCTION_INFO_V1(make_external_connection_to_node); + + +/* + * make_external_connection_to_node opens a conneciton to a node + * and keeps it until the end of the session. + */ +Datum +make_external_connection_to_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(0)); + uint32 nodePort = PG_GETARG_UINT32(1); + char *userName = text_to_cstring(PG_GETARG_TEXT_P(2)); + char *databaseName = text_to_cstring(PG_GETARG_TEXT_P(3)); + + StringInfo connectionString = makeStringInfo(); + appendStringInfo(connectionString, + "host=%s port=%d user=%s dbname=%s", + nodeName, nodePort, userName, databaseName); + + PGconn *pgConn = PQconnectdb(connectionString->data); + + if (PQstatus(pgConn) != CONNECTION_OK) + { + PQfinish(pgConn); + + ereport(ERROR, (errmsg("connection failed"))); + } + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 3af20631a..f19fd9350 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -68,14 +68,12 @@ typedef struct BackendManagementShmemData pg_atomic_uint64 nextTransactionNumber; /* - * Total number of client backends that are authenticated. - * We only care about activeClientBackendCounter when adaptive - * connection management is enabled, otherwise always zero. + * Total number of external client backends that are authenticated. * * Note that the counter does not consider any background workers - * or such, it only counts client_backends. + * or such, and also exludes internal connections between nodes. */ - pg_atomic_uint32 activeClientBackendCounter; + pg_atomic_uint32 externalClientBackendCounter; BackendData backends[FLEXIBLE_ARRAY_MEMBER]; } BackendManagementShmemData; @@ -548,7 +546,7 @@ BackendManagementShmemInit(void) pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1); /* there are no active backends yet, so start with zero */ - pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0); + pg_atomic_init_u32(&backendManagementShmemData->externalClientBackendCounter, 0); /* * We need to init per backend's spinlock before any backend @@ -1166,36 +1164,37 @@ GetMyProcLocalTransactionId(void) /* - * GetAllActiveClientBackendCount returns activeClientBackendCounter in + * GetExternalClientBackendCount returns externalClientBackendCounter in * the shared memory. */ int -GetAllActiveClientBackendCount(void) +GetExternalClientBackendCount(void) { uint32 activeBackendCount = - pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter); + pg_atomic_read_u32(&backendManagementShmemData->externalClientBackendCounter); return activeBackendCount; } /* - * IncrementClientBackendCounter increments activeClientBackendCounter in + * IncrementExternalClientBackendCounter increments externalClientBackendCounter in * the shared memory by one. */ -void -IncrementClientBackendCounter(void) +uint32 +IncrementExternalClientBackendCounter(void) { - pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1); + return pg_atomic_add_fetch_u32( + &backendManagementShmemData->externalClientBackendCounter, 1); } /* - * DecrementClientBackendCounter decrements activeClientBackendCounter in + * DecrementExternalClientBackendCounter decrements externalClientBackendCounter in * the shared memory by one. */ void -DecrementClientBackendCounter(void) +DecrementExternalClientBackendCounter(void) { - pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1); + pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1); } diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index c34d94670..74f86177e 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -66,9 +66,9 @@ extern bool MyBackendGotCancelledDueToDeadlock(bool clearState); extern bool MyBackendIsInDisributedTransaction(void); extern List * ActiveDistributedTransactionNumbers(void); extern LocalTransactionId GetMyProcLocalTransactionId(void); -extern int GetAllActiveClientBackendCount(void); -extern void IncrementClientBackendCounter(void); -extern void DecrementClientBackendCounter(void); +extern int GetExternalClientBackendCount(void); +extern uint32 IncrementExternalClientBackendCounter(void); +extern void DecrementExternalClientBackendCounter(void); extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StringInfo queryResultString, diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 1efb37d28..638d42f63 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -14,15 +14,18 @@ #define ADJUST_POOLSIZE_AUTOMATICALLY 0 #define DISABLE_CONNECTION_THROTTLING -1 #define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1 +#define ALLOW_ALL_EXTERNAL_CONNECTIONS -1 extern int MaxSharedPoolSize; extern int LocalSharedPoolSize; +extern int MaxClientConnections; extern void InitializeSharedConnectionStats(void); extern void WaitForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void); +extern int GetMaxClientConnections(void); extern int GetMaxSharedPoolSize(void); extern int GetLocalSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); diff --git a/src/test/regress/expected/isolation_max_client_connections.out b/src/test/regress/expected/isolation_max_client_connections.out new file mode 100644 index 000000000..ada303511 --- /dev/null +++ b/src/test/regress/expected/isolation_max_client_connections.out @@ -0,0 +1,55 @@ +Parsed test spec with 3 sessions + +starting permutation: s1-grant s1-connect s2-connect s2-connect-superuser s3-select +run_command_on_workers +--------------------------------------------------------------------- +(localhost,57637,t,t) +(localhost,57638,t,t) +(2 rows) + +step s1-grant: + SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user'); + +result +--------------------------------------------------------------------- +GRANT +GRANT +GRANT +GRANT +(4 rows) + +step s1-connect: + SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database()); + +make_external_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-connect: + SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database()); + +ERROR: connection failed +step s2-connect-superuser: + SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database()); + +make_external_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s3-select: + SET ROLE my_user; + SELECT count(*) FROM my_table; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +run_command_on_workers +--------------------------------------------------------------------- +(localhost,57637,t,t) +(localhost,57638,t,t) +(2 rows) + diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3b9f9f2c7..7e7e84a3a 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1723,8 +1723,7 @@ SELECT pg_sleep(0.1); -- since max_cached_conns_per_worker == 0 at this point, the -- backend(s) that execute on the shards will be terminated --- so show that there is only a single client backend, --- which is actually the backend that executes here +-- so show that there no internal backends SET search_path TO single_node; SELECT count(*) from should_commit; count @@ -1732,7 +1731,13 @@ SELECT count(*) from should_commit; 0 (1 row) -SELECT pg_catalog.get_all_active_client_backend_count(); +SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT get_all_active_client_backend_count(); get_all_active_client_backend_count --------------------------------------------------------------------- 1 @@ -1757,10 +1762,17 @@ BEGIN; (1 row) -- now, we should have additional 32 connections - SELECT pg_catalog.get_all_active_client_backend_count(); + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; + count +--------------------------------------------------------------------- + 32 +(1 row) + + -- single external connection + SELECT get_all_active_client_backend_count(); get_all_active_client_backend_count --------------------------------------------------------------------- - 33 + 1 (1 row) ROLLBACK; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index e5602618f..8f849d5dd 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -62,6 +62,7 @@ test: isolation_validate_vs_insert test: isolation_insert_select_conflict test: shared_connection_waits test: isolation_cancellation +test: isolation_max_client_connections test: isolation_undistribute_table test: isolation_fix_partition_shard_index_names test: isolation_global_pid diff --git a/src/test/regress/spec/isolation_max_client_connections.spec b/src/test/regress/spec/isolation_max_client_connections.spec new file mode 100644 index 000000000..d7d80ed76 --- /dev/null +++ b/src/test/regress/spec/isolation_max_client_connections.spec @@ -0,0 +1,66 @@ +setup +{ + SET citus.shard_replication_factor TO 1; + + CREATE USER my_user; + SELECT run_command_on_workers('CREATE USER my_user'); + + CREATE TABLE my_table (test_id integer NOT NULL, data text); + SELECT create_distributed_table('my_table', 'test_id'); + + GRANT USAGE ON SCHEMA public TO my_user; + GRANT SELECT ON TABLE my_table TO my_user; + + CREATE FUNCTION make_external_connection_to_node(text,int,text,text) + RETURNS void + AS 'citus' + LANGUAGE C STRICT; + + SELECT run_command_on_workers('ALTER SYSTEM SET citus.max_client_connections TO 1'); + SELECT run_command_on_workers('SELECT pg_reload_conf()'); +} + +teardown +{ + SELECT run_command_on_workers('ALTER SYSTEM RESET citus.max_client_connections'); + SELECT run_command_on_workers('SELECT pg_reload_conf()'); +} + +session "s1" + +// Setup runs as a transaction, so run_command_on_placements must be separate +step "s1-grant" +{ + SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user'); +} + +// Open one external connection as non-superuser, is allowed +step "s1-connect" +{ + SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database()); +} + +session "s2" + +// Open another external connection as non-superuser, not allowed +step "s2-connect" +{ + SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database()); +} + +// Open another external connection as superuser, allowed +step "s2-connect-superuser" +{ + SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database()); +} + +session "s3" + +// Open internal connections as non-superuser, allowed +step "s3-select" +{ + SET ROLE my_user; + SELECT count(*) FROM my_table; +} + +permutation "s1-grant" "s1-connect" "s2-connect" "s2-connect-superuser" "s3-select" diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 3ca456108..fbd57c0b6 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -915,11 +915,11 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi SELECT pg_sleep(0.1); -- since max_cached_conns_per_worker == 0 at this point, the -- backend(s) that execute on the shards will be terminated --- so show that there is only a single client backend, --- which is actually the backend that executes here +-- so show that there no internal backends SET search_path TO single_node; SELECT count(*) from should_commit; -SELECT pg_catalog.get_all_active_client_backend_count(); +SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; +SELECT get_all_active_client_backend_count(); BEGIN; SET LOCAL citus.shard_count TO 32; SET LOCAL citus.force_max_query_parallelization TO ON; @@ -931,7 +931,10 @@ BEGIN; SELECT count(*) FROM test; -- now, we should have additional 32 connections - SELECT pg_catalog.get_all_active_client_backend_count(); + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; + + -- single external connection + SELECT get_all_active_client_backend_count(); ROLLBACK;