mirror of https://github.com/citusdata/citus.git
Add a citus.internal_reserved_connections setting
parent
2f0c346c8a
commit
43e4dd3808
|
@ -106,6 +106,9 @@ int MaxSharedPoolSize = 0;
|
|||
*/
|
||||
int LocalSharedPoolSize = 0;
|
||||
|
||||
/* number of connections reserved for Citus */
|
||||
int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS;
|
||||
|
||||
|
||||
/* the following two structs are used for accessing shared memory */
|
||||
static HTAB *SharedConnStatsHash = NULL;
|
||||
|
@ -192,6 +195,25 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetMaxClientConnections returns the value of citus.max_client_connections,
|
||||
* or max_connections when it is -1 or when connecting as superuser.
|
||||
*
|
||||
* The latter is done because citus.max_client_connections does not apply to
|
||||
* superuser.
|
||||
*/
|
||||
int
|
||||
GetMaxClientConnections(void)
|
||||
{
|
||||
if (MaxClientConnections == ALLOW_ALL_EXTERNAL_CONNECTIONS || superuser())
|
||||
{
|
||||
return MaxConnections;
|
||||
}
|
||||
|
||||
return MaxClientConnections;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
|
||||
* via a GUC.
|
||||
|
@ -204,7 +226,7 @@ GetMaxSharedPoolSize(void)
|
|||
{
|
||||
if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
|
||||
{
|
||||
return MaxConnections;
|
||||
return GetMaxClientConnections();
|
||||
}
|
||||
|
||||
return MaxSharedPoolSize;
|
||||
|
@ -223,7 +245,7 @@ GetLocalSharedPoolSize(void)
|
|||
{
|
||||
if (LocalSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
|
||||
{
|
||||
return MaxConnections * 0.5;
|
||||
return GetMaxClientConnections() * 0.5;
|
||||
}
|
||||
|
||||
return LocalSharedPoolSize;
|
||||
|
@ -318,7 +340,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
|
|||
return false;
|
||||
}
|
||||
|
||||
activeBackendCount = GetAllActiveClientBackendCount();
|
||||
activeBackendCount = GetExternalClientBackendCount();
|
||||
}
|
||||
|
||||
LockConnectionSharedMemory(LW_EXCLUSIVE);
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
|
||||
#include "safe_lib.h"
|
||||
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "citus_version.h"
|
||||
#include "commands/explain.h"
|
||||
#include "common/string.h"
|
||||
|
@ -84,12 +85,14 @@
|
|||
#include "libpq/auth.h"
|
||||
#include "port/atomics.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "optimizer/planner.h"
|
||||
#include "optimizer/paths.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/guc_tables.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/varlena.h"
|
||||
|
||||
#include "columnar/mod.h"
|
||||
|
@ -113,9 +116,9 @@ static void DoInitialCleanup(void);
|
|||
static void ResizeStackToMaximumDepth(void);
|
||||
static void multi_log_hook(ErrorData *edata);
|
||||
static void RegisterConnectionCleanup(void);
|
||||
static void RegisterClientBackendCounterDecrement(void);
|
||||
static void RegisterExternalClientBackendCounterDecrement(void);
|
||||
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
|
||||
static void DecrementClientBackendCounterAtExit(int code, Datum arg);
|
||||
static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
|
||||
static void CreateRequiredDirectories(void);
|
||||
static void RegisterCitusConfigVariables(void);
|
||||
static void OverridePostgresConfigAssignHooks(void);
|
||||
|
@ -135,6 +138,7 @@ static const char * LocalPoolSizeGucShowHook(void);
|
|||
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
|
||||
source);
|
||||
static void CitusAuthHook(Port *port, int status);
|
||||
static bool IsSuperuser(char *userName);
|
||||
|
||||
|
||||
static ClientAuthentication_hook_type original_client_auth_hook = NULL;
|
||||
|
@ -488,16 +492,16 @@ RegisterConnectionCleanup(void)
|
|||
|
||||
|
||||
/*
|
||||
* RegisterClientBackendCounterDecrement is called when the backend terminates.
|
||||
* RegisterExternalClientBackendCounterDecrement is called when the backend terminates.
|
||||
* For all client backends, we register a callback that will undo
|
||||
*/
|
||||
static void
|
||||
RegisterClientBackendCounterDecrement(void)
|
||||
RegisterExternalClientBackendCounterDecrement(void)
|
||||
{
|
||||
static bool registeredCleanup = false;
|
||||
if (registeredCleanup == false)
|
||||
{
|
||||
before_shmem_exit(DecrementClientBackendCounterAtExit, 0);
|
||||
before_shmem_exit(DecrementExternalClientBackendCounterAtExit, 0);
|
||||
|
||||
registeredCleanup = true;
|
||||
}
|
||||
|
@ -527,13 +531,13 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
|
|||
|
||||
|
||||
/*
|
||||
* DecrementClientBackendCounterAtExit is called before_shmem_exit() of the
|
||||
* DecrementExternalClientBackendCounterAtExit is called before_shmem_exit() of the
|
||||
* backend for the purposes decrementing
|
||||
*/
|
||||
static void
|
||||
DecrementClientBackendCounterAtExit(int code, Datum arg)
|
||||
DecrementExternalClientBackendCounterAtExit(int code, Datum arg)
|
||||
{
|
||||
DecrementClientBackendCounter();
|
||||
DecrementExternalClientBackendCounter();
|
||||
}
|
||||
|
||||
|
||||
|
@ -1340,6 +1344,23 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.max_client_connections",
|
||||
gettext_noop("Sets the maximum number of connections regular clients can make"),
|
||||
gettext_noop("To ensure that a Citus cluster has a sufficient number of "
|
||||
"connection slots to serve queries internally, it can be "
|
||||
"useful to reserve connection slots for Citus internal "
|
||||
"connections. When max_client_connections is set to a value "
|
||||
"below max_connections, the remaining connections are reserved "
|
||||
"for connections between Citus nodes. This does not affect "
|
||||
"superuser_reserved_connections. If set to -1, no connections "
|
||||
"are reserved."),
|
||||
&MaxClientConnections,
|
||||
-1, -1, MaxConnections,
|
||||
PGC_SUSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.max_intermediate_result_size",
|
||||
gettext_noop("Sets the maximum size of the intermediate results in KB for "
|
||||
|
@ -2171,12 +2192,73 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
|
|||
static void
|
||||
CitusAuthHook(Port *port, int status)
|
||||
{
|
||||
uint64 gpid = ExtractGlobalPID(port->application_name);
|
||||
|
||||
/* external connections to not have a GPID immediately */
|
||||
if (gpid == INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
{
|
||||
/*
|
||||
* We raise the shared connection counter pre-emptively. As a result, we may
|
||||
* have scenarios in which a few simultaneous connection attempts prevent
|
||||
* each other from succeeding, but we avoid scenarios where we oversubscribe
|
||||
* the system.
|
||||
*
|
||||
* By also calling RegisterExternalClientBackendCounterDecrement here, we
|
||||
* immediately lower the counter if we throw a FATAL error below. The client
|
||||
* connection counter may temporarily exceed maxClientConnections in between.
|
||||
*/
|
||||
RegisterExternalClientBackendCounterDecrement();
|
||||
|
||||
uint32 externalClientCount = IncrementExternalClientBackendCounter();
|
||||
|
||||
/*
|
||||
* Limit non-superuser client connections if citus.max_client_connections
|
||||
* is set.
|
||||
*/
|
||||
if (MaxClientConnections >= 0 &&
|
||||
!IsSuperuser(port->user_name) &&
|
||||
externalClientCount > MaxClientConnections)
|
||||
{
|
||||
ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
|
||||
errmsg("remaining connection slots are reserved for "
|
||||
"non-replication superuser connections"),
|
||||
errdetail("the server is configured to accept up to %d "
|
||||
"regular client connections",
|
||||
MaxClientConnections)));
|
||||
}
|
||||
}
|
||||
|
||||
/* let other authentication hooks to kick in first */
|
||||
if (original_client_auth_hook)
|
||||
{
|
||||
original_client_auth_hook(port, status);
|
||||
}
|
||||
|
||||
RegisterClientBackendCounterDecrement();
|
||||
IncrementClientBackendCounter();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsSuperuser returns whether the role with the given name is superuser.
|
||||
*/
|
||||
static bool
|
||||
IsSuperuser(char *roleName)
|
||||
{
|
||||
if (roleName == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
HeapTuple roleTuple = SearchSysCache1(AUTHNAME, CStringGetDatum(roleName));
|
||||
if (!HeapTupleIsValid(roleTuple))
|
||||
{
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION),
|
||||
errmsg("role \"%s\" does not exist", roleName)));
|
||||
}
|
||||
|
||||
Form_pg_authid rform = (Form_pg_authid) GETSTRUCT(roleTuple);
|
||||
bool isSuperuser = rform->rolsuper;
|
||||
|
||||
ReleaseSysCache(roleTuple);
|
||||
|
||||
return isSuperuser;
|
||||
}
|
||||
|
|
|
@ -27,5 +27,5 @@ PG_FUNCTION_INFO_V1(get_all_active_client_backend_count);
|
|||
Datum
|
||||
get_all_active_client_backend_count(PG_FUNCTION_ARGS)
|
||||
{
|
||||
PG_RETURN_UINT32(GetAllActiveClientBackendCount());
|
||||
PG_RETURN_UINT32(GetExternalClientBackendCount());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* test/src/make_external_connection.c
|
||||
*
|
||||
* This file contains UDF to connect to a node without using the Citus
|
||||
* internal application_name.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "miscadmin.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/function_utils.h"
|
||||
#include "distributed/intermediate_result_pruning.h"
|
||||
#include "distributed/lock_graph.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/run_from_same_connection.h"
|
||||
|
||||
#include "distributed/version_compat.h"
|
||||
#include "executor/spi.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(make_external_connection_to_node);
|
||||
|
||||
|
||||
/*
|
||||
* make_external_connection_to_node opens a conneciton to a node
|
||||
* and keeps it until the end of the session.
|
||||
*/
|
||||
Datum
|
||||
make_external_connection_to_node(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(0));
|
||||
uint32 nodePort = PG_GETARG_UINT32(1);
|
||||
char *userName = text_to_cstring(PG_GETARG_TEXT_P(2));
|
||||
char *databaseName = text_to_cstring(PG_GETARG_TEXT_P(3));
|
||||
|
||||
StringInfo connectionString = makeStringInfo();
|
||||
appendStringInfo(connectionString,
|
||||
"host=%s port=%d user=%s dbname=%s",
|
||||
nodeName, nodePort, userName, databaseName);
|
||||
|
||||
PGconn *pgConn = PQconnectdb(connectionString->data);
|
||||
|
||||
if (PQstatus(pgConn) != CONNECTION_OK)
|
||||
{
|
||||
PQfinish(pgConn);
|
||||
|
||||
ereport(ERROR, (errmsg("connection failed")));
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
|
@ -68,14 +68,12 @@ typedef struct BackendManagementShmemData
|
|||
pg_atomic_uint64 nextTransactionNumber;
|
||||
|
||||
/*
|
||||
* Total number of client backends that are authenticated.
|
||||
* We only care about activeClientBackendCounter when adaptive
|
||||
* connection management is enabled, otherwise always zero.
|
||||
* Total number of external client backends that are authenticated.
|
||||
*
|
||||
* Note that the counter does not consider any background workers
|
||||
* or such, it only counts client_backends.
|
||||
* or such, and also exludes internal connections between nodes.
|
||||
*/
|
||||
pg_atomic_uint32 activeClientBackendCounter;
|
||||
pg_atomic_uint32 externalClientBackendCounter;
|
||||
|
||||
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
|
||||
} BackendManagementShmemData;
|
||||
|
@ -548,7 +546,7 @@ BackendManagementShmemInit(void)
|
|||
pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
|
||||
|
||||
/* there are no active backends yet, so start with zero */
|
||||
pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);
|
||||
pg_atomic_init_u32(&backendManagementShmemData->externalClientBackendCounter, 0);
|
||||
|
||||
/*
|
||||
* We need to init per backend's spinlock before any backend
|
||||
|
@ -1166,36 +1164,37 @@ GetMyProcLocalTransactionId(void)
|
|||
|
||||
|
||||
/*
|
||||
* GetAllActiveClientBackendCount returns activeClientBackendCounter in
|
||||
* GetExternalClientBackendCount returns externalClientBackendCounter in
|
||||
* the shared memory.
|
||||
*/
|
||||
int
|
||||
GetAllActiveClientBackendCount(void)
|
||||
GetExternalClientBackendCount(void)
|
||||
{
|
||||
uint32 activeBackendCount =
|
||||
pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);
|
||||
pg_atomic_read_u32(&backendManagementShmemData->externalClientBackendCounter);
|
||||
|
||||
return activeBackendCount;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IncrementClientBackendCounter increments activeClientBackendCounter in
|
||||
* IncrementExternalClientBackendCounter increments externalClientBackendCounter in
|
||||
* the shared memory by one.
|
||||
*/
|
||||
void
|
||||
IncrementClientBackendCounter(void)
|
||||
uint32
|
||||
IncrementExternalClientBackendCounter(void)
|
||||
{
|
||||
pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
|
||||
return pg_atomic_add_fetch_u32(
|
||||
&backendManagementShmemData->externalClientBackendCounter, 1);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DecrementClientBackendCounter decrements activeClientBackendCounter in
|
||||
* DecrementExternalClientBackendCounter decrements externalClientBackendCounter in
|
||||
* the shared memory by one.
|
||||
*/
|
||||
void
|
||||
DecrementClientBackendCounter(void)
|
||||
DecrementExternalClientBackendCounter(void)
|
||||
{
|
||||
pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
|
||||
pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1);
|
||||
}
|
||||
|
|
|
@ -66,9 +66,9 @@ extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
|
|||
extern bool MyBackendIsInDisributedTransaction(void);
|
||||
extern List * ActiveDistributedTransactionNumbers(void);
|
||||
extern LocalTransactionId GetMyProcLocalTransactionId(void);
|
||||
extern int GetAllActiveClientBackendCount(void);
|
||||
extern void IncrementClientBackendCounter(void);
|
||||
extern void DecrementClientBackendCounter(void);
|
||||
extern int GetExternalClientBackendCount(void);
|
||||
extern uint32 IncrementExternalClientBackendCounter(void);
|
||||
extern void DecrementExternalClientBackendCounter(void);
|
||||
|
||||
extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
|
||||
char *queryString, StringInfo queryResultString,
|
||||
|
|
|
@ -14,15 +14,18 @@
|
|||
#define ADJUST_POOLSIZE_AUTOMATICALLY 0
|
||||
#define DISABLE_CONNECTION_THROTTLING -1
|
||||
#define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1
|
||||
#define ALLOW_ALL_EXTERNAL_CONNECTIONS -1
|
||||
|
||||
|
||||
extern int MaxSharedPoolSize;
|
||||
extern int LocalSharedPoolSize;
|
||||
extern int MaxClientConnections;
|
||||
|
||||
|
||||
extern void InitializeSharedConnectionStats(void);
|
||||
extern void WaitForSharedConnection(void);
|
||||
extern void WakeupWaiterBackendsForSharedConnection(void);
|
||||
extern int GetMaxClientConnections(void);
|
||||
extern int GetMaxSharedPoolSize(void);
|
||||
extern int GetLocalSharedPoolSize(void);
|
||||
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
Parsed test spec with 3 sessions
|
||||
|
||||
starting permutation: s1-grant s1-connect s2-connect s2-connect-superuser s3-select
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,t)
|
||||
(localhost,57638,t,t)
|
||||
(2 rows)
|
||||
|
||||
step s1-grant:
|
||||
SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user');
|
||||
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
GRANT
|
||||
GRANT
|
||||
GRANT
|
||||
GRANT
|
||||
(4 rows)
|
||||
|
||||
step s1-connect:
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
|
||||
make_external_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-connect:
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
|
||||
ERROR: connection failed
|
||||
step s2-connect-superuser:
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database());
|
||||
|
||||
make_external_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s3-select:
|
||||
SET ROLE my_user;
|
||||
SELECT count(*) FROM my_table;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,t)
|
||||
(localhost,57638,t,t)
|
||||
(2 rows)
|
||||
|
|
@ -1723,8 +1723,7 @@ SELECT pg_sleep(0.1);
|
|||
|
||||
-- since max_cached_conns_per_worker == 0 at this point, the
|
||||
-- backend(s) that execute on the shards will be terminated
|
||||
-- so show that there is only a single client backend,
|
||||
-- which is actually the backend that executes here
|
||||
-- so show that there no internal backends
|
||||
SET search_path TO single_node;
|
||||
SELECT count(*) from should_commit;
|
||||
count
|
||||
|
@ -1732,7 +1731,13 @@ SELECT count(*) from should_commit;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SELECT pg_catalog.get_all_active_client_backend_count();
|
||||
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT get_all_active_client_backend_count();
|
||||
get_all_active_client_backend_count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -1757,10 +1762,17 @@ BEGIN;
|
|||
(1 row)
|
||||
|
||||
-- now, we should have additional 32 connections
|
||||
SELECT pg_catalog.get_all_active_client_backend_count();
|
||||
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
32
|
||||
(1 row)
|
||||
|
||||
-- single external connection
|
||||
SELECT get_all_active_client_backend_count();
|
||||
get_all_active_client_backend_count
|
||||
---------------------------------------------------------------------
|
||||
33
|
||||
1
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
|
|
|
@ -62,6 +62,7 @@ test: isolation_validate_vs_insert
|
|||
test: isolation_insert_select_conflict
|
||||
test: shared_connection_waits
|
||||
test: isolation_cancellation
|
||||
test: isolation_max_client_connections
|
||||
test: isolation_undistribute_table
|
||||
test: isolation_fix_partition_shard_index_names
|
||||
test: isolation_global_pid
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
setup
|
||||
{
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE USER my_user;
|
||||
SELECT run_command_on_workers('CREATE USER my_user');
|
||||
|
||||
CREATE TABLE my_table (test_id integer NOT NULL, data text);
|
||||
SELECT create_distributed_table('my_table', 'test_id');
|
||||
|
||||
GRANT USAGE ON SCHEMA public TO my_user;
|
||||
GRANT SELECT ON TABLE my_table TO my_user;
|
||||
|
||||
CREATE FUNCTION make_external_connection_to_node(text,int,text,text)
|
||||
RETURNS void
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
SELECT run_command_on_workers('ALTER SYSTEM SET citus.max_client_connections TO 1');
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf()');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
SELECT run_command_on_workers('ALTER SYSTEM RESET citus.max_client_connections');
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf()');
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
// Setup runs as a transaction, so run_command_on_placements must be separate
|
||||
step "s1-grant"
|
||||
{
|
||||
SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user');
|
||||
}
|
||||
|
||||
// Open one external connection as non-superuser, is allowed
|
||||
step "s1-connect"
|
||||
{
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
// Open another external connection as non-superuser, not allowed
|
||||
step "s2-connect"
|
||||
{
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
}
|
||||
|
||||
// Open another external connection as superuser, allowed
|
||||
step "s2-connect-superuser"
|
||||
{
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database());
|
||||
}
|
||||
|
||||
session "s3"
|
||||
|
||||
// Open internal connections as non-superuser, allowed
|
||||
step "s3-select"
|
||||
{
|
||||
SET ROLE my_user;
|
||||
SELECT count(*) FROM my_table;
|
||||
}
|
||||
|
||||
permutation "s1-grant" "s1-connect" "s2-connect" "s2-connect-superuser" "s3-select"
|
|
@ -915,11 +915,11 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi
|
|||
SELECT pg_sleep(0.1);
|
||||
-- since max_cached_conns_per_worker == 0 at this point, the
|
||||
-- backend(s) that execute on the shards will be terminated
|
||||
-- so show that there is only a single client backend,
|
||||
-- which is actually the backend that executes here
|
||||
-- so show that there no internal backends
|
||||
SET search_path TO single_node;
|
||||
SELECT count(*) from should_commit;
|
||||
SELECT pg_catalog.get_all_active_client_backend_count();
|
||||
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
|
||||
SELECT get_all_active_client_backend_count();
|
||||
BEGIN;
|
||||
SET LOCAL citus.shard_count TO 32;
|
||||
SET LOCAL citus.force_max_query_parallelization TO ON;
|
||||
|
@ -931,7 +931,10 @@ BEGIN;
|
|||
SELECT count(*) FROM test;
|
||||
|
||||
-- now, we should have additional 32 connections
|
||||
SELECT pg_catalog.get_all_active_client_backend_count();
|
||||
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
|
||||
|
||||
-- single external connection
|
||||
SELECT get_all_active_client_backend_count();
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue