mirror of https://github.com/citusdata/citus.git
support create schema <schema> elements
parent
44e3c3b9c6
commit
7ee9285094
|
@ -45,9 +45,9 @@
|
|||
static List * GetObjectAddressBySchemaName(char *schemaName, bool missing_ok);
|
||||
static List * FilterDistributedSchemas(List *schemas);
|
||||
static bool SchemaHasDistributedTableWithFKey(char *schemaName);
|
||||
static bool ShouldPropagateCreateSchemaStmt(void);
|
||||
static bool ShouldPropagateCreateSchemaStmt(CreateSchemaStmt *createSchemaStmt);
|
||||
static List * GetGrantCommandsFromCreateSchemaStmt(Node *node);
|
||||
static bool CreateSchemaStmtCreatesTable(CreateSchemaStmt *stmt);
|
||||
static char * CreateSchemaStmtSchemaName(Node *node);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -57,20 +57,20 @@ static bool CreateSchemaStmtCreatesTable(CreateSchemaStmt *stmt);
|
|||
List *
|
||||
PostprocessCreateSchemaStmt(Node *node, const char *queryString)
|
||||
{
|
||||
CreateSchemaStmt *createSchemaStmt = castNode(CreateSchemaStmt, node);
|
||||
EnsureCoordinator();
|
||||
|
||||
if (!ShouldPropagateCreateSchemaStmt())
|
||||
CreateSchemaStmt *createSchemaStmt = castNode(CreateSchemaStmt, node);
|
||||
if (!ShouldPropagateCreateSchemaStmt(createSchemaStmt))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
EnsureSequentialMode(OBJECT_SCHEMA);
|
||||
|
||||
bool missingOk = createSchemaStmt->if_not_exists;
|
||||
List *schemaAdressList = CreateSchemaStmtObjectAddress(node, missingOk, true);
|
||||
Assert(list_length(schemaAdressList) == 1);
|
||||
|
||||
ObjectAddress *schemaAdress = linitial(schemaAdressList);
|
||||
Oid schemaId = schemaAdress->objectId;
|
||||
if (!OidIsValid(schemaId))
|
||||
|
@ -88,37 +88,6 @@ PostprocessCreateSchemaStmt(Node *node, const char *queryString)
|
|||
|
||||
commands = list_concat(commands, GetGrantCommandsFromCreateSchemaStmt(node));
|
||||
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
if (ShouldUseSchemaBasedSharding(schemaName))
|
||||
{
|
||||
/* for now, we don't allow creating tenant tables when creating the schema itself */
|
||||
if (CreateSchemaStmtCreatesTable(createSchemaStmt))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create distributed schema and table in a "
|
||||
"single statement"),
|
||||
errhint("SET citus.enable_schema_based_sharding TO off, "
|
||||
"or create the schema and table in separate "
|
||||
"commands.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Register the tenant schema on the coordinator and save the command
|
||||
* to register it on the workers.
|
||||
*/
|
||||
int shardCount = 1;
|
||||
int replicationFactor = 1;
|
||||
Oid distributionColumnType = InvalidOid;
|
||||
Oid distributionColumnCollation = InvalidOid;
|
||||
uint32 colocationId = CreateColocationGroup(
|
||||
shardCount, replicationFactor, distributionColumnType,
|
||||
distributionColumnCollation);
|
||||
|
||||
InsertTenantSchemaLocally(schemaId, colocationId);
|
||||
|
||||
commands = lappend(commands, TenantSchemaInsertCommand(schemaId, colocationId));
|
||||
}
|
||||
|
||||
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
|
@ -230,17 +199,14 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString,
|
|||
|
||||
|
||||
/*
|
||||
* CreateSchemaStmtObjectAddress returns the ObjectAddress of the schema that is
|
||||
* the object of the CreateSchemaStmt. Errors if missing_ok is false.
|
||||
* CreateSchemaStmtSchemaName returns the schema name properly from either schemaname
|
||||
* or role specification.
|
||||
*/
|
||||
List *
|
||||
CreateSchemaStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||
static char *
|
||||
CreateSchemaStmtSchemaName(Node *node)
|
||||
{
|
||||
CreateSchemaStmt *stmt = castNode(CreateSchemaStmt, node);
|
||||
|
||||
StringInfoData schemaName = { 0 };
|
||||
initStringInfo(&schemaName);
|
||||
|
||||
StringInfo schemaName = makeStringInfo();
|
||||
if (stmt->schemaname == NULL)
|
||||
{
|
||||
/*
|
||||
|
@ -248,14 +214,26 @@ CreateSchemaStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
|||
* with the name of the authorizated user.
|
||||
*/
|
||||
Assert(stmt->authrole != NULL);
|
||||
appendStringInfoString(&schemaName, RoleSpecString(stmt->authrole, true));
|
||||
appendStringInfoString(schemaName, RoleSpecString(stmt->authrole, true));
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfoString(&schemaName, stmt->schemaname);
|
||||
appendStringInfoString(schemaName, stmt->schemaname);
|
||||
}
|
||||
|
||||
return GetObjectAddressBySchemaName(schemaName.data, missing_ok);
|
||||
return schemaName->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateSchemaStmtObjectAddress returns the ObjectAddress of the schema that is
|
||||
* the object of the CreateSchemaStmt. Errors if missing_ok is false.
|
||||
*/
|
||||
List *
|
||||
CreateSchemaStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||
{
|
||||
char *schemaName = CreateSchemaStmtSchemaName(node);
|
||||
return GetObjectAddressBySchemaName(schemaName, missing_ok);
|
||||
}
|
||||
|
||||
|
||||
|
@ -405,13 +383,25 @@ SchemaHasDistributedTableWithFKey(char *schemaName)
|
|||
* switched to sequential mode, we don't propagate.
|
||||
*/
|
||||
static bool
|
||||
ShouldPropagateCreateSchemaStmt()
|
||||
ShouldPropagateCreateSchemaStmt(CreateSchemaStmt *createSchemaStmt)
|
||||
{
|
||||
if (!ShouldPropagate())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* We are calling `citus_schema_distribute` after postprocess if the schema is
|
||||
* going to be a tenant table. We should not propagate the schema here and
|
||||
* let `citus_schema_distribute` create it during dependency creation outside
|
||||
* transaction for visibility and deadlock reasons.
|
||||
*/
|
||||
char *schemaName = CreateSchemaStmtSchemaName((Node *) createSchemaStmt);
|
||||
if (ShouldUseSchemaBasedSharding(schemaName))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* check creation against multi-statement transaction policy */
|
||||
if (!ShouldPropagateCreateInCoordinatedTransction())
|
||||
{
|
||||
|
@ -461,27 +451,3 @@ GetGrantCommandsFromCreateSchemaStmt(Node *node)
|
|||
|
||||
return commands;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateSchemaStmtCreatesTable returns true if given CreateSchemaStmt
|
||||
* creates a table using "schema_element" list.
|
||||
*/
|
||||
static bool
|
||||
CreateSchemaStmtCreatesTable(CreateSchemaStmt *stmt)
|
||||
{
|
||||
Node *element = NULL;
|
||||
foreach_ptr(element, stmt->schemaElts)
|
||||
{
|
||||
/*
|
||||
* CREATE TABLE AS and CREATE FOREIGN TABLE commands cannot be
|
||||
* used as schema_elements anyway, so we don't need to check them.
|
||||
*/
|
||||
if (IsA(element, CreateStmt))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -593,6 +593,43 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
|
|||
EnsureCoordinator();
|
||||
|
||||
Oid schemaId = PG_GETARG_OID(0);
|
||||
DistributeSchema(schemaId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PropagateSchema propagates the schema to workers and mark it as distributed.
|
||||
*/
|
||||
void
|
||||
PropagateSchema(Oid schemaId)
|
||||
{
|
||||
/* Propagate the schema to workers */
|
||||
ObjectAddress *schemaAddress = palloc0(sizeof(ObjectAddress));
|
||||
ObjectAddressSet(*schemaAddress, NamespaceRelationId, schemaId);
|
||||
|
||||
List *schemaCommands = GetAllDependencyCreateDDLCommands(list_make1(schemaAddress));
|
||||
schemaCommands = lcons(DISABLE_DDL_PROPAGATION, schemaCommands);
|
||||
schemaCommands = lappend(schemaCommands, ENABLE_DDL_PROPAGATION);
|
||||
|
||||
List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_NODES,
|
||||
RowShareLock);
|
||||
SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
|
||||
CurrentUserName(),
|
||||
schemaCommands);
|
||||
|
||||
/* Mark the schema as distributed object */
|
||||
MarkObjectDistributed(schemaAddress);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DistributeSchema distributes given schema if all required checks pass.
|
||||
*/
|
||||
void
|
||||
DistributeSchema(Oid schemaId)
|
||||
{
|
||||
EnsureSchemaExist(schemaId);
|
||||
EnsureSchemaOwner(schemaId);
|
||||
|
||||
|
@ -611,7 +648,7 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
|
|||
if (IsTenantSchema(schemaId))
|
||||
{
|
||||
ereport(NOTICE, (errmsg("schema %s is already distributed", schemaName)));
|
||||
PG_RETURN_VOID();
|
||||
return;
|
||||
}
|
||||
|
||||
/* Take lock on the relations and filter out partition tables */
|
||||
|
@ -673,6 +710,15 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
|
|||
ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
|
||||
skip_validation);
|
||||
|
||||
/*
|
||||
* The schema is propagated during table creation above when there is at least
|
||||
* one table under the schema. (`EnsureDependenciesExistOnAllNodes`) In cases
|
||||
* there is no table under it, the schema should be propagated explicitly. The
|
||||
* schema propagation is idempotent so it is safe to propagate it even if we
|
||||
* already propagated it.
|
||||
*/
|
||||
PropagateSchema(schemaId);
|
||||
|
||||
/* Register the schema locally and sync it to workers */
|
||||
InsertTenantSchemaLocally(schemaId, colocationId);
|
||||
char *registerSchemaCommand = TenantSchemaInsertCommand(schemaId, colocationId);
|
||||
|
@ -680,8 +726,6 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
|
|||
{
|
||||
SendCommandToWorkersWithMetadata(registerSchemaCommand);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -4263,3 +4263,30 @@ ConvertToTenantTableIfNecessary(AlterObjectSchemaStmt *stmt)
|
|||
CreateTenantSchemaTable(relationId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DistributeSchemaIfNecessary distributes the schema if the schema is not already
|
||||
* distributed and the GUC `EnableSchemaBasedSharding` is enabled.
|
||||
*/
|
||||
void
|
||||
DistributeSchemaIfNecessary(CreateSchemaStmt *createSchemaStmt)
|
||||
{
|
||||
bool missingOk = createSchemaStmt->if_not_exists;
|
||||
List *schemaAdressList = CreateSchemaStmtObjectAddress((Node *) createSchemaStmt,
|
||||
missingOk, true);
|
||||
Assert(list_length(schemaAdressList) == 1);
|
||||
ObjectAddress *schemaAdress = linitial(schemaAdressList);
|
||||
Oid schemaId = schemaAdress->objectId;
|
||||
if (!OidIsValid(schemaId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
if (ShouldUseSchemaBasedSharding(schemaName) &&
|
||||
!IsTenantSchema(schemaId))
|
||||
{
|
||||
DistributeSchema(schemaId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -358,9 +358,8 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
|
||||
ConvertNewTableIfNecessary(createStmt);
|
||||
}
|
||||
|
||||
if (context == PROCESS_UTILITY_TOPLEVEL &&
|
||||
IsA(parsetree, AlterObjectSchemaStmt))
|
||||
else if (context == PROCESS_UTILITY_TOPLEVEL &&
|
||||
IsA(parsetree, AlterObjectSchemaStmt))
|
||||
{
|
||||
AlterObjectSchemaStmt *alterSchemaStmt = castNode(AlterObjectSchemaStmt,
|
||||
parsetree);
|
||||
|
@ -370,6 +369,13 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
ConvertToTenantTableIfNecessary(alterSchemaStmt);
|
||||
}
|
||||
}
|
||||
else if (context == PROCESS_UTILITY_TOPLEVEL && IsA(parsetree,
|
||||
CreateSchemaStmt))
|
||||
{
|
||||
CreateSchemaStmt *createSchemaStmt = castNode(CreateSchemaStmt,
|
||||
parsetree);
|
||||
DistributeSchemaIfNecessary(createSchemaStmt);
|
||||
}
|
||||
}
|
||||
|
||||
UtilityHookLevel--;
|
||||
|
|
|
@ -611,6 +611,7 @@ extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationI
|
|||
extern void ErrorIfTableHasIdentityColumn(Oid relationId);
|
||||
extern void ConvertNewTableIfNecessary(Node *createStmt);
|
||||
extern void ConvertToTenantTableIfNecessary(AlterObjectSchemaStmt *alterObjectSchemaStmt);
|
||||
extern void DistributeSchemaIfNecessary(CreateSchemaStmt *createSchemaStmt);
|
||||
|
||||
/* text_search.c - forward declarations */
|
||||
extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address);
|
||||
|
@ -815,6 +816,8 @@ extern void ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId,
|
|||
Oid partitionRelationId);
|
||||
extern void CreateTenantSchemaTable(Oid relationId);
|
||||
extern void ErrorIfTenantTable(Oid relationId, const char *operationName);
|
||||
extern void DistributeSchema(Oid schemaId);
|
||||
extern void PropagateSchema(Oid schemaId);
|
||||
extern uint32 CreateTenantSchemaColocationId(void);
|
||||
|
||||
#endif /*CITUS_COMMANDS_H */
|
||||
|
|
|
@ -301,6 +301,21 @@ ERROR: must be owner of table table5
|
|||
-- alter table owner, then redistribute
|
||||
SET role tenantuser;
|
||||
ALTER TABLE tenant1.table5 OWNER TO dummyregular;
|
||||
SET role dummyregular;
|
||||
SELECT citus_schema_distribute('tenant1');
|
||||
WARNING: permission denied for database regression
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||
-- grant create on database, then redistribute
|
||||
SET role tenantuser;
|
||||
SELECT result FROM run_command_on_all_nodes($$ GRANT CREATE ON DATABASE regression TO dummyregular; $$);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
GRANT
|
||||
GRANT
|
||||
GRANT
|
||||
(3 rows)
|
||||
|
||||
SET role dummyregular;
|
||||
SELECT citus_schema_distribute('tenant1');
|
||||
citus_schema_distribute
|
||||
|
@ -374,6 +389,14 @@ SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY dummyregular TO
|
|||
REASSIGN OWNED
|
||||
(3 rows)
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes($$ REVOKE CREATE ON DATABASE regression FROM dummyregular; $$);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
REVOKE
|
||||
REVOKE
|
||||
REVOKE
|
||||
(3 rows)
|
||||
|
||||
DROP USER dummyregular;
|
||||
CREATE USER dummysuper superuser;
|
||||
SET role dummysuper;
|
||||
|
|
|
@ -919,6 +919,7 @@ SELECT citus_stat_tenants_reset();
|
|||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA citus_stat_tenants_t1;
|
||||
NOTICE: distributing the schema citus_stat_tenants_t1
|
||||
CREATE TABLE citus_stat_tenants_t1.users(id int);
|
||||
SELECT id FROM citus_stat_tenants_t1.users WHERE id = 2;
|
||||
id
|
||||
|
|
|
@ -1792,6 +1792,7 @@ SELECT citus_remove_node('localhost', :master_port);
|
|||
-- confirm that we can create a tenant schema / table on an empty node
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA tenant_schema;
|
||||
NOTICE: distributing the schema tenant_schema
|
||||
CREATE TABLE tenant_schema.test(x int, y int);
|
||||
SELECT colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'tenant_schema.test'::regclass
|
||||
|
@ -1820,6 +1821,7 @@ SELECT citus_remove_node('localhost', :master_port);
|
|||
(1 row)
|
||||
|
||||
CREATE SCHEMA tenant_schema;
|
||||
NOTICE: distributing the schema tenant_schema
|
||||
-- Make sure that we can sync metadata for empty tenant schemas
|
||||
-- when adding the first node to the cluster.
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||
|
|
|
@ -99,6 +99,7 @@ SELECT create_distributed_table('test','x', colocate_with := 'none');
|
|||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA citus_schema_1;
|
||||
NOTICE: distributing the schema citus_schema_1
|
||||
CREATE TABLE citus_schema_1.test (x int primary key, y int, "column-1" int, doc xml);
|
||||
SET citus.enable_schema_based_sharding TO OFF;
|
||||
ALTER PUBLICATION pubtables_orig ADD TABLE citus_schema_1.test;
|
||||
|
|
|
@ -99,6 +99,7 @@ SELECT create_distributed_table('test','x', colocate_with := 'none');
|
|||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA citus_schema_1;
|
||||
NOTICE: distributing the schema citus_schema_1
|
||||
CREATE TABLE citus_schema_1.test (x int primary key, y int, "column-1" int, doc xml);
|
||||
SET citus.enable_schema_based_sharding TO OFF;
|
||||
ALTER PUBLICATION pubtables_orig ADD TABLE citus_schema_1.test;
|
||||
|
|
|
@ -50,11 +50,14 @@ SELECT COUNT(*)=0 FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'regu
|
|||
|
||||
-- empty tenant
|
||||
CREATE SCHEMA "tenant\'_1";
|
||||
NOTICE: distributing the schema tenant\'_1
|
||||
-- non-empty tenant
|
||||
CREATE SCHEMA "tenant\'_2";
|
||||
NOTICE: distributing the schema tenant\'_2
|
||||
CREATE TABLE "tenant\'_2".test_table(a int, b text);
|
||||
-- empty tenant
|
||||
CREATE SCHEMA "tenant\'_3";
|
||||
NOTICE: distributing the schema tenant\'_3
|
||||
CREATE TABLE "tenant\'_3".test_table(a int, b text);
|
||||
DROP TABLE "tenant\'_3".test_table;
|
||||
-- add a node after creating tenant schemas
|
||||
|
@ -312,11 +315,88 @@ WHERE schemaid::regnamespace::text IN ('tenant_1', 'tenant_2');
|
|||
t
|
||||
(1 row)
|
||||
|
||||
-- verify that we don't allow creating tenant tables via CREATE SCHEMA command
|
||||
CREATE SCHEMA schema_using_schema_elements CREATE TABLE test_table(a int, b text);
|
||||
ERROR: cannot create distributed schema and table in a single statement
|
||||
HINT: SET citus.enable_schema_based_sharding TO off, or create the schema and table in separate commands.
|
||||
-- verify that we allow creating tenant tables via CREATE SCHEMA command
|
||||
CREATE SCHEMA schema_using_schema_elements
|
||||
CREATE TABLE test_table(a int, b text);
|
||||
NOTICE: distributing the schema schema_using_schema_elements
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements.test_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements.test_table'::regclass);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- verify that we allow creating tenant tables with other objects via CREATE SCHEMA command
|
||||
CREATE OR REPLACE FUNCTION dummy_trigger_fn()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
BEGIN
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SET citus.enable_unsafe_triggers TO on;
|
||||
CREATE SCHEMA schema_using_schema_elements_complex
|
||||
CREATE SEQUENCE seq
|
||||
CREATE TABLE test_table(a int DEFAULT nextval('seq'), b tsvector)
|
||||
CREATE INDEX gin_idx ON test_table USING GIN (b)
|
||||
CREATE VIEW v AS SELECT * FROM test_table
|
||||
GRANT CREATE ON SCHEMA schema_using_schema_elements_complex TO public
|
||||
GRANT SELECT ON TABLE test_table TO public
|
||||
CREATE TRIGGER dummy_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON test_table
|
||||
FOR EACH ROW EXECUTE FUNCTION dummy_trigger_fn();
|
||||
WARNING: "view v" has dependency to "table test_table" that is not in Citus' metadata
|
||||
DETAIL: "view v" will be created only locally
|
||||
HINT: Distribute "table test_table" first to distribute "view v"
|
||||
NOTICE: distributing the schema schema_using_schema_elements_complex
|
||||
RESET citus.enable_unsafe_triggers;
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_complex.test_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_complex.test_table'::regclass);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- verify that we allow fkeys with reference tables while creating tenant tables with other objects via CREATE SCHEMA command
|
||||
CREATE SCHEMA schema_using_schema_elements_with_local
|
||||
CREATE TABLE local_table(id int PRIMARY KEY REFERENCES regular_schema.ref_tbl(id))
|
||||
CREATE TABLE test_table(a int REFERENCES local_table(id), b text);
|
||||
NOTICE: distributing the schema schema_using_schema_elements_with_local
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.test_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.test_table'::regclass);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.local_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.local_table'::regclass);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- verify that we do not allow fkeys with other schemas while creating tenant tables with other objects via CREATE SCHEMA command
|
||||
CREATE TABLE regular_schema.loc_t(id int PRIMARY KEY REFERENCES regular_schema.ref_tbl(id));
|
||||
CREATE SCHEMA schema_using_schema_elements_with_local2
|
||||
CREATE TABLE local_table(id int PRIMARY KEY REFERENCES regular_schema.loc_t(id))
|
||||
CREATE TABLE test_table(a int REFERENCES local_table(id), b text);
|
||||
ERROR: foreign keys from distributed schemas can only point to the same distributed schema or reference tables in regular schemas
|
||||
DETAIL: "schema_using_schema_elements_with_local2.local_table" references "regular_schema.loc_t" via foreign key constraint "local_table_id_fkey"
|
||||
CREATE SCHEMA tenant_4;
|
||||
NOTICE: distributing the schema tenant_4
|
||||
CREATE TABLE tenant_4.tbl_1(a int, b text);
|
||||
CREATE TABLE tenant_4.tbl_2(a int, b text);
|
||||
-- verify that we don't allow creating a foreign table in a tenant schema, with a nice error message
|
||||
|
@ -389,12 +469,13 @@ SELECT EXISTS(
|
|||
(1 row)
|
||||
|
||||
INSERT INTO tenant_4.another_partitioned_table VALUES (1, 'a');
|
||||
ERROR: insert or update on table "another_partitioned_table_child_1920090" violates foreign key constraint "another_partitioned_table_a_fkey_1920089"
|
||||
DETAIL: Key (a)=(1) is not present in table "partitioned_table_1920087".
|
||||
ERROR: insert or update on table "another_partitioned_table_child_1920099" violates foreign key constraint "another_partitioned_table_a_fkey_1920098"
|
||||
DETAIL: Key (a)=(1) is not present in table "partitioned_table_1920096".
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
INSERT INTO tenant_4.partitioned_table VALUES (1, 'a');
|
||||
INSERT INTO tenant_4.another_partitioned_table VALUES (1, 'a');
|
||||
CREATE SCHEMA tenant_5;
|
||||
NOTICE: distributing the schema tenant_5
|
||||
CREATE TABLE tenant_5.tbl_1(a int, b text);
|
||||
CREATE TABLE tenant_5.partitioned_table(a int, b text) PARTITION BY RANGE (a);
|
||||
-- verify that we don't allow creating a partition table that is child of a partitioned table in a different tenant schema
|
||||
|
@ -628,6 +709,7 @@ CREATE TABLE regular_schema.local_table_using_inherits(x int) INHERITS (tenant_5
|
|||
ERROR: tenant tables cannot inherit or be inherited
|
||||
CREATE TABLE tenant_5.tbl_2(a int, b text);
|
||||
CREATE SCHEMA "CiTuS.TeeN_108";
|
||||
NOTICE: distributing the schema CiTuS.TeeN_108
|
||||
ALTER SCHEMA "CiTuS.TeeN_108" RENAME TO citus_teen_proper;
|
||||
SELECT schemaid AS citus_teen_schemaid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'citus_teen_proper' \gset
|
||||
SELECT colocationid AS citus_teen_colocationid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'citus_teen_proper' \gset
|
||||
|
@ -1038,6 +1120,7 @@ DROP TABLE temp_table;
|
|||
-- test creating a tenant schema and a tenant table for it in the same transaction
|
||||
BEGIN;
|
||||
CREATE SCHEMA tenant_7;
|
||||
NOTICE: distributing the schema tenant_7
|
||||
CREATE TABLE tenant_7.tbl_1(a int, b text);
|
||||
CREATE TABLE tenant_7.tbl_2(a int, b text);
|
||||
SELECT colocationid = (
|
||||
|
@ -1063,6 +1146,7 @@ COMMIT;
|
|||
-- but this time rollback the transaction.
|
||||
BEGIN;
|
||||
CREATE SCHEMA tenant_8;
|
||||
NOTICE: distributing the schema tenant_8
|
||||
CREATE TABLE tenant_8.tbl_1(a int, b text);
|
||||
CREATE TABLE tenant_8.tbl_2(a int, b text);
|
||||
ROLLBACK;
|
||||
|
@ -1133,6 +1217,7 @@ CREATE TABLE tenant_5.tbl_5(a int, b text, FOREIGN KEY(a) REFERENCES tenant_7.tb
|
|||
ERROR: cannot create foreign key constraint since relations are not colocated or not referencing a reference table
|
||||
DETAIL: A distributed table can only have foreign keys if it is referencing another colocated hash distributed table or a reference table
|
||||
CREATE SCHEMA tenant_9;
|
||||
NOTICE: distributing the schema tenant_9
|
||||
SELECT schemaid AS tenant_9_schemaid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_9' \gset
|
||||
SELECT colocationid AS tenant_9_colocationid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_9' \gset
|
||||
SELECT result FROM run_command_on_workers($$
|
||||
|
@ -1270,6 +1355,7 @@ CREATE USER test_other_super_user WITH superuser;
|
|||
\c - test_other_super_user
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA tenant_9;
|
||||
NOTICE: distributing the schema tenant_9
|
||||
\c - postgres
|
||||
SET search_path TO regular_schema;
|
||||
SET citus.next_shard_id TO 1930000;
|
||||
|
@ -1360,10 +1446,12 @@ SET client_min_messages TO NOTICE;
|
|||
SET citus.enable_schema_based_sharding TO ON;
|
||||
-- test create / drop tenant schema / table
|
||||
CREATE SCHEMA tenant_10;
|
||||
NOTICE: distributing the schema tenant_10
|
||||
CREATE TABLE tenant_10.tbl_1(a int, b text);
|
||||
CREATE TABLE tenant_10.tbl_2(a int, b text);
|
||||
DROP TABLE tenant_10.tbl_2;
|
||||
CREATE SCHEMA tenant_11;
|
||||
NOTICE: distributing the schema tenant_11
|
||||
SELECT schemaid AS tenant_10_schemaid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_10' \gset
|
||||
SELECT colocationid AS tenant_10_colocationid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_10' \gset
|
||||
SELECT schemaid AS tenant_11_schemaid FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_11' \gset
|
||||
|
@ -1540,6 +1628,7 @@ SET client_min_messages TO NOTICE;
|
|||
CREATE TABLE tenant_3.tbl_1(a int, b text);
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA tenant_6;
|
||||
NOTICE: distributing the schema tenant_6
|
||||
CREATE TABLE tenant_6.tbl_1(a int, b text);
|
||||
-- verify pg_dist_partition entries for tenant_3.tbl_1 and tenant_6.tbl_1
|
||||
SELECT COUNT(*)=2 FROM pg_dist_partition
|
||||
|
@ -1582,6 +1671,7 @@ SELECT create_distributed_table('type_sing', NULL, colocate_with:='none');
|
|||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA type_sch;
|
||||
NOTICE: distributing the schema type_sch
|
||||
CREATE TABLE type_sch.tbl (a INT);
|
||||
SELECT table_name, citus_table_type FROM public.citus_tables WHERE table_name::text LIKE 'type_%';
|
||||
table_name | citus_table_type
|
||||
|
@ -1603,10 +1693,12 @@ SET citus.enable_schema_based_sharding TO ON;
|
|||
CREATE USER citus_schema_role SUPERUSER;
|
||||
SET ROLE citus_schema_role;
|
||||
CREATE SCHEMA citus_sch1;
|
||||
NOTICE: distributing the schema citus_sch1
|
||||
CREATE TABLE citus_sch1.tbl1(a INT);
|
||||
CREATE TABLE citus_sch1.tbl2(a INT);
|
||||
RESET ROLE;
|
||||
CREATE SCHEMA citus_sch2;
|
||||
NOTICE: distributing the schema citus_sch2
|
||||
CREATE TABLE citus_sch2.tbl1(a INT);
|
||||
SET citus.enable_schema_based_sharding TO OFF;
|
||||
INSERT INTO citus_sch1.tbl1 SELECT * FROM generate_series(1, 10000);
|
||||
|
@ -1638,7 +1730,9 @@ ORDER BY cs.schema_name::text;
|
|||
-- test empty schema and empty tables
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA citus_empty_sch1;
|
||||
NOTICE: distributing the schema citus_empty_sch1
|
||||
CREATE SCHEMA citus_empty_sch2;
|
||||
NOTICE: distributing the schema citus_empty_sch2
|
||||
CREATE TABLE citus_empty_sch2.tbl1(a INT);
|
||||
SET citus.enable_schema_based_sharding TO OFF;
|
||||
SELECT schema_name, schema_size FROM public.citus_schemas
|
||||
|
@ -1679,6 +1773,7 @@ SET search_path TO regular_schema;
|
|||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE ROLE authschema;
|
||||
CREATE SCHEMA AUTHORIZATION authschema;
|
||||
NOTICE: distributing the schema authschema
|
||||
SET citus.enable_schema_based_sharding TO OFF;
|
||||
SELECT result FROM run_command_on_all_nodes($$
|
||||
SELECT COUNT(*)=1
|
||||
|
@ -1693,7 +1788,10 @@ $$);
|
|||
(3 rows)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA regular_schema, tenant_3, tenant_5, tenant_7, tenant_6, type_sch, citus_sch1, citus_sch2, citus_empty_sch1, citus_empty_sch2, authschema CASCADE;
|
||||
DROP SCHEMA regular_schema, tenant_3, tenant_5, tenant_7, tenant_6,
|
||||
type_sch, citus_sch1, citus_sch2, citus_empty_sch1,
|
||||
citus_empty_sch2, authschema, schema_using_schema_elements,
|
||||
schema_using_schema_elements_complex, schema_using_schema_elements_with_local CASCADE;
|
||||
DROP ROLE citus_schema_role, citus_schema_nonpri, authschema;
|
||||
SELECT citus_remove_node('localhost', :master_port);
|
||||
citus_remove_node
|
||||
|
|
|
@ -159,6 +159,7 @@ ERROR: relation "single_node_nullkey_c1_90630532" is a shard relation
|
|||
-- create a tenant schema on single node setup
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA tenant_1;
|
||||
NOTICE: distributing the schema tenant_1
|
||||
CREATE TABLE tenant_1.tbl_1 (a int);
|
||||
-- verify that we recorded tenant_1 in pg_dist_schema
|
||||
SELECT COUNT(*)=1 FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_1';
|
||||
|
|
|
@ -159,6 +159,7 @@ ERROR: relation "single_node_nullkey_c1_90630532" is a shard relation
|
|||
-- create a tenant schema on single node setup
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA tenant_1;
|
||||
NOTICE: distributing the schema tenant_1
|
||||
CREATE TABLE tenant_1.tbl_1 (a int);
|
||||
-- verify that we recorded tenant_1 in pg_dist_schema
|
||||
SELECT COUNT(*)=1 FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_1';
|
||||
|
|
|
@ -86,6 +86,7 @@ ALTER SCHEMA tenant_1 RENAME TO "tenant\'_1";
|
|||
ALTER SCHEMA tenant_2 RENAME TO "tenant\'_2";
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA tenant_3;
|
||||
NOTICE: distributing the schema tenant_3
|
||||
-- Show that we can create furher tenant schemas after pg upgrade.
|
||||
SELECT COUNT(*)=1 FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_3';
|
||||
?column?
|
||||
|
|
|
@ -4,7 +4,9 @@ SET citus.enable_schema_based_sharding TO ON;
|
|||
-- them into public schema.
|
||||
-- empty tenant
|
||||
CREATE SCHEMA "tenant\'_1";
|
||||
NOTICE: distributing the schema tenant\'_1
|
||||
-- non-empty tenant
|
||||
CREATE SCHEMA "tenant\'_2";
|
||||
NOTICE: distributing the schema tenant\'_2
|
||||
CREATE TABLE "tenant\'_2".test_table(a int, b text);
|
||||
RESET citus.enable_schema_based_sharding;
|
||||
|
|
|
@ -198,6 +198,12 @@ ALTER TABLE tenant1.table5 OWNER TO dummyregular;
|
|||
SET role dummyregular;
|
||||
SELECT citus_schema_distribute('tenant1');
|
||||
|
||||
-- grant create on database, then redistribute
|
||||
SET role tenantuser;
|
||||
SELECT result FROM run_command_on_all_nodes($$ GRANT CREATE ON DATABASE regression TO dummyregular; $$);
|
||||
SET role dummyregular;
|
||||
SELECT citus_schema_distribute('tenant1');
|
||||
|
||||
-- show the schema is a tenant schema now
|
||||
SELECT colocationid AS tenant1_colocid FROM pg_dist_schema schemaid \gset
|
||||
-- below query verifies the same colocationid in pg_dist_schema, pg_dist_colocation and all entries in pg_dist_partition at the same time
|
||||
|
@ -220,6 +226,7 @@ SELECT result FROM run_command_on_all_nodes($$ SELECT array_agg(logicalrelid ORD
|
|||
|
||||
RESET role;
|
||||
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY dummyregular TO tenantuser; $$);
|
||||
SELECT result FROM run_command_on_all_nodes($$ REVOKE CREATE ON DATABASE regression FROM dummyregular; $$);
|
||||
DROP USER dummyregular;
|
||||
|
||||
CREATE USER dummysuper superuser;
|
||||
|
|
|
@ -209,8 +209,66 @@ $$);
|
|||
SELECT COUNT(DISTINCT(colocationid))=2 FROM pg_dist_schema
|
||||
WHERE schemaid::regnamespace::text IN ('tenant_1', 'tenant_2');
|
||||
|
||||
-- verify that we don't allow creating tenant tables via CREATE SCHEMA command
|
||||
CREATE SCHEMA schema_using_schema_elements CREATE TABLE test_table(a int, b text);
|
||||
-- verify that we allow creating tenant tables via CREATE SCHEMA command
|
||||
CREATE SCHEMA schema_using_schema_elements
|
||||
CREATE TABLE test_table(a int, b text);
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements.test_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements.test_table'::regclass);
|
||||
|
||||
-- verify that we allow creating tenant tables with other objects via CREATE SCHEMA command
|
||||
CREATE OR REPLACE FUNCTION dummy_trigger_fn()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
BEGIN
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SET citus.enable_unsafe_triggers TO on;
|
||||
|
||||
CREATE SCHEMA schema_using_schema_elements_complex
|
||||
CREATE SEQUENCE seq
|
||||
CREATE TABLE test_table(a int DEFAULT nextval('seq'), b tsvector)
|
||||
CREATE INDEX gin_idx ON test_table USING GIN (b)
|
||||
CREATE VIEW v AS SELECT * FROM test_table
|
||||
GRANT CREATE ON SCHEMA schema_using_schema_elements_complex TO public
|
||||
GRANT SELECT ON TABLE test_table TO public
|
||||
CREATE TRIGGER dummy_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON test_table
|
||||
FOR EACH ROW EXECUTE FUNCTION dummy_trigger_fn();
|
||||
|
||||
RESET citus.enable_unsafe_triggers;
|
||||
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_complex.test_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_complex.test_table'::regclass);
|
||||
|
||||
-- verify that we allow fkeys with reference tables while creating tenant tables with other objects via CREATE SCHEMA command
|
||||
CREATE SCHEMA schema_using_schema_elements_with_local
|
||||
CREATE TABLE local_table(id int PRIMARY KEY REFERENCES regular_schema.ref_tbl(id))
|
||||
CREATE TABLE test_table(a int REFERENCES local_table(id), b text);
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.test_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.test_table'::regclass);
|
||||
SELECT COUNT(*)=1 FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.local_table'::regclass AND
|
||||
partmethod = 'n' AND repmodel = 's' AND colocationid = (
|
||||
SELECT colocationid FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'schema_using_schema_elements_with_local.local_table'::regclass);
|
||||
|
||||
-- verify that we do not allow fkeys with other schemas while creating tenant tables with other objects via CREATE SCHEMA command
|
||||
CREATE TABLE regular_schema.loc_t(id int PRIMARY KEY REFERENCES regular_schema.ref_tbl(id));
|
||||
CREATE SCHEMA schema_using_schema_elements_with_local2
|
||||
CREATE TABLE local_table(id int PRIMARY KEY REFERENCES regular_schema.loc_t(id))
|
||||
CREATE TABLE test_table(a int REFERENCES local_table(id), b text);
|
||||
|
||||
|
||||
CREATE SCHEMA tenant_4;
|
||||
CREATE TABLE tenant_4.tbl_1(a int, b text);
|
||||
|
@ -1154,7 +1212,9 @@ SELECT result FROM run_command_on_all_nodes($$
|
|||
$$);
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA regular_schema, tenant_3, tenant_5, tenant_7, tenant_6, type_sch, citus_sch1, citus_sch2, citus_empty_sch1, citus_empty_sch2, authschema CASCADE;
|
||||
DROP SCHEMA regular_schema, tenant_3, tenant_5, tenant_7, tenant_6,
|
||||
type_sch, citus_sch1, citus_sch2, citus_empty_sch1,
|
||||
citus_empty_sch2, authschema, schema_using_schema_elements,
|
||||
schema_using_schema_elements_complex, schema_using_schema_elements_with_local CASCADE;
|
||||
DROP ROLE citus_schema_role, citus_schema_nonpri, authschema;
|
||||
|
||||
SELECT citus_remove_node('localhost', :master_port);
|
||||
|
|
Loading…
Reference in New Issue