diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 626234434..4886ed62e 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -34,6 +34,8 @@ #include "distributed/metadata/distobject.h" #include "distributed/database/database_sharding.h" #include "distributed/deparse_shard_query.h" +#include "distributed/listutils.h" +#include "distributed/adaptive_executor.h" diff --git a/src/backend/distributed/database/database_lock.c b/src/backend/distributed/database/database_lock.c new file mode 100644 index 000000000..64757832f --- /dev/null +++ b/src/backend/distributed/database/database_lock.c @@ -0,0 +1,53 @@ +/*------------------------------------------------------------------------- + * + * database_lock.c + * Functions for locking a database. + * + * Copyright (c) Microsoft, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "funcapi.h" +#include "fmgr.h" +#include "miscadmin.h" + +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "distributed/metadata_cache.h" +#include "storage/lmgr.h" + + +static void CitusDatabaseLock(Oid databaseId); + + +PG_FUNCTION_INFO_V1(citus_database_lock_by_name); + + +/* + * citus_database_lock locks the given database in access exclusive mode + * to temporarily block new connections. + */ +Datum +citus_database_lock_by_name(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Name databaseName = PG_GETARG_NAME(0); + bool missingOk = false; + Oid databaseId = get_database_oid(NameStr(*databaseName), missingOk); + + CitusDatabaseLock(databaseId); + + PG_RETURN_VOID(); +} + + +/* + * CitusDatabaseLock locks a database for new connections. + */ +static void +CitusDatabaseLock(Oid databaseId) +{ + LockSharedObject(DatabaseRelationId, databaseId, 0, ExclusiveLock); +} diff --git a/src/backend/distributed/database/database_sharding.c b/src/backend/distributed/database/database_sharding.c new file mode 100644 index 000000000..b6e844890 --- /dev/null +++ b/src/backend/distributed/database/database_sharding.c @@ -0,0 +1,544 @@ +/*------------------------------------------------------------------------- + * + * database_sharding.c + * + * This file contains module-level definitions. + * + * Copyright (c) 2023, Microsoft, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" +#include "miscadmin.h" + +#include "citus_version.h" +#include "pg_version_compat.h" + +#include "access/genam.h" +#include "commands/dbcommands.h" +#include "distributed/connection_management.h" +#include "distributed/database/database_sharding.h" +#include "distributed/deparser.h" +#include "distributed/deparse_shard_query.h" +#include "distributed/listutils.h" +#include "distributed/metadata_sync.h" +#include "distributed/remote_commands.h" +#include "distributed/shared_library_init.h" +#include "distributed/worker_transaction.h" +#include "executor/spi.h" +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" +#include "postmaster/postmaster.h" +#include "tcop/utility.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" + + +static void ExecuteCommandInControlDatabase(char *command); +static void AllowConnectionsOnlyOnNodeGroup(Oid databaseOid, Oid nodeGroupId); +static void InsertDatabaseShardAssignment(Oid databaseOid, int nodeGroupId); +static void InsertDatabaseShardAssignmentLocally(Oid databaseOid, int nodeGroupId); +static void InsertDatabaseShardAssignmentOnOtherNodes(Oid databaseOid, int nodeGroupId); +static void DeleteDatabaseShardByDatabaseId(Oid databaseOid); +static void DeleteDatabaseShardByDatabaseIdOnOtherNodes(Oid databaseOid); +static DatabaseShard * TupleToDatabaseShard(HeapTuple heapTuple, + TupleDesc tupleDescriptor); +static char * DeleteDatabaseShardByDatabaseIdCommand(Oid databaseOid); + + +PG_FUNCTION_INFO_V1(database_shard_assign); +PG_FUNCTION_INFO_V1(citus_internal_add_database_shard); +PG_FUNCTION_INFO_V1(citus_internal_delete_database_shard); + + +/* citus.enable_database_sharding setting */ +bool EnableDatabaseSharding = false; + +/* citus.database_sharding_pgbouncer_file setting */ +char *DatabaseShardingPgBouncerFile = ""; + + +/* + * PreProcessUtilityInDatabaseShard handles DDL commands that occur within a + * database shard and require global coordination: + * - CREATE/ALTER/DROP DATABASE + * - CREATE/ALTER/DROP ROLE/USER/GROUP + */ +void +PreProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString, + ProcessUtilityContext context, + bool *runPreviousUtilityHook) +{ + if (!EnableDatabaseSharding || context != PROCESS_UTILITY_TOPLEVEL) + { + return; + } + + if (EnableCreateDatabasePropagation) + { + if (IsA(parseTree, CreatedbStmt)) + { + char *command = DeparseCreatedbStmt(parseTree); + ExecuteCommandInControlDatabase(command); + + /* command is fully delegated to control database */ + *runPreviousUtilityHook = false; + } + else if (IsA(parseTree, DropdbStmt)) + { + char *command = DeparseDropdbStmt(parseTree); + ExecuteCommandInControlDatabase(command); + + /* command is fully delegated to control database */ + *runPreviousUtilityHook = false; + } + } +} + + +/* + * PostProcessUtilityInDatabaseShard is currently a noop. + */ +void +PostProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString, + ProcessUtilityContext context) +{ + if (!EnableDatabaseSharding || context != PROCESS_UTILITY_TOPLEVEL) + { + return; + } +} + + +/* + * ExecuteCommandInControlDatabase connects to localhost to execute a command + * in the main Citus database. + */ +static void +ExecuteCommandInControlDatabase(char *command) +{ + int connectionFlag = FORCE_NEW_CONNECTION; + + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber, + NULL, CitusMainDatabase); + + ExecuteCriticalRemoteCommand(connection, + "SET application_name TO 'citus_database_shard'"); + ExecuteCriticalRemoteCommand(connection, command); + CloseConnection(connection); +} + + +/* + * database_shard_assign assigns an existing database to a node. + */ +Datum +database_shard_assign(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + char *databaseName = text_to_cstring(PG_GETARG_TEXT_P(0)); + + bool missingOk = false; + Oid databaseOid = get_database_oid(databaseName, missingOk); + + if (!pg_database_ownercheck(databaseOid, GetUserId())) + { + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to assign database \"%s\" " + "to a shard", + databaseName))); + } + + if (GetDatabaseShardByOid(databaseOid) != NULL) + { + ereport(ERROR, (errmsg("database is already assigned to a shard"))); + } + + AssignDatabaseToShard(databaseOid); + + PG_RETURN_VOID(); +} + + +/* + * AssignDatabaseToShard finds a suitable node for the given + * database and assigns it. + */ +void +AssignDatabaseToShard(Oid databaseOid) +{ + int nodeGroupId = GetLocalGroupId(); + + List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); + if (list_length(workerNodes) > 0) + { + /* TODO: actually look for available space */ + int workerNodeIndex = databaseOid % list_length(workerNodes); + WorkerNode *workerNode = list_nth(workerNodes, workerNodeIndex); + nodeGroupId = workerNode->groupId; + } + + InsertDatabaseShardAssignment(databaseOid, nodeGroupId); + AllowConnectionsOnlyOnNodeGroup(databaseOid, nodeGroupId); + + ReconfigurePgBouncersOnCommit = true; +} + + +/* + * AllowConnectionsOnlyOnNodeGroup sets the ALLOW_CONNECTIONS properties on + * the database to false, except on nodeGroupId. + */ +static void +AllowConnectionsOnlyOnNodeGroup(Oid databaseOid, Oid nodeGroupId) +{ + StringInfo command = makeStringInfo(); + char *databaseName = get_database_name(databaseOid); + + List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); + WorkerNode *workerNode = NULL; + + foreach_ptr(workerNode, workerNodes) + { + resetStringInfo(command); + + if (workerNode->groupId == nodeGroupId) + { + appendStringInfo(command, "GRANT CONNECT ON DATABASE %s TO public", + quote_identifier(databaseName)); + } + else + { + appendStringInfo(command, "REVOKE CONNECT ON DATABASE %s FROM public", + quote_identifier(databaseName)); + } + + if (workerNode->groupId == GetLocalGroupId()) + { + ExecuteQueryViaSPI(command->data, SPI_OK_UTILITY); + } + else + { + SendCommandToWorker(workerNode->workerName, workerNode->workerPort, + command->data); + } + } +} + + +/* + * InsertDatabaseShardAssignment inserts a record into the local + * citus_catalog.database_sharding table. + */ +static void +InsertDatabaseShardAssignment(Oid databaseOid, int nodeGroupId) +{ + InsertDatabaseShardAssignmentLocally(databaseOid, nodeGroupId); + + if (EnableMetadataSync) + { + InsertDatabaseShardAssignmentOnOtherNodes(databaseOid, nodeGroupId); + } +} + + +/* + * InsertDatabaseShardAssignmentLocally inserts a record into the local + * citus_catalog.database_sharding table. + */ +static void +InsertDatabaseShardAssignmentLocally(Oid databaseOid, int nodeGroupId) +{ + Datum values[Natts_database_shard]; + bool isNulls[Natts_database_shard]; + + /* form new shard tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_database_shard_database_id - 1] = ObjectIdGetDatum(databaseOid); + values[Anum_database_shard_node_group_id - 1] = Int32GetDatum(nodeGroupId); + values[Anum_database_shard_is_available - 1] = BoolGetDatum(true); + + /* open shard relation and insert new tuple */ + Relation databaseShardTable = table_open(DatabaseShardRelationId(), RowExclusiveLock); + + TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable); + HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + CatalogTupleInsert(databaseShardTable, heapTuple); + + CommandCounterIncrement(); + table_close(databaseShardTable, NoLock); +} + + +/* + * InsertDatabaseShardAssignmentOnOtherNodes inserts a record into the + * citus_catalog.database_sharding table on other nodes. + */ +static void +InsertDatabaseShardAssignmentOnOtherNodes(Oid databaseOid, int nodeGroupId) +{ + char *insertCommand = InsertDatabaseShardAssignmentCommand(databaseOid, nodeGroupId); + SendCommandToWorkersWithMetadata(insertCommand); +} + + +/* + * UpdateDatabaseShard updates a database shard after it is moved to a new node. + */ +void +UpdateDatabaseShard(Oid databaseOid, int targetNodeGroupId) +{ + DeleteDatabaseShardByDatabaseId(databaseOid); + InsertDatabaseShardAssignment(databaseOid, targetNodeGroupId); + AllowConnectionsOnlyOnNodeGroup(databaseOid, targetNodeGroupId); +} + + +/* + * DeleteDatabaseShardByDatabaseId deletes a record from the + * citus_catalog.database_sharding table. + */ +static void +DeleteDatabaseShardByDatabaseId(Oid databaseOid) +{ + DeleteDatabaseShardByDatabaseIdLocally(databaseOid); + + if (EnableMetadataSync) + { + DeleteDatabaseShardByDatabaseIdOnOtherNodes(databaseOid); + } +} + + +/* + * DeleteDatabaseShardByDatabaseIdLocally deletes a database_shard record by database OID. + */ +void +DeleteDatabaseShardByDatabaseIdLocally(Oid databaseOid) +{ + Relation databaseShardTable = table_open(DatabaseShardRelationId(), + RowExclusiveLock); + + const int scanKeyCount = 1; + ScanKeyData scanKey[1]; + bool indexOK = true; + + ScanKeyInit(&scanKey[0], Anum_database_shard_database_id, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(databaseOid)); + + SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, + DatabaseShardPrimaryKeyIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (heapTuple != NULL) + { + simple_heap_delete(databaseShardTable, &heapTuple->t_self); + } + + systable_endscan(scanDescriptor); + + CommandCounterIncrement(); + table_close(databaseShardTable, NoLock); +} + + +/* + * DeleteDatabaseShardByDatabaseIdOnOtherNodes deletes a record from the + * citus_catalog.database_sharding table on other nodes. + */ +static void +DeleteDatabaseShardByDatabaseIdOnOtherNodes(Oid databaseOid) +{ + char *deleteCommand = DeleteDatabaseShardByDatabaseIdCommand(databaseOid); + SendCommandToWorkersWithMetadata(deleteCommand); +} + + +/* + * ListDatabaseShards lists all database shards in citus_catalog.database_shard. + */ +List * +ListDatabaseShards(void) +{ + Relation databaseShardTable = table_open(DatabaseShardRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable); + + List *dbShardList = NIL; + int scanKeyCount = 0; + bool indexOK = false; + + SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, InvalidOid, + indexOK, NULL, scanKeyCount, NULL); + + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + DatabaseShard *dbShard = TupleToDatabaseShard(heapTuple, tupleDescriptor); + dbShardList = lappend(dbShardList, dbShard); + } + + systable_endscan(scanDescriptor); + table_close(databaseShardTable, NoLock); + + return dbShardList; +} + + +/* + * GetDatabaseShardByOid gets a database shard by database OID or + * NULL if no database shard could be found. + */ +DatabaseShard * +GetDatabaseShardByOid(Oid databaseOid) +{ + DatabaseShard *result = NULL; + + Relation databaseShardTable = table_open(DatabaseShardRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable); + + const int scanKeyCount = 1; + ScanKeyData scanKey[1]; + bool indexOK = true; + + ScanKeyInit(&scanKey[0], Anum_database_shard_database_id, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(databaseOid)); + + SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, + DatabaseShardPrimaryKeyIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + result = TupleToDatabaseShard(heapTuple, tupleDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(databaseShardTable, NoLock); + + return result; +} + + +/* + * TupleToDatabaseShard converts a database_shard record tuple into a DatabaseShard struct. + */ +static DatabaseShard * +TupleToDatabaseShard(HeapTuple heapTuple, TupleDesc tupleDescriptor) +{ + Datum datumArray[Natts_database_shard]; + bool isNullArray[Natts_database_shard]; + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); + + DatabaseShard *record = palloc0(sizeof(DatabaseShard)); + + record->databaseOid = + DatumGetObjectId(datumArray[Anum_database_shard_database_id - 1]); + + record->nodeGroupId = + DatumGetInt32(datumArray[Anum_database_shard_node_group_id - 1]); + + record->isAvailable = + DatumGetBool(datumArray[Anum_database_shard_is_available - 1]); + + return record; +} + + +/* + * citus_internal_add_database_shard is an internal UDF to + * add a row to database_shard. + */ +Datum +citus_internal_add_database_shard(PG_FUNCTION_ARGS) +{ + char *databaseName = TextDatumGetCString(PG_GETARG_DATUM(0)); + int nodeGroupId = PG_GETARG_INT32(1); + + bool missingOk = false; + Oid databaseOid = get_database_oid(databaseName, missingOk); + + if (!pg_database_ownercheck(databaseOid, GetUserId())) + { + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, + databaseName); + } + + InsertDatabaseShardAssignmentLocally(databaseOid, nodeGroupId); + + + PG_RETURN_VOID(); +} + + +/* + * InsertDatabaseShardAssignmentCommand returns a command to insert a database shard + * assignment into the metadata on a remote node. + */ +char * +InsertDatabaseShardAssignmentCommand(Oid databaseOid, int nodeGroupId) +{ + StringInfo command = makeStringInfo(); + char *databaseName = get_database_name(databaseOid); + + appendStringInfo(command, + "SELECT pg_catalog.citus_internal_add_database_shard(%s,%d)", + quote_literal_cstr(databaseName), + nodeGroupId); + + return command->data; +} + + +/* + * citus_internal_delete_database_shard is an internal UDF to + * delete a row from database_shard. + */ +Datum +citus_internal_delete_database_shard(PG_FUNCTION_ARGS) +{ + char *databaseName = TextDatumGetCString(PG_GETARG_DATUM(0)); + + bool missingOk = false; + Oid databaseOid = get_database_oid(databaseName, missingOk); + + if (!pg_database_ownercheck(databaseOid, GetUserId())) + { + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, + databaseName); + } + + DeleteDatabaseShardByDatabaseIdLocally(databaseOid); + + + + PG_RETURN_VOID(); +} + + +/* + * DeleteDatabaseShardByDatabaseIdCommand returns a command to delete a database shard + * assignment from the metadata on a remote node. + */ +static char * +DeleteDatabaseShardByDatabaseIdCommand(Oid databaseOid) +{ + StringInfo command = makeStringInfo(); + char *databaseName = get_database_name(databaseOid); + + appendStringInfo(command, + "SELECT pg_catalog.citus_internal_delete_database_shard(%s)", + quote_literal_cstr(databaseName)); + + return command->data; +} diff --git a/src/backend/distributed/database/database_size.c b/src/backend/distributed/database/database_size.c new file mode 100644 index 000000000..46e72ad13 --- /dev/null +++ b/src/backend/distributed/database/database_size.c @@ -0,0 +1,180 @@ +/*------------------------------------------------------------------------- + * + * database_size.c + * Functions for getting the size of a database. + * + * Copyright (c) Microsoft, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "funcapi.h" +#include "fmgr.h" +#include "miscadmin.h" + +#include "commands/dbcommands.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/database/database_sharding.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/worker_transaction.h" +#include "utils/builtins.h" + + +static int64 GetLocalDatabaseSize(Oid databaseId); +static int64 CitusDatabaseShardSize(DatabaseShard *dbShard); +static int64 CitusDatabaseSizeOnNodeList(Oid databaseId, List *workerNodeList); + + +PG_FUNCTION_INFO_V1(citus_database_size_oid); +PG_FUNCTION_INFO_V1(citus_database_size_name); + + +/* + * citus_database_size_oid returns the size of a Citus database + * with the given oid. + */ +Datum +citus_database_size_oid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Oid databaseId = PG_GETARG_OID(0); + int64 size = CitusDatabaseSize(databaseId); + + PG_RETURN_INT64(size); +} + + +/* + * citus_database_size_name returns the size of a Citus database + * with the given name. + */ +Datum +citus_database_size_name(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Name databaseName = PG_GETARG_NAME(0); + bool missingOk = false; + Oid databaseId = get_database_oid(NameStr(*databaseName), missingOk); + + int64 size = CitusDatabaseSize(databaseId); + + PG_RETURN_INT64(size); +} + + +/* + * CitusDatabaseSize returns the size of a Citus database. + */ +int64 +CitusDatabaseSize(Oid databaseId) +{ + DatabaseShard *dbShard = GetDatabaseShardByOid(databaseId); + if (dbShard != NULL) + { + /* for known database shards, get the remote size */ + return CitusDatabaseShardSize(dbShard); + } + + if (databaseId == MyDatabaseId) + { + /* for the current database, get the size from all nodes */ + List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); + return CitusDatabaseSizeOnNodeList(databaseId, workerNodes); + } + + /* for other databases, get the local size */ + /* TODO: get it from main database? */ + return GetLocalDatabaseSize(databaseId); +} + + +/* + * GetLocalDatabaseSize returns the local database size by calling pg_database_size. + */ +static int64 +GetLocalDatabaseSize(Oid databaseId) +{ + Datum databaseIdDatum = ObjectIdGetDatum(databaseId); + Datum sizeDatum = DirectFunctionCall1(pg_database_size_oid, databaseIdDatum); + return DatumGetInt64(sizeDatum); +} + + +/* + * CitusDatabaseShardSize gets the database size for a specific + * shard. + */ +static int64 +CitusDatabaseShardSize(DatabaseShard *dbShard) +{ + WorkerNode *workerNode = LookupNodeForGroup(dbShard->nodeGroupId); + + return CitusDatabaseSizeOnNodeList(dbShard->databaseOid, list_make1(workerNode)); +} + + +/* + * CitusDatabaseSizeOnNodeList returns the sum of the sizes + * for a given database from all nodes in the list. + */ +static int64 +CitusDatabaseSizeOnNodeList(Oid databaseId, List *workerNodeList) +{ + int64 size = 0; + + bool raiseInterrupts = true; + + char *databaseName = get_database_name(databaseId); + char *command = psprintf("SELECT pg_catalog.pg_database_size(%s)", + quote_literal_cstr(databaseName)); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + if (workerNode->groupId == GetLocalGroupId()) + { + return GetLocalDatabaseSize(databaseId); + } + + int connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort); + + int querySent = SendRemoteCommand(connection, command); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + if (PQntuples(result) != 1 || PQnfields(result) != 1) + { + PQclear(result); + ClearResults(connection, raiseInterrupts); + + ereport(ERROR, (errmsg("unexpected number of columns returned by: %s", + command))); + } + + if (!PQgetisnull(result, 0, 0)) + { + char *sizeString = PQgetvalue(result, 0, 0); + size += SafeStringToUint64(sizeString); + } + + PQclear(result); + ClearResults(connection, raiseInterrupts); + } + + return size; +} diff --git a/src/backend/distributed/database/database_sharding.h b/src/include/distributed/database/database_sharding.h similarity index 100% rename from src/backend/distributed/database/database_sharding.h rename to src/include/distributed/database/database_sharding.h diff --git a/src/include/distributed/pooler/pgbouncer_manager.h b/src/include/distributed/pooler/pgbouncer_manager.h new file mode 100644 index 000000000..530d1b1ee --- /dev/null +++ b/src/include/distributed/pooler/pgbouncer_manager.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pgbouncer_manager.h + * Functions for managing outbound pgbouncers + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PGBOUNCER_MANAGER_H +#define PGBOUNCER_MANAGER_H + +/* default number of inbound pgbouncer processes (0 is disabled) */ +#define PGBOUNCER_INBOUND_PROCS_DEFAULT 0 + +/* bits reserved for local pgbouncer ID in the pgbouncer peer_id */ +#define PGBOUNCER_PEER_ID_LOCAL_ID_BITS 5 + +/* maximum number of inbound pgbouncer processes */ +#define PGBOUNCER_INBOUND_PROCS_MAX ((1 << PGBOUNCER_PEER_ID_LOCAL_ID_BITS) - 1) + + +/* GUC variable that sets the number of inbound pgbouncer procs */ +extern int PgBouncerInboundProcs; + +/* GUC variable that sets the inbound pgbouncer port */ +extern int PgBouncerInboundPort; + +/* GUC variable that sets the path to pgbouncer executable */ +extern char *PgBouncerPath; + +/* GUC variable that sets a pgbouncer file to include */ +extern char *PgBouncerIncludeConfig; + +/* global variable to request pgbouncer reconfiguration */ +extern bool ReconfigurePgBouncersOnCommit; + +void InitializeSharedPgBouncerManager(void); +size_t SharedPgBouncerManagerShmemSize(void); +void PgBouncerManagerMain(Datum arg); +bool PauseDatabaseOnInboundPgBouncers(char *databaseName); +bool ResumeDatabaseOnInboundPgBouncers(char *databaseName); +void TriggerPgBouncerReconfigureIfNeeded(void); + + +#endif /* PGBOUNCER_MANAGER_H */