Adds alter table schema propagation

pull/3373/head
Halil Ozan Akgul 2020-01-06 16:21:32 +03:00
parent b6e09eb691
commit c5539d20d9
13 changed files with 730 additions and 53 deletions

View File

@ -85,13 +85,6 @@ static DistributeObjectOps Any_AlterFunction = {
.postprocess = NULL, .postprocess = NULL,
.address = AlterFunctionStmtObjectAddress, .address = AlterFunctionStmtObjectAddress,
}; };
static DistributeObjectOps Any_AlterObjectSchema = {
.deparse = NULL,
.qualify = NULL,
.preprocess = PreprocessAlterTableSchemaStmt,
.postprocess = NULL,
.address = NULL,
};
static DistributeObjectOps Any_AlterPolicy = { static DistributeObjectOps Any_AlterPolicy = {
.deparse = NULL, .deparse = NULL,
.qualify = NULL, .qualify = NULL,
@ -386,6 +379,13 @@ static DistributeObjectOps Table_AlterTable = {
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
}; };
static DistributeObjectOps Table_AlterObjectSchema = {
.deparse = DeparseAlterTableSchemaStmt,
.qualify = QualifyAlterTableSchemaStmt,
.preprocess = PreprocessAlterTableSchemaStmt,
.postprocess = PostprocessAlterTableSchemaStmt,
.address = AlterTableSchemaStmtObjectAddress,
};
static DistributeObjectOps Table_Drop = { static DistributeObjectOps Table_Drop = {
.deparse = NULL, .deparse = NULL,
.qualify = NULL, .qualify = NULL,
@ -522,6 +522,11 @@ GetDistributeObjectOps(Node *node)
return &Routine_AlterObjectSchema; return &Routine_AlterObjectSchema;
} }
case OBJECT_TABLE:
{
return &Table_AlterObjectSchema;
}
case OBJECT_TYPE: case OBJECT_TYPE:
{ {
return &Type_AlterObjectSchema; return &Type_AlterObjectSchema;
@ -529,7 +534,7 @@ GetDistributeObjectOps(Node *node)
default: default:
{ {
return &Any_AlterObjectSchema; return &NoDistributeOps;
} }
} }
} }

View File

@ -104,38 +104,3 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString)
return NIL; return NIL;
} }
/*
* PreprocessAlterTableSchemaStmt determines whether a given ALTER ... SET SCHEMA
* statement involves a distributed table and issues a warning if so. Because
* we do not support distributed ALTER ... SET SCHEMA, this function always
* returns NIL.
*/
List *
PreprocessAlterTableSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
if (stmt->relation == NULL)
{
return NIL;
}
Oid relationId = RangeVarGetRelid(stmt->relation,
AccessExclusiveLock,
stmt->missing_ok);
/* first check whether a distributed relation is affected */
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
{
return NIL;
}
/* emit a warning if a distributed relation is affected */
ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to "
"worker nodes"),
errhint("Connect to worker nodes directly to manually "
"change schemas of affected objects.")));
return NIL;
}

View File

