Merge pull request #5698 from citusdata/marcocitus/internal-reserved-connections

Marco Slot 2022-03-03 10:33:06 +01:00 committed by GitHub
commit f7d3405148
12 changed files with 353 additions and 43 deletions

View File

@@ -106,6 +106,9 @@ int MaxSharedPoolSize = 0;
  */
 int LocalSharedPoolSize = 0;
 
+/* number of connections reserved for Citus */
+int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS;
+
 /* the following two structs are used for accessing shared memory */
 static HTAB *SharedConnStatsHash = NULL;
@@ -192,6 +195,25 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri
 }
 
+
+/*
+ * GetMaxClientConnections returns the value of citus.max_client_connections,
+ * or max_connections when it is -1 or when connecting as superuser.
+ *
+ * The latter is done because citus.max_client_connections does not apply to
+ * superuser.
+ */
+int
+GetMaxClientConnections(void)
+{
+	if (MaxClientConnections == ALLOW_ALL_EXTERNAL_CONNECTIONS || superuser())
+	{
+		return MaxConnections;
+	}
+
+	return MaxClientConnections;
+}
+
 /*
  * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
  * via a GUC.
@@ -204,7 +226,7 @@ GetMaxSharedPoolSize(void)
 {
 	if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
 	{
-		return MaxConnections;
+		return GetMaxClientConnections();
 	}
 
 	return MaxSharedPoolSize;
@@ -223,7 +245,7 @@ GetLocalSharedPoolSize(void)
 {
 	if (LocalSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
 	{
-		return MaxConnections * 0.5;
+		return GetMaxClientConnections() * 0.5;
 	}
 
 	return LocalSharedPoolSize;
@@ -318,7 +340,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
 			return false;
 		}
 
-		activeBackendCount = GetAllActiveClientBackendCount();
+		activeBackendCount = GetExternalClientBackendCount();
 	}
 
 	LockConnectionSharedMemory(LW_EXCLUSIVE);

View File

@@ -23,6 +23,7 @@
 #include "safe_lib.h"
 
+#include "catalog/pg_authid.h"
 #include "citus_version.h"
 #include "commands/explain.h"
 #include "common/string.h"
@@ -84,12 +85,14 @@
 #include "libpq/auth.h"
 #include "port/atomics.h"
 #include "postmaster/postmaster.h"
+#include "replication/walsender.h"
 #include "storage/ipc.h"
 #include "optimizer/planner.h"
 #include "optimizer/paths.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
 #include "utils/guc_tables.h"
+#include "utils/syscache.h"
 #include "utils/varlena.h"
 
 #include "columnar/mod.h"
@@ -113,9 +116,9 @@ static void DoInitialCleanup(void);
 static void ResizeStackToMaximumDepth(void);
 static void multi_log_hook(ErrorData *edata);
 static void RegisterConnectionCleanup(void);
-static void RegisterClientBackendCounterDecrement(void);
+static void RegisterExternalClientBackendCounterDecrement(void);
 static void CitusCleanupConnectionsAtExit(int code, Datum arg);
-static void DecrementClientBackendCounterAtExit(int code, Datum arg);
+static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
 static void CreateRequiredDirectories(void);
 static void RegisterCitusConfigVariables(void);
 static void OverridePostgresConfigAssignHooks(void);
@@ -135,6 +138,7 @@ static const char * LocalPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
 											 source);
 static void CitusAuthHook(Port *port, int status);
+static bool IsSuperuser(char *userName);
 
 static ClientAuthentication_hook_type original_client_auth_hook = NULL;
@@ -488,16 +492,16 @@ RegisterConnectionCleanup(void)
 
 /*
- * RegisterClientBackendCounterDecrement is called when the backend terminates.
+ * RegisterExternalClientBackendCounterDecrement is called when the backend terminates.
  * For all client backends, we register a callback that will undo
  */
 static void
-RegisterClientBackendCounterDecrement(void)
+RegisterExternalClientBackendCounterDecrement(void)
 {
 	static bool registeredCleanup = false;
 	if (registeredCleanup == false)
 	{
-		before_shmem_exit(DecrementClientBackendCounterAtExit, 0);
+		before_shmem_exit(DecrementExternalClientBackendCounterAtExit, 0);
 
 		registeredCleanup = true;
 	}
@@ -527,13 +531,13 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
 
 /*
- * DecrementClientBackendCounterAtExit is called before_shmem_exit() of the
+ * DecrementExternalClientBackendCounterAtExit is called before_shmem_exit() of the
  * backend for the purposes decrementing
  */
 static void
-DecrementClientBackendCounterAtExit(int code, Datum arg)
+DecrementExternalClientBackendCounterAtExit(int code, Datum arg)
 {
-	DecrementClientBackendCounter();
+	DecrementExternalClientBackendCounter();
 }
@ -1340,6 +1344,23 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_client_connections",
gettext_noop("Sets the maximum number of connections regular clients can make"),
gettext_noop("To ensure that a Citus cluster has a sufficient number of "
"connection slots to serve queries internally, it can be "
"useful to reserve connection slots for Citus internal "
"connections. When max_client_connections is set to a value "
"below max_connections, the remaining connections are reserved "
"for connections between Citus nodes. This does not affect "
"superuser_reserved_connections. If set to -1, no connections "
"are reserved."),
&MaxClientConnections,
-1, -1, MaxConnections,
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_intermediate_result_size", "citus.max_intermediate_result_size",
gettext_noop("Sets the maximum size of the intermediate results in KB for " gettext_noop("Sets the maximum size of the intermediate results in KB for "
@ -2171,12 +2192,73 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
static void static void
CitusAuthHook(Port *port, int status) CitusAuthHook(Port *port, int status)
{ {
uint64 gpid = ExtractGlobalPID(port->application_name);
/* external connections to not have a GPID immediately */
if (gpid == INVALID_CITUS_INTERNAL_BACKEND_GPID)
{
/*
* We raise the shared connection counter pre-emptively. As a result, we may
* have scenarios in which a few simultaneous connection attempts prevent
* each other from succeeding, but we avoid scenarios where we oversubscribe
* the system.
*
* By also calling RegisterExternalClientBackendCounterDecrement here, we
* immediately lower the counter if we throw a FATAL error below. The client
* connection counter may temporarily exceed maxClientConnections in between.
*/
RegisterExternalClientBackendCounterDecrement();
uint32 externalClientCount = IncrementExternalClientBackendCounter();
/*
* Limit non-superuser client connections if citus.max_client_connections
* is set.
*/
if (MaxClientConnections >= 0 &&
!IsSuperuser(port->user_name) &&
externalClientCount > MaxClientConnections)
{
ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("remaining connection slots are reserved for "
"non-replication superuser connections"),
errdetail("the server is configured to accept up to %d "
"regular client connections",
MaxClientConnections)));
}
}
/* let other authentication hooks to kick in first */ /* let other authentication hooks to kick in first */
if (original_client_auth_hook) if (original_client_auth_hook)
{ {
original_client_auth_hook(port, status); original_client_auth_hook(port, status);
} }
}
RegisterClientBackendCounterDecrement();
IncrementClientBackendCounter();
/*
* IsSuperuser returns whether the role with the given name is superuser.
*/
static bool
IsSuperuser(char *roleName)
{
if (roleName == NULL)
{
return false;
}
HeapTuple roleTuple = SearchSysCache1(AUTHNAME, CStringGetDatum(roleName));
if (!HeapTupleIsValid(roleTuple))
{
ereport(FATAL,
(errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION),
errmsg("role \"%s\" does not exist", roleName)));
}
Form_pg_authid rform = (Form_pg_authid) GETSTRUCT(roleTuple);
bool isSuperuser = rform->rolsuper;
ReleaseSysCache(roleTuple);
return isSuperuser;
} }
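For a rough way to observe the split the hook above relies on, the regression tests in this commit count Citus-internal backends by their application_name prefix; the inverse filter approximates the external client backends that the new counter tracks and that citus.max_client_connections limits (a sketch, assuming the citus_internal application_name prefix used elsewhere in this commit):

    -- backends opened by Citus itself for node-to-node traffic
    SELECT count(*) FROM pg_stat_activity
    WHERE application_name LIKE 'citus_internal%';

    -- the remaining client backends are external ones, the population
    -- limited by citus.max_client_connections (superusers excepted)
    SELECT count(*) FROM pg_stat_activity
    WHERE backend_type = 'client backend'
      AND application_name NOT LIKE 'citus_internal%';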

View File

@@ -27,5 +27,5 @@ PG_FUNCTION_INFO_V1(get_all_active_client_backend_count);
 Datum
 get_all_active_client_backend_count(PG_FUNCTION_ARGS)
 {
-	PG_RETURN_UINT32(GetAllActiveClientBackendCount());
+	PG_RETURN_UINT32(GetExternalClientBackendCount());
 }

View File

@@ -0,0 +1,67 @@
/*-------------------------------------------------------------------------
*
* test/src/make_external_connection.c
*
* This file contains UDF to connect to a node without using the Citus
* internal application_name.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "libpq-fe.h"
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/function_utils.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/run_from_same_connection.h"
#include "distributed/version_compat.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
PG_FUNCTION_INFO_V1(make_external_connection_to_node);
/*
* make_external_connection_to_node opens a connection to a node
* and keeps it until the end of the session.
*/
Datum
make_external_connection_to_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(0));
uint32 nodePort = PG_GETARG_UINT32(1);
char *userName = text_to_cstring(PG_GETARG_TEXT_P(2));
char *databaseName = text_to_cstring(PG_GETARG_TEXT_P(3));
StringInfo connectionString = makeStringInfo();
appendStringInfo(connectionString,
"host=%s port=%d user=%s dbname=%s",
nodeName, nodePort, userName, databaseName);
PGconn *pgConn = PQconnectdb(connectionString->data);
if (PQstatus(pgConn) != CONNECTION_OK)
{
PQfinish(pgConn);
ereport(ERROR, (errmsg("connection failed")));
}
PG_RETURN_VOID();
}
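For reference, the isolation spec added later in this commit registers and calls this UDF roughly as follows (57637 is the worker port used by that spec; the arguments are hostname, port, user, and database):

    CREATE FUNCTION make_external_connection_to_node(text,int,text,text)
    RETURNS void
    AS 'citus'
    LANGUAGE C STRICT;

    SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());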

View File

@@ -68,14 +68,12 @@ typedef struct BackendManagementShmemData
 	pg_atomic_uint64 nextTransactionNumber;
 
 	/*
-	 * Total number of client backends that are authenticated.
-	 * We only care about activeClientBackendCounter when adaptive
-	 * connection management is enabled, otherwise always zero.
+	 * Total number of external client backends that are authenticated.
 	 *
 	 * Note that the counter does not consider any background workers
-	 * or such, it only counts client_backends.
+	 * or such, and also excludes internal connections between nodes.
 	 */
-	pg_atomic_uint32 activeClientBackendCounter;
+	pg_atomic_uint32 externalClientBackendCounter;
 
 	BackendData backends[FLEXIBLE_ARRAY_MEMBER];
 } BackendManagementShmemData;
@@ -548,7 +546,7 @@ BackendManagementShmemInit(void)
 		pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
 
 		/* there are no active backends yet, so start with zero */
-		pg_atomic_init_u32(&backendManagementShmemData->activeClientBackendCounter, 0);
+		pg_atomic_init_u32(&backendManagementShmemData->externalClientBackendCounter, 0);
 
 		/*
 		 * We need to init per backend's spinlock before any backend
@@ -1166,36 +1164,37 @@ GetMyProcLocalTransactionId(void)
 
 /*
- * GetAllActiveClientBackendCount returns activeClientBackendCounter in
+ * GetExternalClientBackendCount returns externalClientBackendCounter in
  * the shared memory.
  */
 int
-GetAllActiveClientBackendCount(void)
+GetExternalClientBackendCount(void)
 {
 	uint32 activeBackendCount =
-		pg_atomic_read_u32(&backendManagementShmemData->activeClientBackendCounter);
+		pg_atomic_read_u32(&backendManagementShmemData->externalClientBackendCounter);
 
 	return activeBackendCount;
 }
 
 
 /*
- * IncrementClientBackendCounter increments activeClientBackendCounter in
+ * IncrementExternalClientBackendCounter increments externalClientBackendCounter in
  * the shared memory by one.
  */
-void
-IncrementClientBackendCounter(void)
+uint32
+IncrementExternalClientBackendCounter(void)
 {
-	pg_atomic_add_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
+	return pg_atomic_add_fetch_u32(
+		&backendManagementShmemData->externalClientBackendCounter, 1);
 }
 
 
 /*
- * DecrementClientBackendCounter decrements activeClientBackendCounter in
+ * DecrementExternalClientBackendCounter decrements externalClientBackendCounter in
  * the shared memory by one.
  */
 void
-DecrementClientBackendCounter(void)
+DecrementExternalClientBackendCounter(void)
 {
-	pg_atomic_sub_fetch_u32(&backendManagementShmemData->activeClientBackendCounter, 1);
+	pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1);
 }

View File

@@ -66,9 +66,9 @@ extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
 extern bool MyBackendIsInDisributedTransaction(void);
 extern List * ActiveDistributedTransactionNumbers(void);
 extern LocalTransactionId GetMyProcLocalTransactionId(void);
-extern int GetAllActiveClientBackendCount(void);
-extern void IncrementClientBackendCounter(void);
-extern void DecrementClientBackendCounter(void);
+extern int GetExternalClientBackendCount(void);
+extern uint32 IncrementExternalClientBackendCounter(void);
+extern void DecrementExternalClientBackendCounter(void);
 
 extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
 										char *queryString, StringInfo queryResultString,

View File

@@ -14,15 +14,18 @@
 #define ADJUST_POOLSIZE_AUTOMATICALLY 0
 #define DISABLE_CONNECTION_THROTTLING -1
 #define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1
+#define ALLOW_ALL_EXTERNAL_CONNECTIONS -1
 
 extern int MaxSharedPoolSize;
 extern int LocalSharedPoolSize;
+extern int MaxClientConnections;
 
 extern void InitializeSharedConnectionStats(void);
 extern void WaitForSharedConnection(void);
 extern void WakeupWaiterBackendsForSharedConnection(void);
+extern int GetMaxClientConnections(void);
 extern int GetMaxSharedPoolSize(void);
 extern int GetLocalSharedPoolSize(void);
 extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);

View File

@@ -0,0 +1,55 @@
Parsed test spec with 3 sessions
starting permutation: s1-grant s1-connect s2-connect s2-connect-superuser s3-select
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,t)
(localhost,57638,t,t)
(2 rows)
step s1-grant:
SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user');
result
---------------------------------------------------------------------
GRANT
GRANT
GRANT
GRANT
(4 rows)
step s1-connect:
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
make_external_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-connect:
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
ERROR: connection failed
step s2-connect-superuser:
SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database());
make_external_connection_to_node
---------------------------------------------------------------------
(1 row)
step s3-select:
SET ROLE my_user;
SELECT count(*) FROM my_table;
count
---------------------------------------------------------------------
0
(1 row)
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,t)
(localhost,57638,t,t)
(2 rows)

View File

@@ -1723,8 +1723,7 @@ SELECT pg_sleep(0.1);
 -- since max_cached_conns_per_worker == 0 at this point, the
 -- backend(s) that execute on the shards will be terminated
--- so show that there is only a single client backend,
--- which is actually the backend that executes here
+-- so show that there are no internal backends
 SET search_path TO single_node;
 SELECT count(*) from should_commit;
  count
@@ -1732,7 +1731,13 @@ SELECT count(*) from should_commit;
      0
 (1 row)
 
-SELECT pg_catalog.get_all_active_client_backend_count();
+SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+SELECT get_all_active_client_backend_count();
  get_all_active_client_backend_count
 ---------------------------------------------------------------------
                                    1
@@ -1757,10 +1762,17 @@ BEGIN;
 (1 row)
 
 -- now, we should have additional 32 connections
-SELECT pg_catalog.get_all_active_client_backend_count();
+SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
+ count
+---------------------------------------------------------------------
+    32
+(1 row)
+
+-- single external connection
+SELECT get_all_active_client_backend_count();
  get_all_active_client_backend_count
 ---------------------------------------------------------------------
-                                   33
+                                    1
 (1 row)
 
 ROLLBACK;

View File

@@ -62,6 +62,7 @@ test: isolation_validate_vs_insert
 test: isolation_insert_select_conflict
 test: shared_connection_waits
 test: isolation_cancellation
+test: isolation_max_client_connections
 test: isolation_undistribute_table
 test: isolation_fix_partition_shard_index_names
 test: isolation_global_pid

View File

@@ -0,0 +1,66 @@
setup
{
SET citus.shard_replication_factor TO 1;
CREATE USER my_user;
SELECT run_command_on_workers('CREATE USER my_user');
CREATE TABLE my_table (test_id integer NOT NULL, data text);
SELECT create_distributed_table('my_table', 'test_id');
GRANT USAGE ON SCHEMA public TO my_user;
GRANT SELECT ON TABLE my_table TO my_user;
CREATE FUNCTION make_external_connection_to_node(text,int,text,text)
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
SELECT run_command_on_workers('ALTER SYSTEM SET citus.max_client_connections TO 1');
SELECT run_command_on_workers('SELECT pg_reload_conf()');
}
teardown
{
SELECT run_command_on_workers('ALTER SYSTEM RESET citus.max_client_connections');
SELECT run_command_on_workers('SELECT pg_reload_conf()');
}
session "s1"
// Setup runs as a transaction, so run_command_on_placements must be separate
step "s1-grant"
{
SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user');
}
// Open one external connection as non-superuser, is allowed
step "s1-connect"
{
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
}
session "s2"
// Open another external connection as non-superuser, not allowed
step "s2-connect"
{
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
}
// Open another external connection as superuser, allowed
step "s2-connect-superuser"
{
SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database());
}
session "s3"
// Open internal connections as non-superuser, allowed
step "s3-select"
{
SET ROLE my_user;
SELECT count(*) FROM my_table;
}
permutation "s1-grant" "s1-connect" "s2-connect" "s2-connect-superuser" "s3-select"

View File

@@ -915,11 +915,11 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi
 SELECT pg_sleep(0.1);
 
 -- since max_cached_conns_per_worker == 0 at this point, the
 -- backend(s) that execute on the shards will be terminated
--- so show that there is only a single client backend,
--- which is actually the backend that executes here
+-- so show that there are no internal backends
 SET search_path TO single_node;
 SELECT count(*) from should_commit;
-SELECT pg_catalog.get_all_active_client_backend_count();
+SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
+SELECT get_all_active_client_backend_count();
 
 BEGIN;
 	SET LOCAL citus.shard_count TO 32;
 	SET LOCAL citus.force_max_query_parallelization TO ON;
@@ -931,7 +931,10 @@ BEGIN;
 	SELECT count(*) FROM test;
 
 	-- now, we should have additional 32 connections
-	SELECT pg_catalog.get_all_active_client_backend_count();
+	SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
+
+	-- single external connection
+	SELECT get_all_active_client_backend_count();
 ROLLBACK;