mirror of https://github.com/citusdata/citus.git
Fixes runtime and compile errors
parent
8435509347
commit
84826dcc98
|
@ -32,44 +32,43 @@
|
||||||
#include "distributed/deparser.h"
|
#include "distributed/deparser.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/metadata/distobject.h"
|
#include "distributed/metadata/distobject.h"
|
||||||
#include "distributed/database/database_sharding.h"
|
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/adaptive_executor.h"
|
#include "distributed/adaptive_executor.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* macros to add DefElems to a list */
|
/* macros to add DefElems to a list */
|
||||||
#define DEFELEM_ADD_STRING(options, key, value) { \
|
#define DEFELEM_ADD_STRING(options, key, value) \
|
||||||
DefElem *elem = makeDefElem(key, (Node *) makeString(value), -1); \
|
{ \
|
||||||
|
DefElem *elem = makeDefElem(key, (Node *)makeString(value), -1); \
|
||||||
options = lappend(options, elem); \
|
options = lappend(options, elem); \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define DEFELEM_ADD_BOOL(options, key, value) { \
|
#define DEFELEM_ADD_BOOL(options, key, value) \
|
||||||
DefElem *elem = makeDefElem(key, (Node *) makeBoolean(value), -1); \
|
{ \
|
||||||
|
DefElem *elem = makeDefElem(key, (Node *)makeBoolean(value), -1); \
|
||||||
options = lappend(options, elem); \
|
options = lappend(options, elem); \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define DEFELEM_ADD_INT(options, key, value) { \
|
#define DEFELEM_ADD_INT(options, key, value) \
|
||||||
DefElem *elem = makeDefElem(key, (Node *) makeInteger(value), -1); \
|
{ \
|
||||||
|
DefElem *elem = makeDefElem(key, (Node *)makeInteger(value), -1); \
|
||||||
options = lappend(options, elem); \
|
options = lappend(options, elem); \
|
||||||
}
|
}
|
||||||
|
|
||||||
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
|
static AlterOwnerStmt *RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
|
||||||
|
|
||||||
static List * CreateDDLTaskList(char *command, List *workerNodeList,
|
static List *CreateDDLTaskList(char *command, List *workerNodeList,
|
||||||
bool outsideTransaction);
|
bool outsideTransaction);
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_database_command);
|
PG_FUNCTION_INFO_V1(citus_internal_database_command);
|
||||||
static Oid get_database_owner(Oid db_oid);
|
static Oid get_database_owner(Oid db_oid);
|
||||||
List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
List *PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
||||||
ProcessUtilityContext processUtilityContext);
|
ProcessUtilityContext processUtilityContext);
|
||||||
|
|
||||||
/* controlled via GUC */
|
/* controlled via GUC */
|
||||||
bool EnableCreateDatabasePropagation = true;
|
bool EnableCreateDatabasePropagation = true;
|
||||||
bool EnableAlterDatabaseOwner = true;
|
bool EnableAlterDatabaseOwner = true;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the
|
* AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the
|
||||||
* object of the AlterOwnerStmt. Errors if missing_ok is false.
|
* object of the AlterOwnerStmt. Errors if missing_ok is false.
|
||||||
|
@ -80,14 +79,13 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||||
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
||||||
Assert(stmt->objectType == OBJECT_DATABASE);
|
Assert(stmt->objectType == OBJECT_DATABASE);
|
||||||
|
|
||||||
Oid databaseOid = get_database_oid(strVal((String *) stmt->object), missing_ok);
|
Oid databaseOid = get_database_oid(strVal((String *)stmt->object), missing_ok);
|
||||||
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
|
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(*address, DatabaseRelationId, databaseOid);
|
ObjectAddressSet(*address, DatabaseRelationId, databaseOid);
|
||||||
|
|
||||||
return list_make1(address);
|
return list_make1(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DatabaseOwnerDDLCommands returns a list of sql statements to idempotently apply a
|
* DatabaseOwnerDDLCommands returns a list of sql statements to idempotently apply a
|
||||||
* change of the database owner on the workers so that the database is owned by the same
|
* change of the database owner on the workers so that the database is owned by the same
|
||||||
|
@ -96,11 +94,10 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||||
List *
|
List *
|
||||||
DatabaseOwnerDDLCommands(const ObjectAddress *address)
|
DatabaseOwnerDDLCommands(const ObjectAddress *address)
|
||||||
{
|
{
|
||||||
Node *stmt = (Node *) RecreateAlterDatabaseOwnerStmt(address->objectId);
|
Node *stmt = (Node *)RecreateAlterDatabaseOwnerStmt(address->objectId);
|
||||||
return list_make1(DeparseTreeNode(stmt));
|
return list_make1(DeparseTreeNode(stmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RecreateAlterDatabaseOwnerStmt creates an AlterOwnerStmt that represents the operation
|
* RecreateAlterDatabaseOwnerStmt creates an AlterOwnerStmt that represents the operation
|
||||||
* of changing the owner of the database to its current owner.
|
* of changing the owner of the database to its current owner.
|
||||||
|
@ -111,7 +108,7 @@ RecreateAlterDatabaseOwnerStmt(Oid databaseOid)
|
||||||
AlterOwnerStmt *stmt = makeNode(AlterOwnerStmt);
|
AlterOwnerStmt *stmt = makeNode(AlterOwnerStmt);
|
||||||
|
|
||||||
stmt->objectType = OBJECT_DATABASE;
|
stmt->objectType = OBJECT_DATABASE;
|
||||||
stmt->object = (Node *) makeString(get_database_name(databaseOid));
|
stmt->object = (Node *)makeString(get_database_name(databaseOid));
|
||||||
|
|
||||||
Oid ownerOid = get_database_owner(databaseOid);
|
Oid ownerOid = get_database_owner(databaseOid);
|
||||||
stmt->newowner = makeNode(RoleSpec);
|
stmt->newowner = makeNode(RoleSpec);
|
||||||
|
@ -121,7 +118,6 @@ RecreateAlterDatabaseOwnerStmt(Oid databaseOid)
|
||||||
return stmt;
|
return stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* get_database_owner returns the Oid of the role owning the database
|
* get_database_owner returns the Oid of the role owning the database
|
||||||
*/
|
*/
|
||||||
|
@ -135,14 +131,13 @@ get_database_owner(Oid db_oid)
|
||||||
errmsg("database with OID %u does not exist", db_oid)));
|
errmsg("database with OID %u does not exist", db_oid)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid dba = ((Form_pg_database) GETSTRUCT(tuple))->datdba;
|
Oid dba = ((Form_pg_database)GETSTRUCT(tuple))->datdba;
|
||||||
|
|
||||||
ReleaseSysCache(tuple);
|
ReleaseSysCache(tuple);
|
||||||
|
|
||||||
return dba;
|
return dba;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PreprocessGrantOnDatabaseStmt is executed before the statement is applied to the local
|
* PreprocessGrantOnDatabaseStmt is executed before the statement is applied to the local
|
||||||
* postgres instance.
|
* postgres instance.
|
||||||
|
@ -171,88 +166,15 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
char *sql = DeparseTreeNode((Node *) stmt);
|
char *sql = DeparseTreeNode((Node *)stmt);
|
||||||
|
|
||||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||||
(void *) sql,
|
(void *)sql,
|
||||||
ENABLE_DDL_PROPAGATION);
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* citus_internal_database_command is an internal UDF to
|
|
||||||
* create/drop a database in an idempotent maner without
|
|
||||||
* transaction block restrictions.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
citus_internal_database_command(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
text *commandText = PG_GETARG_TEXT_P(0);
|
|
||||||
char *command = text_to_cstring(commandText);
|
|
||||||
Node *parseTree = ParseTreeNode(command);
|
|
||||||
|
|
||||||
set_config_option("citus.enable_ddl_propagation", "off",
|
|
||||||
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
|
||||||
GUC_ACTION_LOCAL, true, 0, false);
|
|
||||||
|
|
||||||
set_config_option("citus.enable_create_database_propagation", "off",
|
|
||||||
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
|
||||||
GUC_ACTION_LOCAL, true, 0, false);
|
|
||||||
|
|
||||||
if (IsA(parseTree, CreatedbStmt))
|
|
||||||
{
|
|
||||||
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
|
|
||||||
|
|
||||||
bool missingOk = true;
|
|
||||||
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
|
|
||||||
|
|
||||||
if (!OidIsValid(databaseOid))
|
|
||||||
{
|
|
||||||
createdb(NULL, (CreatedbStmt *) parseTree);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* TODO: check database properties */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (IsA(parseTree, DropdbStmt))
|
|
||||||
{
|
|
||||||
DropdbStmt *stmt = castNode(DropdbStmt, parseTree);
|
|
||||||
|
|
||||||
bool missingOk = true;
|
|
||||||
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
|
|
||||||
|
|
||||||
if (!OidIsValid(databaseOid))
|
|
||||||
{
|
|
||||||
/* already dropped? */
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* remove database from pg_dist_object */
|
|
||||||
ObjectAddress dbAddress = { 0 };
|
|
||||||
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
|
||||||
|
|
||||||
if (IsObjectDistributed(&dbAddress))
|
|
||||||
{
|
|
||||||
UnmarkObjectDistributed(&dbAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* remove database from database shards */
|
|
||||||
DeleteDatabaseShardByDatabaseIdLocally(databaseOid);
|
|
||||||
|
|
||||||
DropDatabase(NULL, (DropdbStmt *) parseTree);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree))));
|
|
||||||
}
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
|
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
|
||||||
* postgres instance.
|
* postgres instance.
|
||||||
|
@ -273,16 +195,15 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
char *sql = DeparseTreeNode((Node *) stmt);
|
char *sql = DeparseTreeNode((Node *)stmt);
|
||||||
|
|
||||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||||
(void *) sql,
|
(void *)sql,
|
||||||
ENABLE_DDL_PROPAGATION);
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -305,16 +226,15 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
char *sql = DeparseTreeNode((Node *) stmt);
|
char *sql = DeparseTreeNode((Node *)stmt);
|
||||||
|
|
||||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||||
(void *) sql,
|
(void *)sql,
|
||||||
ENABLE_DDL_PROPAGATION);
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -380,7 +300,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
|
||||||
/*
|
/*
|
||||||
* PostprocessCreatedbStmt creates the plan to synchronize CREATE DATABASE
|
* PostprocessCreatedbStmt creates the plan to synchronize CREATE DATABASE
|
||||||
* across nodes. We use the cannotBeExecutedInTransction option to avoid
|
* across nodes. We use the cannotBeExecutedInTransction option to avoid
|
||||||
* u* sending transaction blocks.
|
* sending transaction blocks.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
||||||
|
@ -425,14 +345,92 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* synchronize pg_dist_object records */
|
/* synchronize pg_dist_object records */
|
||||||
ObjectAddress dbAddress = { 0 };
|
ObjectAddress dbAddress = {0};
|
||||||
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||||
MarkObjectDistributed(&dbAddress);
|
MarkObjectDistributed(&dbAddress);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_internal_database_command is an internal UDF to
|
||||||
|
* create/drop a database in an idempotent maner without
|
||||||
|
* transaction block restrictions.
|
||||||
|
*/
|
||||||
|
Datum citus_internal_database_command(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
int saveNestLevel = NewGUCNestLevel();
|
||||||
|
text *commandText = PG_GETARG_TEXT_P(0);
|
||||||
|
char *command = text_to_cstring(commandText);
|
||||||
|
Node *parseTree = ParseTreeNode(command);
|
||||||
|
|
||||||
|
ereport(NOTICE, (errmsg("test internal pre"),
|
||||||
|
errhint("test pre hint")));
|
||||||
|
|
||||||
|
set_config_option("citus.enable_ddl_propagation", "off",
|
||||||
|
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||||
|
GUC_ACTION_LOCAL, true, 0, false);
|
||||||
|
|
||||||
|
set_config_option("citus.enable_create_database_propagation", "off",
|
||||||
|
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||||
|
GUC_ACTION_LOCAL, true, 0, false);
|
||||||
|
|
||||||
|
if (IsA(parseTree, CreatedbStmt))
|
||||||
|
{
|
||||||
|
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
|
||||||
|
|
||||||
|
bool missingOk = true;
|
||||||
|
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
|
||||||
|
|
||||||
|
if (!OidIsValid(databaseOid))
|
||||||
|
{
|
||||||
|
createdb(NULL, (CreatedbStmt *)parseTree);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* TODO: check database properties */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (IsA(parseTree, DropdbStmt))
|
||||||
|
{
|
||||||
|
DropdbStmt *stmt = castNode(DropdbStmt, parseTree);
|
||||||
|
|
||||||
|
bool missingOk = true;
|
||||||
|
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
|
||||||
|
|
||||||
|
if (!OidIsValid(databaseOid))
|
||||||
|
{
|
||||||
|
/* already dropped? */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* remove database from pg_dist_object */
|
||||||
|
ObjectAddress dbAddress = {0};
|
||||||
|
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||||
|
|
||||||
|
if (IsObjectDistributed(&dbAddress))
|
||||||
|
{
|
||||||
|
UnmarkObjectDistributed(&dbAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
// /* remove database from database shards */
|
||||||
|
// DeleteDatabaseShardByDatabaseIdLocally(databaseOid);
|
||||||
|
|
||||||
|
DropDatabase(NULL, (DropdbStmt *)parseTree);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree))));
|
||||||
|
}
|
||||||
|
|
||||||
|
AtEOXact_GUC(true, saveNestLevel);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
List *
|
List *
|
||||||
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
ProcessUtilityContext processUtilityContext)
|
ProcessUtilityContext processUtilityContext)
|
||||||
|
@ -442,7 +440,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
DropdbStmt *stmt = (DropdbStmt *) node;
|
DropdbStmt *stmt = (DropdbStmt *)node;
|
||||||
char *databaseName = stmt->dbname;
|
char *databaseName = stmt->dbname;
|
||||||
bool missingOk = true;
|
bool missingOk = true;
|
||||||
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
||||||
|
@ -452,7 +450,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress dbAddress = { 0 };
|
ObjectAddress dbAddress = {0};
|
||||||
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||||
if (!IsObjectDistributed(&dbAddress))
|
if (!IsObjectDistributed(&dbAddress))
|
||||||
{
|
{
|
||||||
|
@ -472,7 +470,6 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
||||||
quote_literal_cstr(dropDatabaseCommand));
|
quote_literal_cstr(dropDatabaseCommand));
|
||||||
|
|
||||||
|
|
||||||
/* we execute here to avoid EnsureCoordinator check in ExecuteDistributedDDLJob */
|
/* we execute here to avoid EnsureCoordinator check in ExecuteDistributedDDLJob */
|
||||||
bool outsideTransaction = false;
|
bool outsideTransaction = false;
|
||||||
List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes,
|
List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes,
|
||||||
|
|
|
@ -694,7 +694,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* inform the user about potential caveats */
|
/* inform the user about potential caveats */
|
||||||
if (IsA(parsetree, CreatedbStmt))
|
if (IsA(parsetree, CreatedbStmt) &&!EnableCreateDatabasePropagation)
|
||||||
{
|
{
|
||||||
if (EnableUnsupportedFeatureMessages)
|
if (EnableUnsupportedFeatureMessages)
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,53 +0,0 @@
|
||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* database_lock.c
|
|
||||||
* Functions for locking a database.
|
|
||||||
*
|
|
||||||
* Copyright (c) Microsoft, Inc.
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
#include "postgres.h"
|
|
||||||
#include "funcapi.h"
|
|
||||||
#include "fmgr.h"
|
|
||||||
#include "miscadmin.h"
|
|
||||||
|
|
||||||
#include "catalog/pg_database.h"
|
|
||||||
#include "commands/dbcommands.h"
|
|
||||||
#include "distributed/metadata_cache.h"
|
|
||||||
#include "storage/lmgr.h"
|
|
||||||
|
|
||||||
|
|
||||||
static void CitusDatabaseLock(Oid databaseId);
|
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(citus_database_lock_by_name);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* citus_database_lock locks the given database in access exclusive mode
|
|
||||||
* to temporarily block new connections.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
citus_database_lock_by_name(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
CheckCitusVersion(ERROR);
|
|
||||||
|
|
||||||
Name databaseName = PG_GETARG_NAME(0);
|
|
||||||
bool missingOk = false;
|
|
||||||
Oid databaseId = get_database_oid(NameStr(*databaseName), missingOk);
|
|
||||||
|
|
||||||
CitusDatabaseLock(databaseId);
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusDatabaseLock locks a database for new connections.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CitusDatabaseLock(Oid databaseId)
|
|
||||||
{
|
|
||||||
LockSharedObject(DatabaseRelationId, databaseId, 0, ExclusiveLock);
|
|
||||||
}
|
|
|
@ -1,546 +0,0 @@
|
||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* database_sharding.c
|
|
||||||
*
|
|
||||||
* This file contains module-level definitions.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2023, Microsoft, Inc.
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "postgres.h"
|
|
||||||
#include "fmgr.h"
|
|
||||||
#include "miscadmin.h"
|
|
||||||
|
|
||||||
#include "citus_version.h"
|
|
||||||
#include "pg_version_compat.h"
|
|
||||||
|
|
||||||
#include "access/genam.h"
|
|
||||||
#include "commands/dbcommands.h"
|
|
||||||
#include "distributed/connection_management.h"
|
|
||||||
#include "distributed/database/database_sharding.h"
|
|
||||||
#include "distributed/deparser.h"
|
|
||||||
#include "distributed/deparse_shard_query.h"
|
|
||||||
#include "distributed/listutils.h"
|
|
||||||
#include "distributed/metadata_sync.h"
|
|
||||||
#include "distributed/pooler/pgbouncer_manager.h"
|
|
||||||
#include "distributed/remote_commands.h"
|
|
||||||
#include "distributed/shared_library_init.h"
|
|
||||||
#include "distributed/worker_transaction.h"
|
|
||||||
#include "executor/spi.h"
|
|
||||||
#include "nodes/makefuncs.h"
|
|
||||||
#include "nodes/parsenodes.h"
|
|
||||||
#include "postmaster/postmaster.h"
|
|
||||||
#include "tcop/utility.h"
|
|
||||||
#include "utils/builtins.h"
|
|
||||||
#include "utils/fmgroids.h"
|
|
||||||
#include "catalog/pg_database.h"
|
|
||||||
|
|
||||||
|
|
||||||
static void ExecuteCommandInControlDatabase(char *command);
|
|
||||||
static void AllowConnectionsOnlyOnNodeGroup(Oid databaseOid, Oid nodeGroupId);
|
|
||||||
static void InsertDatabaseShardAssignment(Oid databaseOid, int nodeGroupId);
|
|
||||||
static void InsertDatabaseShardAssignmentLocally(Oid databaseOid, int nodeGroupId);
|
|
||||||
static void InsertDatabaseShardAssignmentOnOtherNodes(Oid databaseOid, int nodeGroupId);
|
|
||||||
static void DeleteDatabaseShardByDatabaseId(Oid databaseOid);
|
|
||||||
static void DeleteDatabaseShardByDatabaseIdOnOtherNodes(Oid databaseOid);
|
|
||||||
static DatabaseShard * TupleToDatabaseShard(HeapTuple heapTuple,
|
|
||||||
TupleDesc tupleDescriptor);
|
|
||||||
static char * DeleteDatabaseShardByDatabaseIdCommand(Oid databaseOid);
|
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(database_shard_assign);
|
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_add_database_shard);
|
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_delete_database_shard);
|
|
||||||
|
|
||||||
|
|
||||||
/* citus.enable_database_sharding setting */
|
|
||||||
bool EnableDatabaseSharding = false;
|
|
||||||
|
|
||||||
/* citus.database_sharding_pgbouncer_file setting */
|
|
||||||
char *DatabaseShardingPgBouncerFile = "";
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* PreProcessUtilityInDatabaseShard handles DDL commands that occur within a
|
|
||||||
* database shard and require global coordination:
|
|
||||||
* - CREATE/ALTER/DROP DATABASE
|
|
||||||
* - CREATE/ALTER/DROP ROLE/USER/GROUP
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
PreProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString,
|
|
||||||
ProcessUtilityContext context,
|
|
||||||
bool *runPreviousUtilityHook)
|
|
||||||
{
|
|
||||||
if (!EnableDatabaseSharding || context != PROCESS_UTILITY_TOPLEVEL)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (EnableCreateDatabasePropagation)
|
|
||||||
{
|
|
||||||
if (IsA(parseTree, CreatedbStmt))
|
|
||||||
{
|
|
||||||
char *command = DeparseCreateDatabaseStmt(parseTree);
|
|
||||||
ExecuteCommandInControlDatabase(command);
|
|
||||||
|
|
||||||
/* command is fully delegated to control database */
|
|
||||||
*runPreviousUtilityHook = false;
|
|
||||||
}
|
|
||||||
else if (IsA(parseTree, DropdbStmt))
|
|
||||||
{
|
|
||||||
char *command = DeparseDropDatabaseStmt(parseTree);
|
|
||||||
ExecuteCommandInControlDatabase(command);
|
|
||||||
|
|
||||||
/* command is fully delegated to control database */
|
|
||||||
*runPreviousUtilityHook = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* PostProcessUtilityInDatabaseShard is currently a noop.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
PostProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString,
|
|
||||||
ProcessUtilityContext context)
|
|
||||||
{
|
|
||||||
if (!EnableDatabaseSharding || context != PROCESS_UTILITY_TOPLEVEL)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExecuteCommandInControlDatabase connects to localhost to execute a command
|
|
||||||
* in the main Citus database.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ExecuteCommandInControlDatabase(char *command)
|
|
||||||
{
|
|
||||||
int connectionFlag = FORCE_NEW_CONNECTION;
|
|
||||||
|
|
||||||
MultiConnection *connection =
|
|
||||||
GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber,
|
|
||||||
NULL, CitusMainDatabase);
|
|
||||||
|
|
||||||
ExecuteCriticalRemoteCommand(connection,
|
|
||||||
"SET application_name TO 'citus_database_shard'");
|
|
||||||
ExecuteCriticalRemoteCommand(connection, command);
|
|
||||||
CloseConnection(connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* database_shard_assign assigns an existing database to a node.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
database_shard_assign(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
CheckCitusVersion(ERROR);
|
|
||||||
|
|
||||||
char *databaseName = text_to_cstring(PG_GETARG_TEXT_P(0));
|
|
||||||
|
|
||||||
bool missingOk = false;
|
|
||||||
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
|
||||||
|
|
||||||
if (!pg_database_ownercheck(databaseOid, GetUserId()))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
|
||||||
errmsg("permission denied to assign database \"%s\" "
|
|
||||||
"to a shard",
|
|
||||||
databaseName)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (GetDatabaseShardByOid(databaseOid) != NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("database is already assigned to a shard")));
|
|
||||||
}
|
|
||||||
|
|
||||||
AssignDatabaseToShard(databaseOid);
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* AssignDatabaseToShard finds a suitable node for the given
|
|
||||||
* database and assigns it.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
AssignDatabaseToShard(Oid databaseOid)
|
|
||||||
{
|
|
||||||
int nodeGroupId = GetLocalGroupId();
|
|
||||||
|
|
||||||
List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
|
|
||||||
if (list_length(workerNodes) > 0)
|
|
||||||
{
|
|
||||||
/* TODO: actually look for available space */
|
|
||||||
int workerNodeIndex = databaseOid % list_length(workerNodes);
|
|
||||||
WorkerNode *workerNode = list_nth(workerNodes, workerNodeIndex);
|
|
||||||
nodeGroupId = workerNode->groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
InsertDatabaseShardAssignment(databaseOid, nodeGroupId);
|
|
||||||
AllowConnectionsOnlyOnNodeGroup(databaseOid, nodeGroupId);
|
|
||||||
|
|
||||||
ReconfigurePgBouncersOnCommit = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* AllowConnectionsOnlyOnNodeGroup sets the ALLOW_CONNECTIONS properties on
|
|
||||||
* the database to false, except on nodeGroupId.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
AllowConnectionsOnlyOnNodeGroup(Oid databaseOid, Oid nodeGroupId)
|
|
||||||
{
|
|
||||||
StringInfo command = makeStringInfo();
|
|
||||||
char *databaseName = get_database_name(databaseOid);
|
|
||||||
|
|
||||||
List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
|
|
||||||
foreach_ptr(workerNode, workerNodes)
|
|
||||||
{
|
|
||||||
resetStringInfo(command);
|
|
||||||
|
|
||||||
if (workerNode->groupId == nodeGroupId)
|
|
||||||
{
|
|
||||||
appendStringInfo(command, "GRANT CONNECT ON DATABASE %s TO public",
|
|
||||||
quote_identifier(databaseName));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
appendStringInfo(command, "REVOKE CONNECT ON DATABASE %s FROM public",
|
|
||||||
quote_identifier(databaseName));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workerNode->groupId == GetLocalGroupId())
|
|
||||||
{
|
|
||||||
ExecuteQueryViaSPI(command->data, SPI_OK_UTILITY);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
SendCommandToWorker(workerNode->workerName, workerNode->workerPort,
|
|
||||||
command->data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InsertDatabaseShardAssignment inserts a record into the local
|
|
||||||
* citus_catalog.database_sharding table.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
InsertDatabaseShardAssignment(Oid databaseOid, int nodeGroupId)
|
|
||||||
{
|
|
||||||
InsertDatabaseShardAssignmentLocally(databaseOid, nodeGroupId);
|
|
||||||
|
|
||||||
if (EnableMetadataSync)
|
|
||||||
{
|
|
||||||
InsertDatabaseShardAssignmentOnOtherNodes(databaseOid, nodeGroupId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InsertDatabaseShardAssignmentLocally inserts a record into the local
|
|
||||||
* citus_catalog.database_sharding table.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
InsertDatabaseShardAssignmentLocally(Oid databaseOid, int nodeGroupId)
|
|
||||||
{
|
|
||||||
Datum values[Natts_database_shard];
|
|
||||||
bool isNulls[Natts_database_shard];
|
|
||||||
|
|
||||||
/* form new shard tuple */
|
|
||||||
memset(values, 0, sizeof(values));
|
|
||||||
memset(isNulls, false, sizeof(isNulls));
|
|
||||||
|
|
||||||
values[Anum_database_shard_database_id - 1] = ObjectIdGetDatum(databaseOid);
|
|
||||||
values[Anum_database_shard_node_group_id - 1] = Int32GetDatum(nodeGroupId);
|
|
||||||
values[Anum_database_shard_is_available - 1] = BoolGetDatum(true);
|
|
||||||
|
|
||||||
/* open shard relation and insert new tuple */
|
|
||||||
Relation databaseShardTable = table_open(DatabaseShardRelationId(), RowExclusiveLock);
|
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable);
|
|
||||||
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
|
||||||
|
|
||||||
CatalogTupleInsert(databaseShardTable, heapTuple);
|
|
||||||
|
|
||||||
CommandCounterIncrement();
|
|
||||||
table_close(databaseShardTable, NoLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InsertDatabaseShardAssignmentOnOtherNodes inserts a record into the
|
|
||||||
* citus_catalog.database_sharding table on other nodes.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
InsertDatabaseShardAssignmentOnOtherNodes(Oid databaseOid, int nodeGroupId)
|
|
||||||
{
|
|
||||||
char *insertCommand = InsertDatabaseShardAssignmentCommand(databaseOid, nodeGroupId);
|
|
||||||
SendCommandToWorkersWithMetadata(insertCommand);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* UpdateDatabaseShard updates a database shard after it is moved to a new node.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
UpdateDatabaseShard(Oid databaseOid, int targetNodeGroupId)
|
|
||||||
{
|
|
||||||
DeleteDatabaseShardByDatabaseId(databaseOid);
|
|
||||||
InsertDatabaseShardAssignment(databaseOid, targetNodeGroupId);
|
|
||||||
AllowConnectionsOnlyOnNodeGroup(databaseOid, targetNodeGroupId);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeleteDatabaseShardByDatabaseId deletes a record from the
|
|
||||||
* citus_catalog.database_sharding table.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
DeleteDatabaseShardByDatabaseId(Oid databaseOid)
|
|
||||||
{
|
|
||||||
DeleteDatabaseShardByDatabaseIdLocally(databaseOid);
|
|
||||||
|
|
||||||
if (EnableMetadataSync)
|
|
||||||
{
|
|
||||||
DeleteDatabaseShardByDatabaseIdOnOtherNodes(databaseOid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeleteDatabaseShardByDatabaseIdLocally deletes a database_shard record by database OID.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
DeleteDatabaseShardByDatabaseIdLocally(Oid databaseOid)
|
|
||||||
{
|
|
||||||
Relation databaseShardTable = table_open(DatabaseShardRelationId(),
|
|
||||||
RowExclusiveLock);
|
|
||||||
|
|
||||||
const int scanKeyCount = 1;
|
|
||||||
ScanKeyData scanKey[1];
|
|
||||||
bool indexOK = true;
|
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_database_shard_database_id,
|
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(databaseOid));
|
|
||||||
|
|
||||||
SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable,
|
|
||||||
DatabaseShardPrimaryKeyIndexId(),
|
|
||||||
indexOK,
|
|
||||||
NULL, scanKeyCount, scanKey);
|
|
||||||
|
|
||||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
if (heapTuple != NULL)
|
|
||||||
{
|
|
||||||
simple_heap_delete(databaseShardTable, &heapTuple->t_self);
|
|
||||||
}
|
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
|
|
||||||
CommandCounterIncrement();
|
|
||||||
table_close(databaseShardTable, NoLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeleteDatabaseShardByDatabaseIdOnOtherNodes deletes a record from the
|
|
||||||
* citus_catalog.database_sharding table on other nodes.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
DeleteDatabaseShardByDatabaseIdOnOtherNodes(Oid databaseOid)
|
|
||||||
{
|
|
||||||
char *deleteCommand = DeleteDatabaseShardByDatabaseIdCommand(databaseOid);
|
|
||||||
SendCommandToWorkersWithMetadata(deleteCommand);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ListDatabaseShards lists all database shards in citus_catalog.database_shard.
|
|
||||||
*/
|
|
||||||
List *
|
|
||||||
ListDatabaseShards(void)
|
|
||||||
{
|
|
||||||
Relation databaseShardTable = table_open(DatabaseShardRelationId(), AccessShareLock);
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable);
|
|
||||||
|
|
||||||
List *dbShardList = NIL;
|
|
||||||
int scanKeyCount = 0;
|
|
||||||
bool indexOK = false;
|
|
||||||
|
|
||||||
SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable, InvalidOid,
|
|
||||||
indexOK, NULL, scanKeyCount, NULL);
|
|
||||||
|
|
||||||
HeapTuple heapTuple = NULL;
|
|
||||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
|
||||||
{
|
|
||||||
DatabaseShard *dbShard = TupleToDatabaseShard(heapTuple, tupleDescriptor);
|
|
||||||
dbShardList = lappend(dbShardList, dbShard);
|
|
||||||
}
|
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
table_close(databaseShardTable, NoLock);
|
|
||||||
|
|
||||||
return dbShardList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetDatabaseShardByOid gets a database shard by database OID or
|
|
||||||
* NULL if no database shard could be found.
|
|
||||||
*/
|
|
||||||
DatabaseShard *
|
|
||||||
GetDatabaseShardByOid(Oid databaseOid)
|
|
||||||
{
|
|
||||||
DatabaseShard *result = NULL;
|
|
||||||
|
|
||||||
Relation databaseShardTable = table_open(DatabaseShardRelationId(), AccessShareLock);
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(databaseShardTable);
|
|
||||||
|
|
||||||
const int scanKeyCount = 1;
|
|
||||||
ScanKeyData scanKey[1];
|
|
||||||
bool indexOK = true;
|
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_database_shard_database_id,
|
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(databaseOid));
|
|
||||||
|
|
||||||
SysScanDesc scanDescriptor = systable_beginscan(databaseShardTable,
|
|
||||||
DatabaseShardPrimaryKeyIndexId(),
|
|
||||||
indexOK,
|
|
||||||
NULL, scanKeyCount, scanKey);
|
|
||||||
|
|
||||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
if (HeapTupleIsValid(heapTuple))
|
|
||||||
{
|
|
||||||
result = TupleToDatabaseShard(heapTuple, tupleDescriptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
table_close(databaseShardTable, NoLock);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TupleToDatabaseShard converts a database_shard record tuple into a DatabaseShard struct.
|
|
||||||
*/
|
|
||||||
static DatabaseShard *
|
|
||||||
TupleToDatabaseShard(HeapTuple heapTuple, TupleDesc tupleDescriptor)
|
|
||||||
{
|
|
||||||
Datum datumArray[Natts_database_shard];
|
|
||||||
bool isNullArray[Natts_database_shard];
|
|
||||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
|
||||||
|
|
||||||
DatabaseShard *record = palloc0(sizeof(DatabaseShard));
|
|
||||||
|
|
||||||
record->databaseOid =
|
|
||||||
DatumGetObjectId(datumArray[Anum_database_shard_database_id - 1]);
|
|
||||||
|
|
||||||
record->nodeGroupId =
|
|
||||||
DatumGetInt32(datumArray[Anum_database_shard_node_group_id - 1]);
|
|
||||||
|
|
||||||
record->isAvailable =
|
|
||||||
DatumGetBool(datumArray[Anum_database_shard_is_available - 1]);
|
|
||||||
|
|
||||||
return record;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* citus_internal_add_database_shard is an internal UDF to
|
|
||||||
* add a row to database_shard.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
citus_internal_add_database_shard(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
char *databaseName = TextDatumGetCString(PG_GETARG_DATUM(0));
|
|
||||||
int nodeGroupId = PG_GETARG_INT32(1);
|
|
||||||
|
|
||||||
bool missingOk = false;
|
|
||||||
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
|
||||||
|
|
||||||
if (!pg_database_ownercheck(databaseOid, GetUserId()))
|
|
||||||
{
|
|
||||||
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
|
|
||||||
databaseName);
|
|
||||||
}
|
|
||||||
|
|
||||||
InsertDatabaseShardAssignmentLocally(databaseOid, nodeGroupId);
|
|
||||||
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InsertDatabaseShardAssignmentCommand returns a command to insert a database shard
|
|
||||||
* assignment into the metadata on a remote node.
|
|
||||||
*/
|
|
||||||
char *
|
|
||||||
InsertDatabaseShardAssignmentCommand(Oid databaseOid, int nodeGroupId)
|
|
||||||
{
|
|
||||||
StringInfo command = makeStringInfo();
|
|
||||||
char *databaseName = get_database_name(databaseOid);
|
|
||||||
|
|
||||||
appendStringInfo(command,
|
|
||||||
"SELECT pg_catalog.citus_internal_add_database_shard(%s,%d)",
|
|
||||||
quote_literal_cstr(databaseName),
|
|
||||||
nodeGroupId);
|
|
||||||
|
|
||||||
return command->data;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* citus_internal_delete_database_shard is an internal UDF to
|
|
||||||
* delete a row from database_shard.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
citus_internal_delete_database_shard(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
char *databaseName = TextDatumGetCString(PG_GETARG_DATUM(0));
|
|
||||||
|
|
||||||
bool missingOk = false;
|
|
||||||
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
|
||||||
|
|
||||||
if (!pg_database_ownercheck(databaseOid, GetUserId()))
|
|
||||||
{
|
|
||||||
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
|
|
||||||
databaseName);
|
|
||||||
}
|
|
||||||
|
|
||||||
DeleteDatabaseShardByDatabaseIdLocally(databaseOid);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* DeleteDatabaseShardByDatabaseIdCommand returns a command to delete a database shard
|
|
||||||
* assignment from the metadata on a remote node.
|
|
||||||
*/
|
|
||||||
static char *
|
|
||||||
DeleteDatabaseShardByDatabaseIdCommand(Oid databaseOid)
|
|
||||||
{
|
|
||||||
StringInfo command = makeStringInfo();
|
|
||||||
char *databaseName = get_database_name(databaseOid);
|
|
||||||
|
|
||||||
appendStringInfo(command,
|
|
||||||
"SELECT pg_catalog.citus_internal_delete_database_shard(%s)",
|
|
||||||
quote_literal_cstr(databaseName));
|
|
||||||
|
|
||||||
return command->data;
|
|
||||||
}
|
|
|
@ -1,180 +0,0 @@
|
||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* database_size.c
|
|
||||||
* Functions for getting the size of a database.
|
|
||||||
*
|
|
||||||
* Copyright (c) Microsoft, Inc.
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
#include "postgres.h"
|
|
||||||
#include "funcapi.h"
|
|
||||||
#include "fmgr.h"
|
|
||||||
#include "miscadmin.h"
|
|
||||||
|
|
||||||
#include "commands/dbcommands.h"
|
|
||||||
#include "distributed/citus_safe_lib.h"
|
|
||||||
#include "distributed/database/database_sharding.h"
|
|
||||||
#include "distributed/listutils.h"
|
|
||||||
#include "distributed/metadata_cache.h"
|
|
||||||
#include "distributed/remote_commands.h"
|
|
||||||
#include "distributed/worker_transaction.h"
|
|
||||||
#include "utils/builtins.h"
|
|
||||||
|
|
||||||
|
|
||||||
static int64 GetLocalDatabaseSize(Oid databaseId);
|
|
||||||
static int64 CitusDatabaseShardSize(DatabaseShard *dbShard);
|
|
||||||
static int64 CitusDatabaseSizeOnNodeList(Oid databaseId, List *workerNodeList);
|
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(citus_database_size_oid);
|
|
||||||
PG_FUNCTION_INFO_V1(citus_database_size_name);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* citus_database_size_oid returns the size of a Citus database
|
|
||||||
* with the given oid.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
citus_database_size_oid(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
CheckCitusVersion(ERROR);
|
|
||||||
|
|
||||||
Oid databaseId = PG_GETARG_OID(0);
|
|
||||||
int64 size = CitusDatabaseSize(databaseId);
|
|
||||||
|
|
||||||
PG_RETURN_INT64(size);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* citus_database_size_name returns the size of a Citus database
|
|
||||||
* with the given name.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
citus_database_size_name(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
CheckCitusVersion(ERROR);
|
|
||||||
|
|
||||||
Name databaseName = PG_GETARG_NAME(0);
|
|
||||||
bool missingOk = false;
|
|
||||||
Oid databaseId = get_database_oid(NameStr(*databaseName), missingOk);
|
|
||||||
|
|
||||||
int64 size = CitusDatabaseSize(databaseId);
|
|
||||||
|
|
||||||
PG_RETURN_INT64(size);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusDatabaseSize returns the size of a Citus database.
|
|
||||||
*/
|
|
||||||
int64
|
|
||||||
CitusDatabaseSize(Oid databaseId)
|
|
||||||
{
|
|
||||||
DatabaseShard *dbShard = GetDatabaseShardByOid(databaseId);
|
|
||||||
if (dbShard != NULL)
|
|
||||||
{
|
|
||||||
/* for known database shards, get the remote size */
|
|
||||||
return CitusDatabaseShardSize(dbShard);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (databaseId == MyDatabaseId)
|
|
||||||
{
|
|
||||||
/* for the current database, get the size from all nodes */
|
|
||||||
List *workerNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
|
|
||||||
return CitusDatabaseSizeOnNodeList(databaseId, workerNodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* for other databases, get the local size */
|
|
||||||
/* TODO: get it from main database? */
|
|
||||||
return GetLocalDatabaseSize(databaseId);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetLocalDatabaseSize returns the local database size by calling pg_database_size.
|
|
||||||
*/
|
|
||||||
static int64
|
|
||||||
GetLocalDatabaseSize(Oid databaseId)
|
|
||||||
{
|
|
||||||
Datum databaseIdDatum = ObjectIdGetDatum(databaseId);
|
|
||||||
Datum sizeDatum = DirectFunctionCall1(pg_database_size_oid, databaseIdDatum);
|
|
||||||
return DatumGetInt64(sizeDatum);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusDatabaseShardSize gets the database size for a specific
|
|
||||||
* shard.
|
|
||||||
*/
|
|
||||||
static int64
|
|
||||||
CitusDatabaseShardSize(DatabaseShard *dbShard)
|
|
||||||
{
|
|
||||||
WorkerNode *workerNode = LookupNodeForGroup(dbShard->nodeGroupId);
|
|
||||||
|
|
||||||
return CitusDatabaseSizeOnNodeList(dbShard->databaseOid, list_make1(workerNode));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusDatabaseSizeOnNodeList returns the sum of the sizes
|
|
||||||
* for a given database from all nodes in the list.
|
|
||||||
*/
|
|
||||||
static int64
|
|
||||||
CitusDatabaseSizeOnNodeList(Oid databaseId, List *workerNodeList)
|
|
||||||
{
|
|
||||||
int64 size = 0;
|
|
||||||
|
|
||||||
bool raiseInterrupts = true;
|
|
||||||
|
|
||||||
char *databaseName = get_database_name(databaseId);
|
|
||||||
char *command = psprintf("SELECT pg_catalog.pg_database_size(%s)",
|
|
||||||
quote_literal_cstr(databaseName));
|
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
|
||||||
{
|
|
||||||
if (workerNode->groupId == GetLocalGroupId())
|
|
||||||
{
|
|
||||||
return GetLocalDatabaseSize(databaseId);
|
|
||||||
}
|
|
||||||
|
|
||||||
int connectionFlags = 0;
|
|
||||||
MultiConnection *connection = GetNodeConnection(connectionFlags,
|
|
||||||
workerNode->workerName,
|
|
||||||
workerNode->workerPort);
|
|
||||||
|
|
||||||
int querySent = SendRemoteCommand(connection, command);
|
|
||||||
if (querySent == 0)
|
|
||||||
{
|
|
||||||
ReportConnectionError(connection, ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
|
|
||||||
if (!IsResponseOK(result))
|
|
||||||
{
|
|
||||||
ReportResultError(connection, result, ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (PQntuples(result) != 1 || PQnfields(result) != 1)
|
|
||||||
{
|
|
||||||
PQclear(result);
|
|
||||||
ClearResults(connection, raiseInterrupts);
|
|
||||||
|
|
||||||
ereport(ERROR, (errmsg("unexpected number of columns returned by: %s",
|
|
||||||
command)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!PQgetisnull(result, 0, 0))
|
|
||||||
{
|
|
||||||
char *sizeString = PQgetvalue(result, 0, 0);
|
|
||||||
size += SafeStringToUint64(sizeString);
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
ClearResults(connection, raiseInterrupts);
|
|
||||||
}
|
|
||||||
|
|
||||||
return size;
|
|
||||||
}
|
|
|
@ -208,7 +208,6 @@ typedef struct MetadataCacheData
|
||||||
Oid distTransactionGroupIndexId;
|
Oid distTransactionGroupIndexId;
|
||||||
Oid distTenantSchemaPrimaryKeyIndexId;
|
Oid distTenantSchemaPrimaryKeyIndexId;
|
||||||
Oid distTenantSchemaUniqueColocationIdIndexId;
|
Oid distTenantSchemaUniqueColocationIdIndexId;
|
||||||
Oid citusCatalogNamespaceId;
|
|
||||||
Oid copyFormatTypeId;
|
Oid copyFormatTypeId;
|
||||||
Oid readIntermediateResultFuncId;
|
Oid readIntermediateResultFuncId;
|
||||||
Oid readIntermediateResultArrayFuncId;
|
Oid readIntermediateResultArrayFuncId;
|
||||||
|
@ -312,7 +311,6 @@ static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relation
|
||||||
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
|
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
|
||||||
Oid *columnTypeId, int32 *columnTypeMod,
|
Oid *columnTypeId, int32 *columnTypeMod,
|
||||||
Oid *intervalTypeId, int32 *intervalTypeMod);
|
Oid *intervalTypeId, int32 *intervalTypeMod);
|
||||||
static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid);
|
|
||||||
static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
|
static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
|
||||||
static void CachedRelationLookupExtended(const char *relationName, Oid *cachedOid,
|
static void CachedRelationLookupExtended(const char *relationName, Oid *cachedOid,
|
||||||
bool missing_ok);
|
bool missing_ok);
|
||||||
|
@ -2771,36 +2769,7 @@ DistRebalanceStrategyRelationId(void)
|
||||||
return MetadataCache.distRebalanceStrategyRelationId;
|
return MetadataCache.distRebalanceStrategyRelationId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* return oid of citus_catalog.database_sharding relation */
|
|
||||||
Oid
|
|
||||||
DatabaseShardRelationId(void)
|
|
||||||
{
|
|
||||||
CachedRelationNamespaceLookup("database_shard", CitusCatalogNamespaceId(),
|
|
||||||
&MetadataCache.databaseShardRelationId);
|
|
||||||
|
|
||||||
return MetadataCache.databaseShardRelationId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* return oid of citus_catalog.database_sharding primary key */
|
|
||||||
Oid
|
|
||||||
DatabaseShardPrimaryKeyIndexId(void)
|
|
||||||
{
|
|
||||||
CachedRelationNamespaceLookup("database_shard_pkey", CitusCatalogNamespaceId(),
|
|
||||||
&MetadataCache.databaseShardPKeyIndexId);
|
|
||||||
|
|
||||||
return MetadataCache.databaseShardPKeyIndexId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* return the oid of citus namespace */
|
|
||||||
Oid
|
|
||||||
CitusCatalogNamespaceId(void)
|
|
||||||
{
|
|
||||||
CachedNamespaceLookup("citus_catalog", &MetadataCache.citusCatalogNamespaceId);
|
|
||||||
return MetadataCache.citusCatalogNamespaceId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* return oid of pg_dist_object relation */
|
/* return oid of pg_dist_object relation */
|
||||||
|
@ -2870,17 +2839,6 @@ DistObjectPrimaryKeyIndexId(void)
|
||||||
&MetadataCache.distObjectPrimaryKeyIndexId,
|
&MetadataCache.distObjectPrimaryKeyIndexId,
|
||||||
true);
|
true);
|
||||||
|
|
||||||
if (!OidIsValid(MetadataCache.distObjectPrimaryKeyIndexId))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We can only ever reach here while we are creating/altering our extension before
|
|
||||||
* the table is moved to pg_catalog.
|
|
||||||
*/
|
|
||||||
CachedRelationNamespaceLookupExtended("pg_dist_object_pkey",
|
|
||||||
CitusCatalogNamespaceId(),
|
|
||||||
&MetadataCache.distObjectPrimaryKeyIndexId,
|
|
||||||
false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return MetadataCache.distObjectPrimaryKeyIndexId;
|
return MetadataCache.distObjectPrimaryKeyIndexId;
|
||||||
}
|
}
|
||||||
|
@ -5446,28 +5404,6 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the
|
|
||||||
* result cached in cachedOid.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CachedNamespaceLookup(const char *nspname, Oid *cachedOid)
|
|
||||||
{
|
|
||||||
/* force callbacks to be registered, so we always get notified upon changes */
|
|
||||||
InitializeCaches();
|
|
||||||
|
|
||||||
if (*cachedOid == InvalidOid)
|
|
||||||
{
|
|
||||||
*cachedOid = get_namespace_oid(nspname, true);
|
|
||||||
|
|
||||||
if (*cachedOid == InvalidOid)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg(
|
|
||||||
"cache lookup failed for namespace %s, called too early?",
|
|
||||||
nspname)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -1264,6 +1264,17 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.enable_create_database_propagation",
|
||||||
|
gettext_noop("Enables propagating CREATE DATABASE "
|
||||||
|
"and DROP DATABASE statements to workers"),
|
||||||
|
NULL,
|
||||||
|
&EnableCreateDatabasePropagation,
|
||||||
|
true,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_STANDARD,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_create_role_propagation",
|
"citus.enable_create_role_propagation",
|
||||||
gettext_noop("Enables propagating CREATE ROLE "
|
gettext_noop("Enables propagating CREATE ROLE "
|
||||||
|
|
|
@ -1,3 +1,2 @@
|
||||||
-- citus--12.2-1--12.1-1
|
-- citus--12.2-1--12.1-1
|
||||||
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
|
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
|
||||||
-- this is an empty downgrade path since citus--12.2-1--12.1-1.sql is empty for now
|
|
||||||
|
|
|
@ -1,47 +0,0 @@
|
||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* pgbouncer_manager.h
|
|
||||||
* Functions for managing outbound pgbouncers
|
|
||||||
*
|
|
||||||
* Copyright (c) Citus Data, Inc.
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef PGBOUNCER_MANAGER_H
|
|
||||||
#define PGBOUNCER_MANAGER_H
|
|
||||||
|
|
||||||
/* default number of inbound pgbouncer processes (0 is disabled) */
|
|
||||||
#define PGBOUNCER_INBOUND_PROCS_DEFAULT 0
|
|
||||||
|
|
||||||
/* bits reserved for local pgbouncer ID in the pgbouncer peer_id */
|
|
||||||
#define PGBOUNCER_PEER_ID_LOCAL_ID_BITS 5
|
|
||||||
|
|
||||||
/* maximum number of inbound pgbouncer processes */
|
|
||||||
#define PGBOUNCER_INBOUND_PROCS_MAX ((1 << PGBOUNCER_PEER_ID_LOCAL_ID_BITS) - 1)
|
|
||||||
|
|
||||||
|
|
||||||
/* GUC variable that sets the number of inbound pgbouncer procs */
|
|
||||||
extern int PgBouncerInboundProcs;
|
|
||||||
|
|
||||||
/* GUC variable that sets the inbound pgbouncer port */
|
|
||||||
extern int PgBouncerInboundPort;
|
|
||||||
|
|
||||||
/* GUC variable that sets the path to pgbouncer executable */
|
|
||||||
extern char *PgBouncerPath;
|
|
||||||
|
|
||||||
/* GUC variable that sets a pgbouncer file to include */
|
|
||||||
extern char *PgBouncerIncludeConfig;
|
|
||||||
|
|
||||||
/* global variable to request pgbouncer reconfiguration */
|
|
||||||
extern bool ReconfigurePgBouncersOnCommit;
|
|
||||||
|
|
||||||
void InitializeSharedPgBouncerManager(void);
|
|
||||||
size_t SharedPgBouncerManagerShmemSize(void);
|
|
||||||
void PgBouncerManagerMain(Datum arg);
|
|
||||||
bool PauseDatabaseOnInboundPgBouncers(char *databaseName);
|
|
||||||
bool ResumeDatabaseOnInboundPgBouncers(char *databaseName);
|
|
||||||
void TriggerPgBouncerReconfigureIfNeeded(void);
|
|
||||||
|
|
||||||
|
|
||||||
#endif /* PGBOUNCER_MANAGER_H */
|
|
Loading…
Reference in New Issue