diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 70fa871a0..067701114 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -295,16 +295,13 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString) { - if (EnableCreateDatabasePropagation) - { - EnsureCoordinator(); - } - if (!EnableCreateDatabasePropagation || !ShouldPropagate()) { return NIL; } + EnsureCoordinator(); + CreatedbStmt *stmt = castNode(CreatedbStmt, node); char *databaseName = stmt->dbname; bool missingOk = false; @@ -371,6 +368,15 @@ citus_internal_database_command(PG_FUNCTION_ARGS) (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, GUC_ACTION_LOCAL, true, 0, false); + /* + * createdb() / DropDatabase() uses ParseState to report the error position for the + * input command and the position is reported to be 0 when it's provided as NULL. + * We're okay with that because we don't expect this UDF to be called with an incorrect + * DDL command. + * + */ + ParseState *pstate = NULL; + if (IsA(parseTree, CreatedbStmt)) { CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree); @@ -380,7 +386,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS) if (!OidIsValid(databaseOid)) { - createdb(NULL, (CreatedbStmt *) parseTree); + createdb(pstate, (CreatedbStmt *) parseTree); } } else if (IsA(parseTree, DropdbStmt)) @@ -393,7 +399,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS) if (OidIsValid(databaseOid)) { - DropDatabase(NULL, (DropdbStmt *) parseTree); + DropDatabase(pstate, (DropdbStmt *) parseTree); } } else @@ -417,6 +423,8 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, return NIL; } + EnsureCoordinator(); + DropdbStmt *stmt = (DropdbStmt *) node; char *databaseName = stmt->dbname; bool missingOk = true; @@ -450,10 +458,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, /* Delete from pg_dist_object */ - if (IsObjectDistributed(&dbAddress)) - { - UnmarkObjectDistributed(&dbAddress); - } + UnmarkObjectDistributed(&dbAddress); /* ExecuteDistributedDDLJob could not be used since it depends on namespace and * database does not have namespace. diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index a025ba73f..c420e6ec3 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -53,7 +53,6 @@ static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); -bool IsObjectDistributed(const ObjectAddress *address); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 55ec63a6a..55d0f11c5 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -310,6 +310,7 @@ static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relation static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Oid *columnTypeId, int32 *columnTypeMod, Oid *intervalTypeId, int32 *intervalTypeMod); +static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid); static void CachedRelationLookup(const char *relationName, Oid *cachedOid); static void CachedRelationLookupExtended(const char *relationName, Oid *cachedOid, bool missing_ok); @@ -2769,6 +2770,15 @@ DistRebalanceStrategyRelationId(void) } +/* return the oid of citus namespace */ +Oid +CitusCatalogNamespaceId(void) +{ + CachedNamespaceLookup("citus", &MetadataCache.citusCatalogNamespaceId); + return MetadataCache.citusCatalogNamespaceId; +} + + /* return oid of pg_dist_object relation */ Oid DistObjectRelationId(void) @@ -2795,14 +2805,12 @@ DistObjectRelationId(void) true); if (!OidIsValid(MetadataCache.distObjectRelationId)) { - Oid citusNamespaceId = get_namespace_oid("citus", false); - /* * We can only ever reach here while we are creating/altering our extension before * the table is moved to pg_catalog. */ CachedRelationNamespaceLookupExtended("pg_dist_object", - citusNamespaceId, + CitusCatalogNamespaceId(), &MetadataCache.distObjectRelationId, false); } @@ -2836,6 +2844,17 @@ DistObjectPrimaryKeyIndexId(void) &MetadataCache.distObjectPrimaryKeyIndexId, true); + if (!OidIsValid(MetadataCache.distObjectPrimaryKeyIndexId)) + { + /* + * We can only ever reach here while we are creating/altering our extension before + * the table is moved to pg_catalog. + */ + CachedRelationNamespaceLookupExtended("pg_dist_object_pkey", + CitusCatalogNamespaceId(), + &MetadataCache.distObjectPrimaryKeyIndexId, + false); + } return MetadataCache.distObjectPrimaryKeyIndexId; } @@ -5401,6 +5420,30 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray, } +/* + * CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the + * result cached in cachedOid. + */ +static void +CachedNamespaceLookup(const char *nspname, Oid *cachedOid) +{ + /* force callbacks to be registered, so we always get notified upon changes */ + InitializeCaches(); + + if (*cachedOid == InvalidOid) + { + *cachedOid = get_namespace_oid(nspname, true); + + if (*cachedOid == InvalidOid) + { + ereport(ERROR, (errmsg( + "cache lookup failed for namespace %s, called too early?", + nspname))); + } + } +} + + /* * CachedRelationLookup performs a cached lookup for the relation * relationName, with the result cached in *cachedOid. diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 36e412378..22c35a694 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -22,6 +22,7 @@ #include "tcop/utility.h" #include "utils/acl.h" + extern bool AddAllLocalTablesToMetadata; extern bool EnableSchemaBasedSharding; @@ -57,6 +58,7 @@ typedef enum DistOpsOperationType DIST_OPS_DROP, } DistOpsOperationType; + /* * DistributeObjectOps specifies handlers for node/object type pairs. * Instances of this type should all be declared in deparse.c. @@ -77,11 +79,11 @@ typedef enum DistOpsOperationType */ typedef struct DistributeObjectOps { - char *(*deparse)(Node *); + char * (*deparse)(Node *); void (*qualify)(Node *); - List *(*preprocess)(Node *, const char *, ProcessUtilityContext); - List *(*postprocess)(Node *, const char *); - List *(*address)(Node *, bool, bool); + List * (*preprocess)(Node *, const char *, ProcessUtilityContext); + List * (*postprocess)(Node *, const char *); + List * (*address)(Node *, bool, bool); bool markDistributed; /* fields used by common implementations, omitted for specialized implementations */ @@ -138,6 +140,7 @@ typedef enum ExtractForeignKeyConstraintsMode INCLUDE_SINGLE_SHARD_TABLES } ExtractForeignKeyConstraintMode; + /* * Flags that can be passed to GetForeignKeyIdsForColumn to * indicate whether relationId argument should match: @@ -156,6 +159,7 @@ typedef enum SearchForeignKeyColumnFlags /* callers can also pass union of above flags */ } SearchForeignKeyColumnFlags; + typedef enum TenantOperation { TENANT_UNDISTRIBUTE_TABLE = 0, @@ -193,9 +197,11 @@ extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool /* index.c */ typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); + /* call.c */ extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest); + /* collation.c - forward declarations */ extern char * CreateCollationDDL(Oid collationId); extern List * CreateCollationDDLsIdempotent(Oid collationId); @@ -224,6 +230,7 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que ProcessUtilityContext processUtilityContext); + extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); @@ -246,6 +253,7 @@ extern List * RenameDomainStmtObjectAddress(Node *node, bool missing_ok, bool extern CreateDomainStmt * RecreateDomainStmt(Oid domainOid); extern Oid get_constraint_typid(Oid conoid); + /* extension.c - forward declarations */ extern bool IsDropCitusExtensionStmt(Node *parsetree); extern List * GetDependentFDWsToExtension(Oid extensionId); @@ -324,11 +332,13 @@ extern Oid GetReferencedTableId(Oid foreignKeyId); extern Oid GetReferencingTableId(Oid foreignKeyId); extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId); + /* foreign_data_wrapper.c - forward declarations */ extern List * PreprocessGrantOnFDWStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); extern Acl * GetPrivilegesForFDW(Oid FDWOid); + /* foreign_server.c - forward declarations */ extern List * PreprocessGrantOnForeignServerStmt(Node *node, const char *queryString, ProcessUtilityContext @@ -339,15 +349,17 @@ extern List * AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok, b isPostprocess); extern List * RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); -extern List * AlterForeignServerOwnerStmtObjectAddress(Node *node, bool missing_ok, bool - isPostprocess); +extern List * AlterForeignServerOwnerStmtObjectAddress(Node *node, bool + missing_ok, bool isPostprocess); extern List * GetForeignServerCreateDDLCommand(Oid serverId); + /* foreign_table.c - forward declarations */ extern List * PreprocessAlterForeignTableSchemaStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); + /* function.c - forward declarations */ extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString, ProcessUtilityContext processUtilityContext); @@ -377,12 +389,14 @@ extern List * PreprocessGrantOnFunctionStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); extern List * PostprocessGrantOnFunctionStmt(Node *node, const char *queryString); + /* grant.c - forward declarations */ extern List * PreprocessGrantStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); extern void deparsePrivileges(StringInfo privsString, GrantStmt *grantStmt); extern void deparseGrantees(StringInfo granteesString, GrantStmt *grantStmt); + /* index.c - forward declarations */ extern bool IsIndexRenameStmt(RenameStmt *renameStmt); extern List * PreprocessIndexStmt(Node *createIndexStatement, @@ -463,6 +477,7 @@ extern void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt); extern List * PreprocessRenameAttributeStmt(Node *stmt, const char *queryString, ProcessUtilityContext processUtilityContext); + /* role.c - forward declarations*/ extern List * PostprocessAlterRoleStmt(Node *stmt, const char *queryString); extern List * PreprocessAlterRoleSetStmt(Node *stmt, const char *queryString, @@ -585,6 +600,7 @@ extern List * GetAlterIndexStatisticsCommands(Oid indexOid); /* subscription.c - forward declarations */ extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt); + /* table.c - forward declarations */ extern List * PreprocessDropTableStmt(Node *stmt, const char *queryString, ProcessUtilityContext processUtilityContext); diff --git a/src/test/regress/expected/create_drop_database_propagation.out b/src/test/regress/expected/create_drop_database_propagation.out index e578cd2eb..be85e50c0 100644 --- a/src/test/regress/expected/create_drop_database_propagation.out +++ b/src/test/regress/expected/create_drop_database_propagation.out @@ -102,6 +102,22 @@ WHERE datname = 'mydatabase'; (0 rows) \c - - - :master_port +--tests for special characters in database name +set citus.enable_create_database_propagation=on; +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; +create database "mydatabase#1'2"; +NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database "mydatabase#1'2"; +NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE "mydatabase#1''2"') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE "mydatabase#1''2"') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +--clean up resources created by this test drop tablespace create_drop_db_tablespace; \c - - - :worker_1_port drop tablespace create_drop_db_tablespace; diff --git a/src/test/regress/sql/create_drop_database_propagation.sql b/src/test/regress/sql/create_drop_database_propagation.sql index 415a03f97..5e2166f77 100644 --- a/src/test/regress/sql/create_drop_database_propagation.sql +++ b/src/test/regress/sql/create_drop_database_propagation.sql @@ -94,6 +94,18 @@ WHERE datname = 'mydatabase'; \c - - - :master_port +--tests for special characters in database name +set citus.enable_create_database_propagation=on; +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; + +create database "mydatabase#1'2"; + +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database "mydatabase#1'2"; + +--clean up resources created by this test + drop tablespace create_drop_db_tablespace; \c - - - :worker_1_port