diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 4886ed62e..61d89db44 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -32,44 +32,43 @@ #include "distributed/deparser.h" #include "distributed/worker_protocol.h" #include "distributed/metadata/distobject.h" -#include "distributed/database/database_sharding.h" #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/adaptive_executor.h" - - /* macros to add DefElems to a list */ -#define DEFELEM_ADD_STRING(options, key, value) { \ - DefElem *elem = makeDefElem(key, (Node *) makeString(value), -1); \ - options = lappend(options, elem); \ -} +#define DEFELEM_ADD_STRING(options, key, value) \ + { \ + DefElem *elem = makeDefElem(key, (Node *)makeString(value), -1); \ + options = lappend(options, elem); \ + } -#define DEFELEM_ADD_BOOL(options, key, value) { \ - DefElem *elem = makeDefElem(key, (Node *) makeBoolean(value), -1); \ - options = lappend(options, elem); \ -} +#define DEFELEM_ADD_BOOL(options, key, value) \ + { \ + DefElem *elem = makeDefElem(key, (Node *)makeBoolean(value), -1); \ + options = lappend(options, elem); \ + } -#define DEFELEM_ADD_INT(options, key, value) { \ - DefElem *elem = makeDefElem(key, (Node *) makeInteger(value), -1); \ - options = lappend(options, elem); \ -} +#define DEFELEM_ADD_INT(options, key, value) \ + { \ + DefElem *elem = makeDefElem(key, (Node *)makeInteger(value), -1); \ + options = lappend(options, elem); \ + } -static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); +static AlterOwnerStmt *RecreateAlterDatabaseOwnerStmt(Oid databaseOid); -static List * CreateDDLTaskList(char *command, List *workerNodeList, - bool outsideTransaction); +static List *CreateDDLTaskList(char *command, List *workerNodeList, + bool outsideTransaction); PG_FUNCTION_INFO_V1(citus_internal_database_command); static Oid 
get_database_owner(Oid db_oid); -List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); +List *PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext); /* controlled via GUC */ bool EnableCreateDatabasePropagation = true; bool EnableAlterDatabaseOwner = true; - /* * AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the * object of the AlterOwnerStmt. Errors if missing_ok is false. @@ -80,14 +79,13 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok, bool isPostprocess) AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); Assert(stmt->objectType == OBJECT_DATABASE); - Oid databaseOid = get_database_oid(strVal((String *) stmt->object), missing_ok); + Oid databaseOid = get_database_oid(strVal((String *)stmt->object), missing_ok); ObjectAddress *address = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*address, DatabaseRelationId, databaseOid); return list_make1(address); } - /* * DatabaseOwnerDDLCommands returns a list of sql statements to idempotently apply a * change of the database owner on the workers so that the database is owned by the same @@ -96,11 +94,10 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok, bool isPostprocess) List * DatabaseOwnerDDLCommands(const ObjectAddress *address) { - Node *stmt = (Node *) RecreateAlterDatabaseOwnerStmt(address->objectId); + Node *stmt = (Node *)RecreateAlterDatabaseOwnerStmt(address->objectId); return list_make1(DeparseTreeNode(stmt)); } - /* * RecreateAlterDatabaseOwnerStmt creates an AlterOwnerStmt that represents the operation * of changing the owner of the database to its current owner. 
@@ -111,7 +108,7 @@ RecreateAlterDatabaseOwnerStmt(Oid databaseOid) AlterOwnerStmt *stmt = makeNode(AlterOwnerStmt); stmt->objectType = OBJECT_DATABASE; - stmt->object = (Node *) makeString(get_database_name(databaseOid)); + stmt->object = (Node *)makeString(get_database_name(databaseOid)); Oid ownerOid = get_database_owner(databaseOid); stmt->newowner = makeNode(RoleSpec); @@ -121,7 +118,6 @@ RecreateAlterDatabaseOwnerStmt(Oid databaseOid) return stmt; } - /* * get_database_owner returns the Oid of the role owning the database */ @@ -135,14 +131,13 @@ get_database_owner(Oid db_oid) errmsg("database with OID %u does not exist", db_oid))); } - Oid dba = ((Form_pg_database) GETSTRUCT(tuple))->datdba; + Oid dba = ((Form_pg_database)GETSTRUCT(tuple))->datdba; ReleaseSysCache(tuple); return dba; } - /* * PreprocessGrantOnDatabaseStmt is executed before the statement is applied to the local * postgres instance. @@ -171,88 +166,15 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, EnsureCoordinator(); - char *sql = DeparseTreeNode((Node *) stmt); + char *sql = DeparseTreeNode((Node *)stmt); List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, + (void *)sql, ENABLE_DDL_PROPAGATION); return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } -/* - * citus_internal_database_command is an internal UDF to - * create/drop a database in an idempotent maner without - * transaction block restrictions. - */ -Datum -citus_internal_database_command(PG_FUNCTION_ARGS) -{ - text *commandText = PG_GETARG_TEXT_P(0); - char *command = text_to_cstring(commandText); - Node *parseTree = ParseTreeNode(command); - - set_config_option("citus.enable_ddl_propagation", "off", - (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); - - set_config_option("citus.enable_create_database_propagation", "off", - (superuser() ? 
PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); - - if (IsA(parseTree, CreatedbStmt)) - { - CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree); - - bool missingOk = true; - Oid databaseOid = get_database_oid(stmt->dbname, missingOk); - - if (!OidIsValid(databaseOid)) - { - createdb(NULL, (CreatedbStmt *) parseTree); - } - else - { - /* TODO: check database properties */ - } - } - else if (IsA(parseTree, DropdbStmt)) - { - DropdbStmt *stmt = castNode(DropdbStmt, parseTree); - - bool missingOk = true; - Oid databaseOid = get_database_oid(stmt->dbname, missingOk); - - if (!OidIsValid(databaseOid)) - { - /* already dropped? */ - } - else - { - /* remove database from pg_dist_object */ - ObjectAddress dbAddress = { 0 }; - ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); - - if (IsObjectDistributed(&dbAddress)) - { - UnmarkObjectDistributed(&dbAddress); - } - - /* remove database from database shards */ - DeleteDatabaseShardByDatabaseIdLocally(databaseOid); - - DropDatabase(NULL, (DropdbStmt *) parseTree); - } - } - else - { - ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree)))); - } - - PG_RETURN_VOID(); -} - - /* * PreprocessAlterDatabaseStmt is executed before the statement is applied to the local * postgres instance. 
@@ -273,16 +195,15 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString, EnsureCoordinator(); - char *sql = DeparseTreeNode((Node *) stmt); + char *sql = DeparseTreeNode((Node *)stmt); List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, + (void *)sql, ENABLE_DDL_PROPAGATION); return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } - #if PG_VERSION_NUM >= PG_VERSION_15 /* @@ -305,16 +226,15 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString, EnsureCoordinator(); - char *sql = DeparseTreeNode((Node *) stmt); + char *sql = DeparseTreeNode((Node *)stmt); List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, + (void *)sql, ENABLE_DDL_PROPAGATION); return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } - #endif /* @@ -380,7 +300,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, /* * PostprocessCreatedbStmt creates the plan to synchronize CREATE DATABASE * across nodes. We use the cannotBeExecutedInTransction option to avoid - * u* sending transaction blocks. + * sending transaction blocks. */ List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString) @@ -425,24 +345,102 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString) } /* synchronize pg_dist_object records */ - ObjectAddress dbAddress = { 0 }; + ObjectAddress dbAddress = {0}; ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); MarkObjectDistributed(&dbAddress); + return NIL; } +/* + * citus_internal_database_command is an internal UDF to + * create/drop a database in an idempotent manner without + * transaction block restrictions. 
+ */ +Datum citus_internal_database_command(PG_FUNCTION_ARGS) +{ + int saveNestLevel = NewGUCNestLevel(); + text *commandText = PG_GETARG_TEXT_P(0); + char *command = text_to_cstring(commandText); + Node *parseTree = ParseTreeNode(command); + + ereport(NOTICE, (errmsg("test internal pre"), + errhint("test pre hint"))); + + set_config_option("citus.enable_ddl_propagation", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + set_config_option("citus.enable_create_database_propagation", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + if (IsA(parseTree, CreatedbStmt)) + { + CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree); + + bool missingOk = true; + Oid databaseOid = get_database_oid(stmt->dbname, missingOk); + + if (!OidIsValid(databaseOid)) + { + createdb(NULL, (CreatedbStmt *)parseTree); + } + else + { + /* TODO: check database properties */ + } + } + else if (IsA(parseTree, DropdbStmt)) + { + DropdbStmt *stmt = castNode(DropdbStmt, parseTree); + + bool missingOk = true; + Oid databaseOid = get_database_oid(stmt->dbname, missingOk); + + if (!OidIsValid(databaseOid)) + { + /* already dropped? 
*/ + } + else + { + /* remove database from pg_dist_object */ + ObjectAddress dbAddress = {0}; + ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); + + if (IsObjectDistributed(&dbAddress)) + { + UnmarkObjectDistributed(&dbAddress); + } + + // /* remove database from database shards */ + // DeleteDatabaseShardByDatabaseIdLocally(databaseOid); + + DropDatabase(NULL, (DropdbStmt *)parseTree); + } + } + else + { + ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree)))); + } + + AtEOXact_GUC(true, saveNestLevel); + + PG_RETURN_VOID(); +} + List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) + ProcessUtilityContext processUtilityContext) { if (!EnableCreateDatabasePropagation || !ShouldPropagate()) { return NIL; } - DropdbStmt *stmt = (DropdbStmt *) node; + DropdbStmt *stmt = (DropdbStmt *)node; char *databaseName = stmt->dbname; bool missingOk = true; Oid databaseOid = get_database_oid(databaseName, missingOk); @@ -452,7 +450,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, return NIL; } - ObjectAddress dbAddress = { 0 }; + ObjectAddress dbAddress = {0}; ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); if (!IsObjectDistributed(&dbAddress)) { @@ -472,7 +470,6 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, "SELECT pg_catalog.citus_internal_database_command(%s)", quote_literal_cstr(dropDatabaseCommand)); - /* we execute here to avoid EnsureCoordinator check in ExecuteDistributedDDLJob */ bool outsideTransaction = false; List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes, diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index cf8e0644e..a818a1ad7 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -694,7 +694,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt, } /* inform the user 
about potential caveats */ - if (IsA(parsetree, CreatedbStmt)) + if (IsA(parsetree, CreatedbStmt) &&!EnableCreateDatabasePropagation) { if (EnableUnsupportedFeatureMessages) { diff --git a/src/backend/distributed/database/database_lock.c b/src/backend/distributed/database/database_lock.c deleted file mode 100644 index 64757832f..000000000 --- a/src/backend/distributed/database/database_lock.c +++ /dev/null @@ -1,53 +0,0 @@ -/*------------------------------------------------------------------------- - * - * database_lock.c - * Functions for locking a database. - * - * Copyright (c) Microsoft, Inc. - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" -#include "funcapi.h" -#include "fmgr.h" -#include "miscadmin.h" - -#include "catalog/pg_database.h" -#include "commands/dbcommands.h" -#include "distributed/metadata_cache.h" -#include "storage/lmgr.h" - - -static void CitusDatabaseLock(Oid databaseId); - - -PG_FUNCTION_INFO_V1(citus_database_lock_by_name); - - -/* - * citus_database_lock locks the given database in access exclusive mode - * to temporarily block new connections. - */ -Datum -citus_database_lock_by_name(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - Name databaseName = PG_GETARG_NAME(0); - bool missingOk = false; - Oid databaseId = get_database_oid(NameStr(*databaseName), missingOk); - - CitusDatabaseLock(databaseId); - - PG_RETURN_VOID(); -} - - -/* - * CitusDatabaseLock locks a database for new connections. 
- */ -static void -CitusDatabaseLock(Oid databaseId) -{ - LockSharedObject(DatabaseRelationId, databaseId, 0, ExclusiveLock); -} diff --git a/src/backend/distributed/database/database_sharding.c b/src/backend/distributed/database/database_sharding.c deleted file mode 100644 index e8c35c04b..000000000 --- a/src/backend/distributed/database/database_sharding.c +++ /dev/null @@ -1,546 +0,0 @@ -/*------------------------------------------------------------------------- - * - * database_sharding.c - * - * This file contains module-level definitions. - * - * Copyright (c) 2023, Microsoft, Inc. - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" -#include "fmgr.h" -#include "miscadmin.h" - -#include "citus_version.h" -#include "pg_version_compat.h" - -#include "access/genam.h" -#include "commands/dbcommands.h" -#include "distributed/connection_management.h" -#include "distributed/database/database_sharding.h" -#include "distributed/deparser.h" -#include "distributed/deparse_shard_query.h" -#include "distributed/listutils.h" -#include "distributed/metadata_sync.h" -#include "distributed/pooler/pgbouncer_manager.h" -#include "distributed/remote_commands.h" -#include "distributed/shared_library_init.h" -#include "distributed/worker_transaction.h" -#include "executor/spi.h" -#include "nodes/makefuncs.h" -#include "nodes/parsenodes.h" -#include "postmaster/postmaster.h" -#include "tcop/utility.h" -#include "utils/builtins.h" -#include "utils/fmgroids.h" -#include "catalog/pg_database.h" - - -static void ExecuteCommandInControlDatabase(char *command); -static void AllowConnectionsOnlyOnNodeGroup(Oid databaseOid, Oid nodeGroupId); -static void InsertDatabaseShardAssignment(Oid databaseOid, int nodeGroupId); -static void InsertDatabaseShardAssignmentLocally(Oid databaseOid, int nodeGroupId); -static void InsertDatabaseShardAssignmentOnOtherNodes(Oid databaseOid, int nodeGroupId); -static void 
DeleteDatabaseShardByDatabaseId(Oid databaseOid); -static void DeleteDatabaseShardByDatabaseIdOnOtherNodes(Oid databaseOid); -static DatabaseShard * TupleToDatabaseShard(HeapTuple heapTuple, - TupleDesc tupleDescriptor); -static char * DeleteDatabaseShardByDatabaseIdCommand(Oid databaseOid); - - -PG_FUNCTION_INFO_V1(database_shard_assign); -PG_FUNCTION_INFO_V1(citus_internal_add_database_shard); -PG_FUNCTION_INFO_V1(citus_internal_delete_database_shard); - - -/* citus.enable_database_sharding setting */ -bool EnableDatabaseSharding = false; - -/* citus.database_sharding_pgbouncer_file setting */ -char *DatabaseShardingPgBouncerFile = ""; - - -/* - * PreProcessUtilityInDatabaseShard handles DDL commands that occur within a - * database shard and require global coordination: - * - CREATE/ALTER/DROP DATABASE - * - CREATE/ALTER/DROP ROLE/USER/GROUP - */ -void -PreProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString, - ProcessUtilityContext context, - bool *runPreviousUtilityHook) -{ - if (!EnableDatabaseSharding || context != PROCESS_UTILITY_TOPLEVEL) - { - return; - } - - if (EnableCreateDatabasePropagation) - { - if (IsA(parseTree, CreatedbStmt)) - { - char *command = DeparseCreateDatabaseStmt(parseTree); - ExecuteCommandInControlDatabase(command); - - /* command is fully delegated to control database */ - *runPreviousUtilityHook = false; - } - else if (IsA(parseTree, DropdbStmt)) - { - char *command = DeparseDropDatabaseStmt(parseTree); - ExecuteCommandInControlDatabase(command); - - /* command is fully delegated to control database */ - *runPreviousUtilityHook = false; - } - } -} - - -/* - * PostProcessUtilityInDatabaseShard is currently a noop. 
- */ -void -PostProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString, - ProcessUtilityContext context) -{ - if (!EnableDatabaseSharding || context != PROCESS_UTILITY_TOPLEVEL) - { - return; - } -} - - -/* - * ExecuteCommandInControlDatabase connects to localhost to execute a command - * in the main Citus database. - */ -static void -ExecuteCommandInControlDatabase(char *command) -{ - int connectionFlag = FORCE_NEW_CONNECTION; - - MultiConnection *connection = - GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber, - NULL, CitusMainDatabase); - - ExecuteCriticalRemoteCommand(connection, - "SET application_name TO 'citus_database_shard'"); - ExecuteCriticalRemoteCommand(connection, command); - CloseConnection(connection); -} - - -/* - * database_shard_assign assigns an existing database to a node. - */ -Datum -database_shard_assign(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - char *databaseName = text_to_cstring(PG_GETARG_TEXT_P(0)); - - bool missingOk = false; - Oid databaseOid = get_database_oid(databaseName, missingOk); - - if (!pg_database_ownercheck(databaseOid, GetUserId())) - { - ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied to assign database \"%s\" " - "to a shard", - databaseName))); - } - - if (GetDatabaseShardByOid(databaseOid) != NULL) - { - ereport(ERROR, (errmsg("database is already assigned to a shard"))); - } - - AssignDatabaseToShard(databaseOid); - - PG_RETURN_VOID(); -} - - -/* - * AssignDatabaseToShard finds a suitable node for the given - * database and assigns it. 
- */ -void -AssignDatabaseToShard(Oid databaseOid) -{ - int nodeGroupId = GetLocalGroupId(); - - List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); - if (list_length(workerNodes) > 0) - { - /* TODO: actually look for available space */ - int workerNodeIndex = databaseOid % list_length(workerNodes); - WorkerNode *workerNode = list_nth(workerNodes, workerNodeIndex); - nodeGroupId = workerNode->groupId; - } - - InsertDatabaseShardAssignment(databaseOid, nodeGroupId); - AllowConnectionsOnlyOnNodeGroup(databaseOid, nodeGroupId); - - ReconfigurePgBouncersOnCommit = true; -} - - -/* - * AllowConnectionsOnlyOnNodeGroup sets the ALLOW_CONNECTIONS properties on - * the database to false, except on nodeGroupId. - */ -static void -AllowConnectionsOnlyOnNodeGroup(Oid databaseOid, Oid nodeGroupId) -{ - StringInfo command = makeStringInfo(); - char *databaseName = get_database_name(databaseOid); - - List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); - WorkerNode *workerNode = NULL; - - foreach_ptr(workerNode, workerNodes) - { - resetStringInfo(command); - - if (workerNode->groupId == nodeGroupId) - { - appendStringInfo(command, "GRANT CONNECT ON DATABASE %s TO public", - quote_identifier(databaseName)); - } - else - { - appendStringInfo(command, "REVOKE CONNECT ON DATABASE %s FROM public", - quote_identifier(databaseName)); - } - - if (workerNode->groupId == GetLocalGroupId()) - { - ExecuteQueryViaSPI(command->data, SPI_OK_UTILITY); - } - else - { - SendCommandToWorker(workerNode->workerName, workerNode->workerPort, - command->data); - } - } -} - - -/* - * InsertDatabaseShardAssignment inserts a record into the local - * citus_catalog.database_sharding table. 
- */ -static void -InsertDatabaseShardAssignment(Oid databaseOid, int nodeGroupId) -{ - InsertDatabaseShardAssignmentLocally(databaseOid, nodeGroupId); - - if (EnableMetadataSync) - { - InsertDatabaseShardAssignmentOnOtherNodes(databaseOid, nodeGroupId); - } -} - - -/* - * InsertDatabaseShardAssignmentLocally inserts a record into the local - * citus_catalog.database_sharding table. - */ -static void -InsertDatabaseShardAssignmentLocally(Oid databaseOid, int nodeGroupId) -{ - Datum values[Natts_database_shard]; - bool isNulls[Natts_database_shard]; - - /* form new shard tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - - values[Anum_database_shard_database_id - 1] = ObjectIdGetDatum(databaseOid); - values[Anum_database_shard_node_group_id - 1] = Int32GetDatum(nodeGroupId); - values[Anum_database_shard_is_available - 1] = BoolGetDatum(true); - - /* open shard relation and insert new tuple */ - Relation databaseShardTable = table_open(DatabaseShardRelationId(), RowExclusiveLock); - - TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable); - HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - - CatalogTupleInsert(databaseShardTable, heapTuple); - - CommandCounterIncrement(); - table_close(databaseShardTable, NoLock); -} - - -/* - * InsertDatabaseShardAssignmentOnOtherNodes inserts a record into the - * citus_catalog.database_sharding table on other nodes. - */ -static void -InsertDatabaseShardAssignmentOnOtherNodes(Oid databaseOid, int nodeGroupId) -{ - char *insertCommand = InsertDatabaseShardAssignmentCommand(databaseOid, nodeGroupId); - SendCommandToWorkersWithMetadata(insertCommand); -} - - -/* - * UpdateDatabaseShard updates a database shard after it is moved to a new node. 
- */ -void -UpdateDatabaseShard(Oid databaseOid, int targetNodeGroupId) -{ - DeleteDatabaseShardByDatabaseId(databaseOid); - InsertDatabaseShardAssignment(databaseOid, targetNodeGroupId); - AllowConnectionsOnlyOnNodeGroup(databaseOid, targetNodeGroupId); -} - - -/* - * DeleteDatabaseShardByDatabaseId deletes a record from the - * citus_catalog.database_sharding table. - */ -static void -DeleteDatabaseShardByDatabaseId(Oid databaseOid) -{ - DeleteDatabaseShardByDatabaseIdLocally(databaseOid); - - if (EnableMetadataSync) - { - DeleteDatabaseShardByDatabaseIdOnOtherNodes(databaseOid); - } -} - - -/* - * DeleteDatabaseShardByDatabaseIdLocally deletes a database_shard record by database OID. - */ -void -DeleteDatabaseShardByDatabaseIdLocally(Oid databaseOid) -{ - Relation databaseShardTable = table_open(DatabaseShardRelationId(), - RowExclusiveLock); - - const int scanKeyCount = 1; - ScanKeyData scanKey[1]; - bool indexOK = true; - - ScanKeyInit(&scanKey[0], Anum_database_shard_database_id, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(databaseOid)); - - SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, - DatabaseShardPrimaryKeyIndexId(), - indexOK, - NULL, scanKeyCount, scanKey); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - if (heapTuple != NULL) - { - simple_heap_delete(databaseShardTable, &heapTuple->t_self); - } - - systable_endscan(scanDescriptor); - - CommandCounterIncrement(); - table_close(databaseShardTable, NoLock); -} - - -/* - * DeleteDatabaseShardByDatabaseIdOnOtherNodes deletes a record from the - * citus_catalog.database_sharding table on other nodes. - */ -static void -DeleteDatabaseShardByDatabaseIdOnOtherNodes(Oid databaseOid) -{ - char *deleteCommand = DeleteDatabaseShardByDatabaseIdCommand(databaseOid); - SendCommandToWorkersWithMetadata(deleteCommand); -} - - -/* - * ListDatabaseShards lists all database shards in citus_catalog.database_shard. 
- */ -List * -ListDatabaseShards(void) -{ - Relation databaseShardTable = table_open(DatabaseShardRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable); - - List *dbShardList = NIL; - int scanKeyCount = 0; - bool indexOK = false; - - SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, InvalidOid, - indexOK, NULL, scanKeyCount, NULL); - - HeapTuple heapTuple = NULL; - while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) - { - DatabaseShard *dbShard = TupleToDatabaseShard(heapTuple, tupleDescriptor); - dbShardList = lappend(dbShardList, dbShard); - } - - systable_endscan(scanDescriptor); - table_close(databaseShardTable, NoLock); - - return dbShardList; -} - - -/* - * GetDatabaseShardByOid gets a database shard by database OID or - * NULL if no database shard could be found. - */ -DatabaseShard * -GetDatabaseShardByOid(Oid databaseOid) -{ - DatabaseShard *result = NULL; - - Relation databaseShardTable = table_open(DatabaseShardRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable); - - const int scanKeyCount = 1; - ScanKeyData scanKey[1]; - bool indexOK = true; - - ScanKeyInit(&scanKey[0], Anum_database_shard_database_id, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(databaseOid)); - - SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, - DatabaseShardPrimaryKeyIndexId(), - indexOK, - NULL, scanKeyCount, scanKey); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(heapTuple)) - { - result = TupleToDatabaseShard(heapTuple, tupleDescriptor); - } - - systable_endscan(scanDescriptor); - table_close(databaseShardTable, NoLock); - - return result; -} - - -/* - * TupleToDatabaseShard converts a database_shard record tuple into a DatabaseShard struct. 
- */ -static DatabaseShard * -TupleToDatabaseShard(HeapTuple heapTuple, TupleDesc tupleDescriptor) -{ - Datum datumArray[Natts_database_shard]; - bool isNullArray[Natts_database_shard]; - heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); - - DatabaseShard *record = palloc0(sizeof(DatabaseShard)); - - record->databaseOid = - DatumGetObjectId(datumArray[Anum_database_shard_database_id - 1]); - - record->nodeGroupId = - DatumGetInt32(datumArray[Anum_database_shard_node_group_id - 1]); - - record->isAvailable = - DatumGetBool(datumArray[Anum_database_shard_is_available - 1]); - - return record; -} - - -/* - * citus_internal_add_database_shard is an internal UDF to - * add a row to database_shard. - */ -Datum -citus_internal_add_database_shard(PG_FUNCTION_ARGS) -{ - char *databaseName = TextDatumGetCString(PG_GETARG_DATUM(0)); - int nodeGroupId = PG_GETARG_INT32(1); - - bool missingOk = false; - Oid databaseOid = get_database_oid(databaseName, missingOk); - - if (!pg_database_ownercheck(databaseOid, GetUserId())) - { - aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, - databaseName); - } - - InsertDatabaseShardAssignmentLocally(databaseOid, nodeGroupId); - - - PG_RETURN_VOID(); -} - - -/* - * InsertDatabaseShardAssignmentCommand returns a command to insert a database shard - * assignment into the metadata on a remote node. - */ -char * -InsertDatabaseShardAssignmentCommand(Oid databaseOid, int nodeGroupId) -{ - StringInfo command = makeStringInfo(); - char *databaseName = get_database_name(databaseOid); - - appendStringInfo(command, - "SELECT pg_catalog.citus_internal_add_database_shard(%s,%d)", - quote_literal_cstr(databaseName), - nodeGroupId); - - return command->data; -} - - -/* - * citus_internal_delete_database_shard is an internal UDF to - * delete a row from database_shard. 
- */ -Datum -citus_internal_delete_database_shard(PG_FUNCTION_ARGS) -{ - char *databaseName = TextDatumGetCString(PG_GETARG_DATUM(0)); - - bool missingOk = false; - Oid databaseOid = get_database_oid(databaseName, missingOk); - - if (!pg_database_ownercheck(databaseOid, GetUserId())) - { - aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, - databaseName); - } - - DeleteDatabaseShardByDatabaseIdLocally(databaseOid); - - - - PG_RETURN_VOID(); -} - - -/* - * DeleteDatabaseShardByDatabaseIdCommand returns a command to delete a database shard - * assignment from the metadata on a remote node. - */ -static char * -DeleteDatabaseShardByDatabaseIdCommand(Oid databaseOid) -{ - StringInfo command = makeStringInfo(); - char *databaseName = get_database_name(databaseOid); - - appendStringInfo(command, - "SELECT pg_catalog.citus_internal_delete_database_shard(%s)", - quote_literal_cstr(databaseName)); - - return command->data; -} diff --git a/src/backend/distributed/database/database_size.c b/src/backend/distributed/database/database_size.c deleted file mode 100644 index 46e72ad13..000000000 --- a/src/backend/distributed/database/database_size.c +++ /dev/null @@ -1,180 +0,0 @@ -/*------------------------------------------------------------------------- - * - * database_size.c - * Functions for getting the size of a database. - * - * Copyright (c) Microsoft, Inc. 
- * - *------------------------------------------------------------------------- - */ -#include "postgres.h" -#include "funcapi.h" -#include "fmgr.h" -#include "miscadmin.h" - -#include "commands/dbcommands.h" -#include "distributed/citus_safe_lib.h" -#include "distributed/database/database_sharding.h" -#include "distributed/listutils.h" -#include "distributed/metadata_cache.h" -#include "distributed/remote_commands.h" -#include "distributed/worker_transaction.h" -#include "utils/builtins.h" - - -static int64 GetLocalDatabaseSize(Oid databaseId); -static int64 CitusDatabaseShardSize(DatabaseShard *dbShard); -static int64 CitusDatabaseSizeOnNodeList(Oid databaseId, List *workerNodeList); - - -PG_FUNCTION_INFO_V1(citus_database_size_oid); -PG_FUNCTION_INFO_V1(citus_database_size_name); - - -/* - * citus_database_size_oid returns the size of a Citus database - * with the given oid. - */ -Datum -citus_database_size_oid(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - Oid databaseId = PG_GETARG_OID(0); - int64 size = CitusDatabaseSize(databaseId); - - PG_RETURN_INT64(size); -} - - -/* - * citus_database_size_name returns the size of a Citus database - * with the given name. - */ -Datum -citus_database_size_name(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - Name databaseName = PG_GETARG_NAME(0); - bool missingOk = false; - Oid databaseId = get_database_oid(NameStr(*databaseName), missingOk); - - int64 size = CitusDatabaseSize(databaseId); - - PG_RETURN_INT64(size); -} - - -/* - * CitusDatabaseSize returns the size of a Citus database. 
- */ -int64 -CitusDatabaseSize(Oid databaseId) -{ - DatabaseShard *dbShard = GetDatabaseShardByOid(databaseId); - if (dbShard != NULL) - { - /* for known database shards, get the remote size */ - return CitusDatabaseShardSize(dbShard); - } - - if (databaseId == MyDatabaseId) - { - /* for the current database, get the size from all nodes */ - List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock); - return CitusDatabaseSizeOnNodeList(databaseId, workerNodes); - } - - /* for other databases, get the local size */ - /* TODO: get it from main database? */ - return GetLocalDatabaseSize(databaseId); -} - - -/* - * GetLocalDatabaseSize returns the local database size by calling pg_database_size. - */ -static int64 -GetLocalDatabaseSize(Oid databaseId) -{ - Datum databaseIdDatum = ObjectIdGetDatum(databaseId); - Datum sizeDatum = DirectFunctionCall1(pg_database_size_oid, databaseIdDatum); - return DatumGetInt64(sizeDatum); -} - - -/* - * CitusDatabaseShardSize gets the database size for a specific - * shard. - */ -static int64 -CitusDatabaseShardSize(DatabaseShard *dbShard) -{ - WorkerNode *workerNode = LookupNodeForGroup(dbShard->nodeGroupId); - - return CitusDatabaseSizeOnNodeList(dbShard->databaseOid, list_make1(workerNode)); -} - - -/* - * CitusDatabaseSizeOnNodeList returns the sum of the sizes - * for a given database from all nodes in the list. 
- */ -static int64 -CitusDatabaseSizeOnNodeList(Oid databaseId, List *workerNodeList) -{ - int64 size = 0; - - bool raiseInterrupts = true; - - char *databaseName = get_database_name(databaseId); - char *command = psprintf("SELECT pg_catalog.pg_database_size(%s)", - quote_literal_cstr(databaseName)); - - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - if (workerNode->groupId == GetLocalGroupId()) - { - return GetLocalDatabaseSize(databaseId); - } - - int connectionFlags = 0; - MultiConnection *connection = GetNodeConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort); - - int querySent = SendRemoteCommand(connection, command); - if (querySent == 0) - { - ReportConnectionError(connection, ERROR); - } - - PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); - if (!IsResponseOK(result)) - { - ReportResultError(connection, result, ERROR); - } - - if (PQntuples(result) != 1 || PQnfields(result) != 1) - { - PQclear(result); - ClearResults(connection, raiseInterrupts); - - ereport(ERROR, (errmsg("unexpected number of columns returned by: %s", - command))); - } - - if (!PQgetisnull(result, 0, 0)) - { - char *sizeString = PQgetvalue(result, 0, 0); - size += SafeStringToUint64(sizeString); - } - - PQclear(result); - ClearResults(connection, raiseInterrupts); - } - - return size; -} diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 108750ed6..7345b1839 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -208,7 +208,6 @@ typedef struct MetadataCacheData Oid distTransactionGroupIndexId; Oid distTenantSchemaPrimaryKeyIndexId; Oid distTenantSchemaUniqueColocationIdIndexId; - Oid citusCatalogNamespaceId; Oid copyFormatTypeId; Oid readIntermediateResultFuncId; Oid readIntermediateResultArrayFuncId; @@ -312,7 +311,6 @@ static HeapTuple LookupDistPartitionTuple(Relation 
pgDistPartition, Oid relation static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Oid *columnTypeId, int32 *columnTypeMod, Oid *intervalTypeId, int32 *intervalTypeMod); -static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid); static void CachedRelationLookup(const char *relationName, Oid *cachedOid); static void CachedRelationLookupExtended(const char *relationName, Oid *cachedOid, bool missing_ok); @@ -2771,36 +2769,7 @@ DistRebalanceStrategyRelationId(void) return MetadataCache.distRebalanceStrategyRelationId; } -/* return oid of citus_catalog.database_sharding relation */ -Oid -DatabaseShardRelationId(void) -{ - CachedRelationNamespaceLookup("database_shard", CitusCatalogNamespaceId(), - &MetadataCache.databaseShardRelationId); - return MetadataCache.databaseShardRelationId; -} - - -/* return oid of citus_catalog.database_sharding primary key */ -Oid -DatabaseShardPrimaryKeyIndexId(void) -{ - CachedRelationNamespaceLookup("database_shard_pkey", CitusCatalogNamespaceId(), - &MetadataCache.databaseShardPKeyIndexId); - - return MetadataCache.databaseShardPKeyIndexId; -} - - - -/* return the oid of citus namespace */ -Oid -CitusCatalogNamespaceId(void) -{ - CachedNamespaceLookup("citus_catalog", &MetadataCache.citusCatalogNamespaceId); - return MetadataCache.citusCatalogNamespaceId; -} /* return oid of pg_dist_object relation */ @@ -2870,17 +2839,6 @@ DistObjectPrimaryKeyIndexId(void) &MetadataCache.distObjectPrimaryKeyIndexId, true); - if (!OidIsValid(MetadataCache.distObjectPrimaryKeyIndexId)) - { - /* - * We can only ever reach here while we are creating/altering our extension before - * the table is moved to pg_catalog. 
- */ - CachedRelationNamespaceLookupExtended("pg_dist_object_pkey", - CitusCatalogNamespaceId(), - &MetadataCache.distObjectPrimaryKeyIndexId, - false); - } return MetadataCache.distObjectPrimaryKeyIndexId; } @@ -5446,28 +5404,6 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray, } -/* - * CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the - * result cached in cachedOid. - */ -static void -CachedNamespaceLookup(const char *nspname, Oid *cachedOid) -{ - /* force callbacks to be registered, so we always get notified upon changes */ - InitializeCaches(); - - if (*cachedOid == InvalidOid) - { - *cachedOid = get_namespace_oid(nspname, true); - - if (*cachedOid == InvalidOid) - { - ereport(ERROR, (errmsg( - "cache lookup failed for namespace %s, called too early?", - nspname))); - } - } -} /* diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3cdd3dbaa..c510c2d30 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1264,6 +1264,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_create_database_propagation", + gettext_noop("Enables propagating CREATE DATABASE " + "and DROP DATABASE statements to workers"), + NULL, + &EnableCreateDatabasePropagation, + true, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_create_role_propagation", gettext_noop("Enables propagating CREATE ROLE " diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index 0550bd5cf..3c1d76ab9 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -1,3 +1,2 @@ -- citus--12.2-1--12.1-1 DROP FUNCTION 
pg_catalog.citus_internal_database_command(text); --- this is an empty downgrade path since citus--12.2-1--12.1-1.sql is empty for now diff --git a/src/include/distributed/pooler/pgbouncer_manager.h b/src/include/distributed/pooler/pgbouncer_manager.h deleted file mode 100644 index 530d1b1ee..000000000 --- a/src/include/distributed/pooler/pgbouncer_manager.h +++ /dev/null @@ -1,47 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pgbouncer_manager.h - * Functions for managing outbound pgbouncers - * - * Copyright (c) Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#ifndef PGBOUNCER_MANAGER_H -#define PGBOUNCER_MANAGER_H - -/* default number of inbound pgbouncer processes (0 is disabled) */ -#define PGBOUNCER_INBOUND_PROCS_DEFAULT 0 - -/* bits reserved for local pgbouncer ID in the pgbouncer peer_id */ -#define PGBOUNCER_PEER_ID_LOCAL_ID_BITS 5 - -/* maximum number of inbound pgbouncer processes */ -#define PGBOUNCER_INBOUND_PROCS_MAX ((1 << PGBOUNCER_PEER_ID_LOCAL_ID_BITS) - 1) - - -/* GUC variable that sets the number of inbound pgbouncer procs */ -extern int PgBouncerInboundProcs; - -/* GUC variable that sets the inbound pgbouncer port */ -extern int PgBouncerInboundPort; - -/* GUC variable that sets the path to pgbouncer executable */ -extern char *PgBouncerPath; - -/* GUC variable that sets a pgbouncer file to include */ -extern char *PgBouncerIncludeConfig; - -/* global variable to request pgbouncer reconfiguration */ -extern bool ReconfigurePgBouncersOnCommit; - -void InitializeSharedPgBouncerManager(void); -size_t SharedPgBouncerManagerShmemSize(void); -void PgBouncerManagerMain(Datum arg); -bool PauseDatabaseOnInboundPgBouncers(char *databaseName); -bool ResumeDatabaseOnInboundPgBouncers(char *databaseName); -void TriggerPgBouncerReconfigureIfNeeded(void); - - -#endif /* PGBOUNCER_MANAGER_H */