mirror of https://github.com/citusdata/citus.git
Fixes review comments
parent
e716f58955
commit
4087d1941d
|
@ -295,16 +295,13 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
|
|||
List *
|
||||
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
||||
{
|
||||
if (EnableCreateDatabasePropagation)
|
||||
{
|
||||
EnsureCoordinator();
|
||||
}
|
||||
|
||||
if (!EnableCreateDatabasePropagation || !ShouldPropagate())
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||
char *databaseName = stmt->dbname;
|
||||
bool missingOk = false;
|
||||
|
@ -371,6 +368,15 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
|
|||
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||
GUC_ACTION_LOCAL, true, 0, false);
|
||||
|
||||
/*
|
||||
* createdb() / DropDatabase() uses ParseState to report the error position for the
|
||||
* input command and the position is reported to be 0 when it's provided as NULL.
|
||||
* We're okay with that because we don't expect this UDF to be called with an incorrect
|
||||
* DDL command.
|
||||
*
|
||||
*/
|
||||
ParseState *pstate = NULL;
|
||||
|
||||
if (IsA(parseTree, CreatedbStmt))
|
||||
{
|
||||
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
|
||||
|
@ -380,7 +386,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
|
|||
|
||||
if (!OidIsValid(databaseOid))
|
||||
{
|
||||
createdb(NULL, (CreatedbStmt *) parseTree);
|
||||
createdb(pstate, (CreatedbStmt *) parseTree);
|
||||
}
|
||||
}
|
||||
else if (IsA(parseTree, DropdbStmt))
|
||||
|
@ -393,7 +399,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
|
|||
|
||||
if (OidIsValid(databaseOid))
|
||||
{
|
||||
DropDatabase(NULL, (DropdbStmt *) parseTree);
|
||||
DropDatabase(pstate, (DropdbStmt *) parseTree);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -417,6 +423,8 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
|||
return NIL;
|
||||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
DropdbStmt *stmt = (DropdbStmt *) node;
|
||||
char *databaseName = stmt->dbname;
|
||||
bool missingOk = true;
|
||||
|
@ -450,10 +458,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
|||
|
||||
/* Delete from pg_dist_object */
|
||||
|
||||
if (IsObjectDistributed(&dbAddress))
|
||||
{
|
||||
UnmarkObjectDistributed(&dbAddress);
|
||||
}
|
||||
UnmarkObjectDistributed(&dbAddress);
|
||||
|
||||
/* ExecuteDistributedDDLJob could not be used since it depends on namespace and
|
||||
* database does not have namespace.
|
||||
|
|
|
@ -53,7 +53,6 @@
|
|||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||
Datum *paramValues);
|
||||
bool IsObjectDistributed(const ObjectAddress *address);
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
|
||||
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
|
||||
|
|
|
@ -310,6 +310,7 @@ static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relation
|
|||
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
|
||||
Oid *columnTypeId, int32 *columnTypeMod,
|
||||
Oid *intervalTypeId, int32 *intervalTypeMod);
|
||||
static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid);
|
||||
static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
|
||||
static void CachedRelationLookupExtended(const char *relationName, Oid *cachedOid,
|
||||
bool missing_ok);
|
||||
|
@ -2769,6 +2770,15 @@ DistRebalanceStrategyRelationId(void)
|
|||
}
|
||||
|
||||
|
||||
/* return the oid of citus namespace */
|
||||
Oid
|
||||
CitusCatalogNamespaceId(void)
|
||||
{
|
||||
CachedNamespaceLookup("citus", &MetadataCache.citusCatalogNamespaceId);
|
||||
return MetadataCache.citusCatalogNamespaceId;
|
||||
}
|
||||
|
||||
|
||||
/* return oid of pg_dist_object relation */
|
||||
Oid
|
||||
DistObjectRelationId(void)
|
||||
|
@ -2795,14 +2805,12 @@ DistObjectRelationId(void)
|
|||
true);
|
||||
if (!OidIsValid(MetadataCache.distObjectRelationId))
|
||||
{
|
||||
Oid citusNamespaceId = get_namespace_oid("citus", false);
|
||||
|
||||
/*
|
||||
* We can only ever reach here while we are creating/altering our extension before
|
||||
* the table is moved to pg_catalog.
|
||||
*/
|
||||
CachedRelationNamespaceLookupExtended("pg_dist_object",
|
||||
citusNamespaceId,
|
||||
CitusCatalogNamespaceId(),
|
||||
&MetadataCache.distObjectRelationId,
|
||||
false);
|
||||
}
|
||||
|
@ -2836,6 +2844,17 @@ DistObjectPrimaryKeyIndexId(void)
|
|||
&MetadataCache.distObjectPrimaryKeyIndexId,
|
||||
true);
|
||||
|
||||
if (!OidIsValid(MetadataCache.distObjectPrimaryKeyIndexId))
|
||||
{
|
||||
/*
|
||||
* We can only ever reach here while we are creating/altering our extension before
|
||||
* the table is moved to pg_catalog.
|
||||
*/
|
||||
CachedRelationNamespaceLookupExtended("pg_dist_object_pkey",
|
||||
CitusCatalogNamespaceId(),
|
||||
&MetadataCache.distObjectPrimaryKeyIndexId,
|
||||
false);
|
||||
}
|
||||
|
||||
return MetadataCache.distObjectPrimaryKeyIndexId;
|
||||
}
|
||||
|
@ -5401,6 +5420,30 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the
|
||||
* result cached in cachedOid.
|
||||
*/
|
||||
static void
|
||||
CachedNamespaceLookup(const char *nspname, Oid *cachedOid)
|
||||
{
|
||||
/* force callbacks to be registered, so we always get notified upon changes */
|
||||
InitializeCaches();
|
||||
|
||||
if (*cachedOid == InvalidOid)
|
||||
{
|
||||
*cachedOid = get_namespace_oid(nspname, true);
|
||||
|
||||
if (*cachedOid == InvalidOid)
|
||||
{
|
||||
ereport(ERROR, (errmsg(
|
||||
"cache lookup failed for namespace %s, called too early?",
|
||||
nspname)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CachedRelationLookup performs a cached lookup for the relation
|
||||
* relationName, with the result cached in *cachedOid.
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "tcop/utility.h"
|
||||
#include "utils/acl.h"
|
||||
|
||||
|
||||
extern bool AddAllLocalTablesToMetadata;
|
||||
extern bool EnableSchemaBasedSharding;
|
||||
|
||||
|
@ -57,6 +58,7 @@ typedef enum DistOpsOperationType
|
|||
DIST_OPS_DROP,
|
||||
} DistOpsOperationType;
|
||||
|
||||
|
||||
/*
|
||||
* DistributeObjectOps specifies handlers for node/object type pairs.
|
||||
* Instances of this type should all be declared in deparse.c.
|
||||
|
@ -77,11 +79,11 @@ typedef enum DistOpsOperationType
|
|||
*/
|
||||
typedef struct DistributeObjectOps
|
||||
{
|
||||
char *(*deparse)(Node *);
|
||||
char * (*deparse)(Node *);
|
||||
void (*qualify)(Node *);
|
||||
List *(*preprocess)(Node *, const char *, ProcessUtilityContext);
|
||||
List *(*postprocess)(Node *, const char *);
|
||||
List *(*address)(Node *, bool, bool);
|
||||
List * (*preprocess)(Node *, const char *, ProcessUtilityContext);
|
||||
List * (*postprocess)(Node *, const char *);
|
||||
List * (*address)(Node *, bool, bool);
|
||||
bool markDistributed;
|
||||
|
||||
/* fields used by common implementations, omitted for specialized implementations */
|
||||
|
@ -138,6 +140,7 @@ typedef enum ExtractForeignKeyConstraintsMode
|
|||
INCLUDE_SINGLE_SHARD_TABLES
|
||||
} ExtractForeignKeyConstraintMode;
|
||||
|
||||
|
||||
/*
|
||||
* Flags that can be passed to GetForeignKeyIdsForColumn to
|
||||
* indicate whether relationId argument should match:
|
||||
|
@ -156,6 +159,7 @@ typedef enum SearchForeignKeyColumnFlags
|
|||
/* callers can also pass union of above flags */
|
||||
} SearchForeignKeyColumnFlags;
|
||||
|
||||
|
||||
typedef enum TenantOperation
|
||||
{
|
||||
TENANT_UNDISTRIBUTE_TABLE = 0,
|
||||
|
@ -193,9 +197,11 @@ extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool
|
|||
/* index.c */
|
||||
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
|
||||
|
||||
|
||||
/* call.c */
|
||||
extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest);
|
||||
|
||||
|
||||
/* collation.c - forward declarations */
|
||||
extern char * CreateCollationDDL(Oid collationId);
|
||||
extern List * CreateCollationDDLsIdempotent(Oid collationId);
|
||||
|
@ -224,6 +230,7 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que
|
|||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
|
||||
|
||||
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
|
@ -246,6 +253,7 @@ extern List * RenameDomainStmtObjectAddress(Node *node, bool missing_ok, bool
|
|||
extern CreateDomainStmt * RecreateDomainStmt(Oid domainOid);
|
||||
extern Oid get_constraint_typid(Oid conoid);
|
||||
|
||||
|
||||
/* extension.c - forward declarations */
|
||||
extern bool IsDropCitusExtensionStmt(Node *parsetree);
|
||||
extern List * GetDependentFDWsToExtension(Oid extensionId);
|
||||
|
@ -324,11 +332,13 @@ extern Oid GetReferencedTableId(Oid foreignKeyId);
|
|||
extern Oid GetReferencingTableId(Oid foreignKeyId);
|
||||
extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);
|
||||
|
||||
|
||||
/* foreign_data_wrapper.c - forward declarations */
|
||||
extern List * PreprocessGrantOnFDWStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern Acl * GetPrivilegesForFDW(Oid FDWOid);
|
||||
|
||||
|
||||
/* foreign_server.c - forward declarations */
|
||||
extern List * PreprocessGrantOnForeignServerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
|
@ -339,15 +349,17 @@ extern List * AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok, b
|
|||
isPostprocess);
|
||||
extern List * RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok, bool
|
||||
isPostprocess);
|
||||
extern List * AlterForeignServerOwnerStmtObjectAddress(Node *node, bool missing_ok, bool
|
||||
isPostprocess);
|
||||
extern List * AlterForeignServerOwnerStmtObjectAddress(Node *node, bool
|
||||
missing_ok, bool isPostprocess);
|
||||
extern List * GetForeignServerCreateDDLCommand(Oid serverId);
|
||||
|
||||
|
||||
/* foreign_table.c - forward declarations */
|
||||
extern List * PreprocessAlterForeignTableSchemaStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
|
||||
|
||||
/* function.c - forward declarations */
|
||||
extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
@ -377,12 +389,14 @@ extern List * PreprocessGrantOnFunctionStmt(Node *node, const char *queryString,
|
|||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * PostprocessGrantOnFunctionStmt(Node *node, const char *queryString);
|
||||
|
||||
|
||||
/* grant.c - forward declarations */
|
||||
extern List * PreprocessGrantStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern void deparsePrivileges(StringInfo privsString, GrantStmt *grantStmt);
|
||||
extern void deparseGrantees(StringInfo granteesString, GrantStmt *grantStmt);
|
||||
|
||||
|
||||
/* index.c - forward declarations */
|
||||
extern bool IsIndexRenameStmt(RenameStmt *renameStmt);
|
||||
extern List * PreprocessIndexStmt(Node *createIndexStatement,
|
||||
|
@ -463,6 +477,7 @@ extern void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
|
|||
extern List * PreprocessRenameAttributeStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
|
||||
/* role.c - forward declarations*/
|
||||
extern List * PostprocessAlterRoleStmt(Node *stmt, const char *queryString);
|
||||
extern List * PreprocessAlterRoleSetStmt(Node *stmt, const char *queryString,
|
||||
|
@ -585,6 +600,7 @@ extern List * GetAlterIndexStatisticsCommands(Oid indexOid);
|
|||
/* subscription.c - forward declarations */
|
||||
extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt);
|
||||
|
||||
|
||||
/* table.c - forward declarations */
|
||||
extern List * PreprocessDropTableStmt(Node *stmt, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
|
|
@ -102,6 +102,22 @@ WHERE datname = 'mydatabase';
|
|||
(0 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
--tests for special characters in database name
|
||||
set citus.enable_create_database_propagation=on;
|
||||
SET citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%CREATE DATABASE%';
|
||||
create database "mydatabase#1'2";
|
||||
NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"')
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"')
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
set citus.grep_remote_commands = '%DROP DATABASE%';
|
||||
drop database "mydatabase#1'2";
|
||||
NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE "mydatabase#1''2"')
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE "mydatabase#1''2"')
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
--clean up resources created by this test
|
||||
drop tablespace create_drop_db_tablespace;
|
||||
\c - - - :worker_1_port
|
||||
drop tablespace create_drop_db_tablespace;
|
||||
|
|
|
@ -94,6 +94,18 @@ WHERE datname = 'mydatabase';
|
|||
|
||||
\c - - - :master_port
|
||||
|
||||
--tests for special characters in database name
|
||||
set citus.enable_create_database_propagation=on;
|
||||
SET citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%CREATE DATABASE%';
|
||||
|
||||
create database "mydatabase#1'2";
|
||||
|
||||
set citus.grep_remote_commands = '%DROP DATABASE%';
|
||||
drop database "mydatabase#1'2";
|
||||
|
||||
--clean up resources created by this test
|
||||
|
||||
drop tablespace create_drop_db_tablespace;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
|
|
Loading…
Reference in New Issue