@ -22,6 +22,7 @@
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
@ -251,6 +252,30 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
} }
/*
* PostprocessAlterTableSchemaStmt is executed after the change has been applied locally, we
* can now use the new dependencies of the table to ensure all its dependencies exist on
* the workers before we apply the commands remotely.
*/
List *
PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
ObjectAddress tableAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagate() || !IsDistributedTable(tableAddress.objectId))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&tableAddress);
return NIL;
}
/* /*
* PreprocessAlterTableStmt determines whether a given ALTER TABLE statement involves * PreprocessAlterTableStmt determines whether a given ALTER TABLE statement involves
* a distributed table. If so (and if the statement does not use unsupported * a distributed table. If so (and if the statement does not use unsupported
@ -476,6 +501,41 @@ PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString)
} }
/*
* PreprocessAlterTableSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that will alter the schemas of the shards.
*/
List *
PreprocessAlterTableSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
if (stmt->relation == NULL)
{
return NIL;
}
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
Oid relationId = address.objectId;
/* first check whether a distributed relation is affected */
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
{
return NIL;
}
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
QualifyTreeNode((Node *) stmt);
ddlJob->targetRelationId = relationId;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = DeparseTreeNode((Node *) stmt);
ddlJob->taskList = DDLTaskList(relationId, ddlJob->commandString);
return list_make1(ddlJob);
}
/* /*
* WorkerProcessAlterTableStmt checks and processes the alter table statement to be * WorkerProcessAlterTableStmt checks and processes the alter table statement to be
* worked on the distributed table of the worker node. Currently, it only processes * worked on the distributed table of the worker node. Currently, it only processes
@ -1390,3 +1450,53 @@ ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement)
colocationId); colocationId);
relation_close(relation, NoLock); relation_close(relation, NoLock);
} }
/*
* AlterTableSchemaStmtObjectAddress returns the ObjectAddress of the table that is the
* object of the AlterObjectSchemaStmt.
*
* This could be called both before or after it has been applied locally. It will look in
* the old schema first, if the table cannot be found in that schema it will look in the
* new schema. Errors if missing_ok is false and the table cannot be found in either of the
* schemas.
*/
ObjectAddress
AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
const char *tableName = stmt->relation->relname;
Oid tableOid = InvalidOid;
if (stmt->relation->schemaname)
{
const char *schemaName = stmt->relation->schemaname;
Oid schemaOid = get_namespace_oid(schemaName, false);
tableOid = get_relname_relid(tableName, schemaOid);
}
else
{
tableOid = RelnameGetRelid(stmt->relation->relname);
}
if (tableOid == InvalidOid)
{
const char *newSchemaName = stmt->newschema;
Oid newSchemaOid = get_namespace_oid(newSchemaName, true);
tableOid = get_relname_relid(tableName, newSchemaOid);
if (!missing_ok && tableOid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
quote_qualified_identifier(stmt->relation->schemaname,
tableName))));
}
}
ObjectAddress address = { 0 };
ObjectAddressSet(address, RelationRelationId, tableOid);
return address;
}

View File

@ -0,0 +1,48 @@
/*-------------------------------------------------------------------------
*
* deparse_table_stmts.c
* All routines to deparse table statements.
* This file contains all entry points specific for table statement deparsing as well as
* functions that are currently only used for deparsing of the table statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/deparser.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
static void AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt);
char *
DeparseAlterTableSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_TABLE);
AppendAlterTableSchemaStmt(&str, stmt);
return str.data;
}
static void
AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
Assert(stmt->objectType == OBJECT_TABLE);
appendStringInfo(buf, "ALTER TABLE ");
if (stmt->missing_ok)
{
appendStringInfo(buf, "IF EXISTS ");
}
char *tableName = quote_qualified_identifier(stmt->relation->schemaname,
stmt->relation->relname);
const char *newSchemaName = quote_identifier(stmt->newschema);
appendStringInfo(buf, "%s SET SCHEMA %s;", tableName, newSchemaName);
}

View File

@ -0,0 +1,40 @@
/*-------------------------------------------------------------------------
*
* qualify_table_stmt.c
* Functions specialized in fully qualifying all table statements. These
* functions are dispatched from qualify.c
*
* Fully qualifying table statements consists of adding the schema name
* to the subject of the table as well as any other branch of the
* parsetree.
*
* Goal would be that the deparser functions for these statements can
* serialize the statement without any external lookups.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "nodes/parsenodes.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "distributed/deparser.h"
void
QualifyAlterTableSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
if (stmt->relation->schemaname == NULL)
{
Oid tableOid = RelnameGetRelid(stmt->relation->relname);
Oid schemaOid = get_rel_namespace(tableOid);
stmt->relation->schemaname = get_namespace_name(schemaOid);
}
}

View File

@ -76,6 +76,21 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
switch (nodeType) switch (nodeType)
{ {
case T_AlterObjectSchemaStmt:
{
AlterObjectSchemaStmt *alterObjectSchemaStmt =
(AlterObjectSchemaStmt *) parseTree;
char **relationName = &(alterObjectSchemaStmt->relation->relname);
char **relationSchemaName = &(alterObjectSchemaStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
/* append shardId to base relation name */
AppendShardIdToName(relationName, shardId);
break;
}
case T_AlterTableStmt: case T_AlterTableStmt:
{ {
/* /*

View File

@ -188,8 +188,6 @@ extern List * GenerateAlterRoleIfExistsCommandAllRoles(void);
/* schema.c - forward declarations */ /* schema.c - forward declarations */
extern List * PreprocessDropSchemaStmt(Node *dropSchemaStatement, extern List * PreprocessDropSchemaStmt(Node *dropSchemaStatement,
const char *queryString); const char *queryString);
extern List * PreprocessAlterTableSchemaStmt(Node *stmt,
const char *queryString);
extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt, extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand); const char *alterObjectSchemaCommand);
@ -210,8 +208,10 @@ extern List * PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement,
extern List * PostprocessAlterTableStmtAttachPartition( extern List * PostprocessAlterTableStmtAttachPartition(
AlterTableStmt *alterTableStatement, AlterTableStmt *alterTableStatement,
const char *queryString); const char *queryString);
extern List * PostprocessAlterTableSchemaStmt(Node *node, const char *queryString);
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand); extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand);
extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString); extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString);
extern List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString);
extern Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, extern Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
const char *alterTableCommand); const char *alterTableCommand);
extern bool IsAlterTableRenameStmt(RenameStmt *renameStmt); extern bool IsAlterTableRenameStmt(RenameStmt *renameStmt);
@ -221,6 +221,8 @@ extern void ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *c
Constraint *constraint); Constraint *constraint);
extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod, extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId); Var *distributionColumn, uint32 colocationId);
extern ObjectAddress AlterTableSchemaStmtObjectAddress(Node *stmt,
bool missing_ok);
/* truncate.c - forward declarations */ /* truncate.c - forward declarations */

View File

@ -46,6 +46,11 @@ extern void QualifyRenameCollationStmt(Node *stmt);
extern void QualifyAlterCollationSchemaStmt(Node *stmt); extern void QualifyAlterCollationSchemaStmt(Node *stmt);
extern void QualifyAlterCollationOwnerStmt(Node *stmt); extern void QualifyAlterCollationOwnerStmt(Node *stmt);
/* forward declarations for deparse_table_stmts.c */
extern char * DeparseAlterTableSchemaStmt(Node *stmt);
extern void QualifyAlterTableSchemaStmt(Node *stmt);
/* forward declarations for deparse_type_stmts.c */ /* forward declarations for deparse_type_stmts.c */
extern char * DeparseCompositeTypeStmt(Node *stmt); extern char * DeparseCompositeTypeStmt(Node *stmt);
extern char * DeparseCreateEnumStmt(Node *stmt); extern char * DeparseCreateEnumStmt(Node *stmt);

View File

@ -92,8 +92,6 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
-- Show that schema changes are included in the metadata snapshot -- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema; CREATE SCHEMA mx_testing_schema;
ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema; ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema;
WARNING: not propagating ALTER ... SET SCHEMA commands to worker nodes
HINT: Connect to worker nodes directly to manually change schemas of affected objects.
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(master_metadata_snapshot()) order by 1;
unnest unnest
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -451,3 +451,74 @@ NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table "CiTuS.TeAeN"."TeeNTabLE.1!?!" DETAIL: drop cascades to table "CiTuS.TeAeN"."TeeNTabLE.1!?!"
drop cascades to table mx_ddl_schema_2.table_2 drop cascades to table mx_ddl_schema_2.table_2
drop cascades to table mx_ddl_schema_1.table_1 drop cascades to table mx_ddl_schema_1.table_1
-- test if ALTER TABLE SET SCHEMA sets the original table in the worker
SET search_path TO public;
CREATE SCHEMA mx_old_schema;
CREATE TABLE mx_old_schema.table_set_schema (id int);
SELECT create_distributed_table('mx_old_schema.table_set_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA mx_new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('mx_old_schema', 'mx_new_schema');
Distributed Schemas
---------------------------------------------------------------------
mx_old_schema
(1 row)
\c - - - :worker_1_port
SELECT table_schema AS "Table's Schema" FROM information_schema.tables WHERE table_name='table_set_schema';
Table's Schema
---------------------------------------------------------------------
mx_old_schema
(1 row)
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%'
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
mx_old_schema
(1 row)
\c - - - :master_port
ALTER TABLE mx_old_schema.table_set_schema SET SCHEMA mx_new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('mx_old_schema', 'mx_new_schema');
Distributed Schemas
---------------------------------------------------------------------
mx_old_schema
mx_new_schema
(2 rows)
\c - - - :worker_1_port
SELECT table_schema AS "Table's Schema" FROM information_schema.tables WHERE table_name='table_set_schema';
Table's Schema
---------------------------------------------------------------------
mx_new_schema
(1 row)
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%'
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
mx_new_schema
(1 row)
\c - - - :master_port
SELECT * FROM mx_new_schema.table_set_schema;
id
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA mx_old_schema CASCADE;
DROP SCHEMA mx_new_schema CASCADE;
NOTICE: drop cascades to table mx_new_schema.table_set_schema

View File

@ -984,11 +984,250 @@ WHERE
-- set task_executor back to adaptive -- set task_executor back to adaptive
SET citus.task_executor_type TO "adaptive"; SET citus.task_executor_type TO "adaptive";
-- test ALTER TABLE SET SCHEMA -- test ALTER TABLE SET SCHEMA
-- we expect that it will warn out
SET search_path TO public; SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash SET SCHEMA public; CREATE SCHEMA old_schema;
WARNING: not propagating ALTER ... SET SCHEMA commands to worker nodes CREATE TABLE old_schema.table_set_schema(id int);
HINT: Connect to worker nodes directly to manually change schemas of affected objects. SELECT create_distributed_table('old_schema.table_set_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
Distributed Schemas
---------------------------------------------------------------------
old_schema
(1 row)
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
old_schema
(1 row)
\c - - - :master_port
ALTER TABLE old_schema.table_set_schema SET SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
Distributed Schemas
---------------------------------------------------------------------
old_schema
new_schema
(2 rows)
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
new_schema
(1 row)
\c - - - :master_port
SELECT * FROM new_schema.table_set_schema;
id
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA old_schema CASCADE;
DROP SCHEMA new_schema CASCADE;
NOTICE: drop cascades to table new_schema.table_set_schema
-- test ALTER TABLE SET SCHEMA from public
CREATE TABLE table_set_schema(id int);
SELECT create_distributed_table('table_set_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid='new_schema'::regnamespace::oid;
Distributed Schemas
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('new_schema', 'public')
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
public
(1 row)
\c - - - :master_port
ALTER TABLE table_set_schema SET SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid='new_schema'::regnamespace::oid;
Distributed Schemas
---------------------------------------------------------------------
new_schema
(1 row)
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('new_schema', 'public')
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
new_schema
(1 row)
\c - - - :master_port
SELECT * FROM new_schema.table_set_schema;
id
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA new_schema CASCADE;
NOTICE: drop cascades to table new_schema.table_set_schema
-- test ALTER TABLE SET SCHEMA when a search path is set
CREATE SCHEMA old_schema;
CREATE TABLE old_schema.table_set_schema(id int);
SELECT create_distributed_table('old_schema.table_set_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table_set_schema(id int);
SELECT create_distributed_table('table_set_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
Distributed Schemas
---------------------------------------------------------------------
old_schema
(1 row)
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema", COUNT(*) AS "Counts"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
Shards' Schema | Counts
---------------------------------------------------------------------
old_schema | 4
public | 4
(2 rows)
\c - - - :master_port
SET search_path TO old_schema;
ALTER TABLE table_set_schema SET SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
Distributed Schemas
---------------------------------------------------------------------
old_schema
new_schema
(2 rows)
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema", COUNT(*) AS "Counts"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
Shards' Schema | Counts
---------------------------------------------------------------------
new_schema | 4
public | 4
(2 rows)
\c - - - :master_port
SELECT * FROM new_schema.table_set_schema;
id
---------------------------------------------------------------------
(0 rows)
SET search_path to public;
DROP SCHEMA old_schema CASCADE;
DROP SCHEMA new_schema CASCADE;
NOTICE: drop cascades to table new_schema.table_set_schema
DROP TABLE table_set_schema;
-- test ALTER TABLE SET SCHEMA with nonexisting schemas and table
-- expect all to give error
CREATE SCHEMA existing_schema;
CREATE SCHEMA another_existing_schema;
CREATE TABLE existing_schema.table_set_schema(id int);
SELECT create_distributed_table('existing_schema.table_set_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE non_existent_schema.table_set_schema SET SCHEMA another_existing_schema;
ERROR: schema "non_existent_schema" does not exist
ALTER TABLE non_existent_schema.non_existent_table SET SCHEMA another_existing_schema;
ERROR: schema "non_existent_schema" does not exist
ALTER TABLE non_existent_schema.table_set_schema SET SCHEMA another_non_existent_schema;
ERROR: schema "non_existent_schema" does not exist
ALTER TABLE non_existent_schema.non_existent_table SET SCHEMA another_non_existent_schema;
ERROR: schema "non_existent_schema" does not exist
ALTER TABLE existing_schema.non_existent_table SET SCHEMA another_existing_schema;
ERROR: relation "existing_schema.non_existent_table" does not exist
ALTER TABLE existing_schema.non_existent_table SET SCHEMA non_existent_schema;
ERROR: relation "existing_schema.non_existent_table" does not exist
ALTER TABLE existing_schema.table_set_schema SET SCHEMA non_existent_schema;
ERROR: schema "non_existent_schema" does not exist
DROP SCHEMA existing_schema, another_existing_schema CASCADE;
NOTICE: drop cascades to table existing_schema.table_set_schema
-- test ALTER TABLE SET SCHEMA with interesting names
CREATE SCHEMA "cItuS.T E E N'sSchema";
CREATE SCHEMA "citus-teen's scnd schm.";
CREATE TABLE "cItuS.T E E N'sSchema"."be$t''t*ble" (id int);
SELECT create_distributed_table('"cItuS.T E E N''sSchema"."be$t''''t*ble"', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE "cItuS.T E E N'sSchema"."be$t''t*ble" SET SCHEMA "citus-teen's scnd schm.";
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'be$t''''t*ble%'
GROUP BY table_schema;
Shards' Schema
---------------------------------------------------------------------
citus-teen's scnd schm.
(1 row)
\c - - - :master_port
SELECT * FROM "citus-teen's scnd schm."."be$t''t*ble";
id
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA "cItuS.T E E N'sSchema", "citus-teen's scnd schm." CASCADE;
NOTICE: drop cascades to table "citus-teen's scnd schm."."be$t''t*ble"
-- test schema propagation with user other than current user -- test schema propagation with user other than current user
SELECT run_command_on_coordinator_and_workers('CREATE USER "test-user"'); SELECT run_command_on_coordinator_and_workers('CREATE USER "test-user"');
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to worker nodes

View File

@ -299,3 +299,39 @@ SET search_path TO not_existing_schema;
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col; ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col;
DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE; DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE;
-- test if ALTER TABLE SET SCHEMA sets the original table in the worker
SET search_path TO public;
CREATE SCHEMA mx_old_schema;
CREATE TABLE mx_old_schema.table_set_schema (id int);
SELECT create_distributed_table('mx_old_schema.table_set_schema', 'id');
CREATE SCHEMA mx_new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('mx_old_schema', 'mx_new_schema');
\c - - - :worker_1_port
SELECT table_schema AS "Table's Schema" FROM information_schema.tables WHERE table_name='table_set_schema';
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%'
GROUP BY table_schema;
\c - - - :master_port
ALTER TABLE mx_old_schema.table_set_schema SET SCHEMA mx_new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('mx_old_schema', 'mx_new_schema');
\c - - - :worker_1_port
SELECT table_schema AS "Table's Schema" FROM information_schema.tables WHERE table_name='table_set_schema';
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%'
GROUP BY table_schema;
\c - - - :master_port
SELECT * FROM mx_new_schema.table_set_schema;
DROP SCHEMA mx_old_schema CASCADE;
DROP SCHEMA mx_new_schema CASCADE;

View File

@ -721,9 +721,152 @@ SET citus.task_executor_type TO "adaptive";
-- test ALTER TABLE SET SCHEMA -- test ALTER TABLE SET SCHEMA
-- we expect that it will warn out
SET search_path TO public; SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash SET SCHEMA public;
CREATE SCHEMA old_schema;
CREATE TABLE old_schema.table_set_schema(id int);
SELECT create_distributed_table('old_schema.table_set_schema', 'id');
CREATE SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
\c - - - :master_port
ALTER TABLE old_schema.table_set_schema SET SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
\c - - - :master_port
SELECT * FROM new_schema.table_set_schema;
DROP SCHEMA old_schema CASCADE;
DROP SCHEMA new_schema CASCADE;
-- test ALTER TABLE SET SCHEMA from public
CREATE TABLE table_set_schema(id int);
SELECT create_distributed_table('table_set_schema', 'id');
CREATE SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid='new_schema'::regnamespace::oid;
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('new_schema', 'public')
GROUP BY table_schema;
\c - - - :master_port
ALTER TABLE table_set_schema SET SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid='new_schema'::regnamespace::oid;
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('new_schema', 'public')
GROUP BY table_schema;
\c - - - :master_port
SELECT * FROM new_schema.table_set_schema;
DROP SCHEMA new_schema CASCADE;
-- test ALTER TABLE SET SCHEMA when a search path is set
CREATE SCHEMA old_schema;
CREATE TABLE old_schema.table_set_schema(id int);
SELECT create_distributed_table('old_schema.table_set_schema', 'id');
CREATE TABLE table_set_schema(id int);
SELECT create_distributed_table('table_set_schema', 'id');
CREATE SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema", COUNT(*) AS "Counts"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
\c - - - :master_port
SET search_path TO old_schema;
ALTER TABLE table_set_schema SET SCHEMA new_schema;
SELECT objid::oid::regnamespace as "Distributed Schemas"
FROM citus.pg_dist_object
WHERE objid::oid::regnamespace IN ('old_schema', 'new_schema');
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema", COUNT(*) AS "Counts"
FROM information_schema.tables
WHERE table_name LIKE 'table\_set\_schema\_%' AND
table_schema IN ('old_schema', 'new_schema', 'public')
GROUP BY table_schema;
\c - - - :master_port
SELECT * FROM new_schema.table_set_schema;
SET search_path to public;
DROP SCHEMA old_schema CASCADE;
DROP SCHEMA new_schema CASCADE;
DROP TABLE table_set_schema;
-- test ALTER TABLE SET SCHEMA with nonexisting schemas and table
-- expect all to give error
CREATE SCHEMA existing_schema;
CREATE SCHEMA another_existing_schema;
CREATE TABLE existing_schema.table_set_schema(id int);
SELECT create_distributed_table('existing_schema.table_set_schema', 'id');
ALTER TABLE non_existent_schema.table_set_schema SET SCHEMA another_existing_schema;
ALTER TABLE non_existent_schema.non_existent_table SET SCHEMA another_existing_schema;
ALTER TABLE non_existent_schema.table_set_schema SET SCHEMA another_non_existent_schema;
ALTER TABLE non_existent_schema.non_existent_table SET SCHEMA another_non_existent_schema;
ALTER TABLE existing_schema.non_existent_table SET SCHEMA another_existing_schema;
ALTER TABLE existing_schema.non_existent_table SET SCHEMA non_existent_schema;
ALTER TABLE existing_schema.table_set_schema SET SCHEMA non_existent_schema;
DROP SCHEMA existing_schema, another_existing_schema CASCADE;
-- test ALTER TABLE SET SCHEMA with interesting names
CREATE SCHEMA "cItuS.T E E N'sSchema";
CREATE SCHEMA "citus-teen's scnd schm.";
CREATE TABLE "cItuS.T E E N'sSchema"."be$t''t*ble" (id int);
SELECT create_distributed_table('"cItuS.T E E N''sSchema"."be$t''''t*ble"', 'id');
ALTER TABLE "cItuS.T E E N'sSchema"."be$t''t*ble" SET SCHEMA "citus-teen's scnd schm.";
\c - - - :worker_1_port
SELECT table_schema AS "Shards' Schema"
FROM information_schema.tables
WHERE table_name LIKE 'be$t''''t*ble%'
GROUP BY table_schema;
\c - - - :master_port
SELECT * FROM "citus-teen's scnd schm."."be$t''t*ble";
DROP SCHEMA "cItuS.T E E N'sSchema", "citus-teen's scnd schm." CASCADE;
-- test schema propagation with user other than current user -- test schema propagation with user other than current user
SELECT run_command_on_coordinator_and_workers('CREATE USER "test-user"'); SELECT run_command_on_coordinator_and_workers('CREATE USER "test-user"');