From 85d418412d624c49e0e1333c3b4778c42a27fca1 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 9 Aug 2018 11:46:04 +0300 Subject: [PATCH] Fix DDL execution problem on MX when search_path is used Make sure that the coordinator sends the commands when the search path synchronised with the coordinator's search_path. This is only important when Citus sends the commands that are directly relayed to the worker nodes. For example, the deparsed DLL commands or queries always adds schema qualifications to the queries. So, they do not require this change. --- .../distributed/executor/multi_utility.c | 100 +++++++++++++++++- .../expected/multi_mx_schema_support.out | 78 ++++++++++++++ .../regress/sql/multi_mx_schema_support.sql | 79 ++++++++++++++ 3 files changed, 255 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index ea5b4fee6..80ab68acf 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -158,6 +158,8 @@ static bool IsIndexRenameStmt(RenameStmt *renameStmt); static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement, AlterTableCmd *command); static void ExecuteDistributedDDLJob(DDLJob *ddlJob); +static char * SetSearchPathToCurrentSearchPathCommand(void); +static char * CurrentSearchPath(void); static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, @@ -3146,7 +3148,19 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { if (shouldSyncMetadata) { + char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand(); + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + + /* + * Given that we're relaying the query to the worker nodes directly, + * we should set the search path exactly the same when necessary. + */ + if (setSearchPathCommand != NULL) + { + SendCommandToWorkers(WORKERS_WITH_METADATA, setSearchPathCommand); + } + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); } @@ -3173,8 +3187,19 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) if (shouldSyncMetadata) { - List *commandList = list_make2(DISABLE_DDL_PROPAGATION, - (char *) ddlJob->commandString); + List *commandList = list_make1(DISABLE_DDL_PROPAGATION); + char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand(); + + /* + * Given that we're relaying the query to the worker nodes directly, + * we should set the search path exactly the same when necessary. + */ + if (setSearchPathCommand != NULL) + { + commandList = lappend(commandList, setSearchPathCommand); + } + + commandList = lappend(commandList, (char *) ddlJob->commandString); SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList); } @@ -3193,6 +3218,77 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } +/* + * SetSearchPathToCurrentSearchPathCommand generates a command which can + * set the search path to the exact same search path that the issueing node + * has. + * + * If the current search path is null (or doesn't have any valid schemas), + * the function returns NULL. + */ +static char * +SetSearchPathToCurrentSearchPathCommand(void) +{ + StringInfo setCommand = NULL; + char *currentSearchPath = CurrentSearchPath(); + + if (currentSearchPath == NULL) + { + return NULL; + } + + setCommand = makeStringInfo(); + appendStringInfo(setCommand, "SET search_path TO %s;", currentSearchPath); + + return setCommand->data; +} + + +/* + * CurrentSearchPath is a C interface for calling current_schemas(bool) that + * PostgreSQL exports. + * + * CurrentSchemas returns all the schemas in the seach_path that are seperated + * with comma (,) sign. The returned string can be used to set the search_path. + * + * The function omits implicit schemas. + * + * The function returns NULL if there are no valid schemas in the search_path, + * mimicing current_schemas(false) function. + */ +static char * +CurrentSearchPath(void) +{ + StringInfo currentSearchPath = makeStringInfo(); + List *searchPathList = fetch_search_path(false); + ListCell *searchPathCell; + bool schemaAdded = false; + + foreach(searchPathCell, searchPathList) + { + char *schemaName = get_namespace_name(lfirst_oid(searchPathCell)); + + /* watch out for deleted namespace */ + if (schemaName) + { + if (schemaAdded) + { + appendStringInfoString(currentSearchPath, ","); + schemaAdded = false; + } + + appendStringInfoString(currentSearchPath, quote_identifier(schemaName)); + schemaAdded = true; + } + } + + /* fetch_search_path() returns a palloc'd list that we should free now */ + list_free(searchPathList); + + return (currentSearchPath->len > 0 ? currentSearchPath->data : NULL); +} + + /* * DDLTaskList builds a list of tasks to execute a DDL command on a * given list of shards. diff --git a/src/test/regress/expected/multi_mx_schema_support.out b/src/test/regress/expected/multi_mx_schema_support.out index be9296ed5..236b20a3c 100644 --- a/src/test/regress/expected/multi_mx_schema_support.out +++ b/src/test/regress/expected/multi_mx_schema_support.out @@ -373,3 +373,81 @@ WHERE -- set task_executor back to real-time SET citus.task_executor_type TO "real-time"; +-- connect to the master and do some test +-- regarding DDL support on schemas where +-- the search_path is set +\c - - - :master_port +CREATE SCHEMA mx_ddl_schema_1; +CREATE SCHEMA mx_ddl_schema_2; +CREATE SCHEMA "CiTuS.TeAeN"; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +-- in the first test make sure that we handle DDLs +-- when search path is set +SET search_path TO mx_ddl_schema_1; +CREATE TABLE table_1 (key int PRIMARY KEY, value text); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE INDEX i1 ON table_1(value); +CREATE INDEX CONCURRENTLY i2 ON table_1(value); +-- now create a foriegn key on tables that are on seperate schemas +SET search_path TO mx_ddl_schema_1, mx_ddl_schema_2; +CREATE TABLE mx_ddl_schema_2.table_2 (key int PRIMARY KEY, value text); +SELECT create_distributed_table('mx_ddl_schema_2.table_2', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE table_2 ADD CONSTRAINT test_constraint FOREIGN KEY (key) REFERENCES table_1(key); +-- we can also handle schema/table names with quotation +SET search_path TO "CiTuS.TeAeN"; +CREATE TABLE "TeeNTabLE.1!?!"(id int, "TeNANt_Id" int); +SELECT create_distributed_table('"TeeNTabLE.1!?!"', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE INDEX "MyTenantIndex" ON "CiTuS.TeAeN"."TeeNTabLE.1!?!"("TeNANt_Id"); +SET search_path TO "CiTuS.TeAeN", mx_ddl_schema_1, mx_ddl_schema_2; +ALTER TABLE "TeeNTabLE.1!?!" ADD CONSTRAINT test_constraint_2 FOREIGN KEY (id) REFERENCES table_1(key); +ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT; +-- same semantics with CREATE INDEX CONCURRENTLY such that +-- it uses a single connection to execute all the commands +SET citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col; +-- set it back to the default value +SET citus.multi_shard_modify_mode TO 'parallel'; +-- test with a not existing schema is in the search path +SET search_path TO not_existing_schema, "CiTuS.TeAeN"; +ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT; +-- test with a public schema is in the search path +SET search_path TO public, "CiTuS.TeAeN"; +ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col; +-- make sure that we handle transaction blocks properly +BEGIN; + SET search_path TO public, "CiTuS.TeAeN"; + ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT; + SET search_path TO mx_ddl_schema_1; + CREATE INDEX i55 ON table_1(value); + SET search_path TO mx_ddl_schema_1, public, "CiTuS.TeAeN"; + ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col; + DROP INDEX i55; +COMMIT; +-- set the search_path to null +SET search_path TO ''; +ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" ADD COLUMN new_col INT; +-- set the search_path to not existing schema +SET search_path TO not_existing_schema; +ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col; +DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table "CiTuS.TeAeN"."TeeNTabLE.1!?!" +drop cascades to table mx_ddl_schema_2.table_2 +drop cascades to table mx_ddl_schema_1.table_1 diff --git a/src/test/regress/sql/multi_mx_schema_support.sql b/src/test/regress/sql/multi_mx_schema_support.sql index 999b6d799..af0dfadb0 100644 --- a/src/test/regress/sql/multi_mx_schema_support.sql +++ b/src/test/regress/sql/multi_mx_schema_support.sql @@ -220,3 +220,82 @@ WHERE -- set task_executor back to real-time SET citus.task_executor_type TO "real-time"; +-- connect to the master and do some test +-- regarding DDL support on schemas where +-- the search_path is set +\c - - - :master_port + +CREATE SCHEMA mx_ddl_schema_1; +CREATE SCHEMA mx_ddl_schema_2; +CREATE SCHEMA "CiTuS.TeAeN"; + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + +-- in the first test make sure that we handle DDLs +-- when search path is set +SET search_path TO mx_ddl_schema_1; +CREATE TABLE table_1 (key int PRIMARY KEY, value text); +SELECT create_distributed_table('table_1', 'key'); +CREATE INDEX i1 ON table_1(value); +CREATE INDEX CONCURRENTLY i2 ON table_1(value); + + +-- now create a foriegn key on tables that are on seperate schemas +SET search_path TO mx_ddl_schema_1, mx_ddl_schema_2; +CREATE TABLE mx_ddl_schema_2.table_2 (key int PRIMARY KEY, value text); +SELECT create_distributed_table('mx_ddl_schema_2.table_2', 'key'); + +ALTER TABLE table_2 ADD CONSTRAINT test_constraint FOREIGN KEY (key) REFERENCES table_1(key); + +-- we can also handle schema/table names with quotation +SET search_path TO "CiTuS.TeAeN"; +CREATE TABLE "TeeNTabLE.1!?!"(id int, "TeNANt_Id" int); +SELECT create_distributed_table('"TeeNTabLE.1!?!"', 'id'); + +CREATE INDEX "MyTenantIndex" ON "CiTuS.TeAeN"."TeeNTabLE.1!?!"("TeNANt_Id"); +SET search_path TO "CiTuS.TeAeN", mx_ddl_schema_1, mx_ddl_schema_2; + +ALTER TABLE "TeeNTabLE.1!?!" ADD CONSTRAINT test_constraint_2 FOREIGN KEY (id) REFERENCES table_1(key); + +ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT; + +-- same semantics with CREATE INDEX CONCURRENTLY such that +-- it uses a single connection to execute all the commands +SET citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col; + +-- set it back to the default value +SET citus.multi_shard_modify_mode TO 'parallel'; + +-- test with a not existing schema is in the search path +SET search_path TO not_existing_schema, "CiTuS.TeAeN"; +ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT; + +-- test with a public schema is in the search path +SET search_path TO public, "CiTuS.TeAeN"; +ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col; + +-- make sure that we handle transaction blocks properly +BEGIN; + SET search_path TO public, "CiTuS.TeAeN"; + ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT; + + SET search_path TO mx_ddl_schema_1; + CREATE INDEX i55 ON table_1(value); + + SET search_path TO mx_ddl_schema_1, public, "CiTuS.TeAeN"; + ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col; + DROP INDEX i55; +COMMIT; + +-- set the search_path to null +SET search_path TO ''; +ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" ADD COLUMN new_col INT; + +-- set the search_path to not existing schema +SET search_path TO not_existing_schema; +ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col; + +DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE;