mirror of https://github.com/citusdata/citus.git
Merge pull request #2335 from citusdata/fix_mx_schema
Fix DDL execution bug on MX when search_path is usedpull/2327/head
commit
904e1781fa
|
@ -158,6 +158,8 @@ static bool IsIndexRenameStmt(RenameStmt *renameStmt);
|
||||||
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
|
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
|
||||||
AlterTableCmd *command);
|
AlterTableCmd *command);
|
||||||
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
|
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
|
||||||
|
static char * SetSearchPathToCurrentSearchPathCommand(void);
|
||||||
|
static char * CurrentSearchPath(void);
|
||||||
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
|
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
|
||||||
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
|
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
|
||||||
static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||||
|
@ -3146,7 +3148,19 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
{
|
{
|
||||||
if (shouldSyncMetadata)
|
if (shouldSyncMetadata)
|
||||||
{
|
{
|
||||||
|
char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
|
||||||
|
|
||||||
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
|
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Given that we're relaying the query to the worker nodes directly,
|
||||||
|
* we should set the search path exactly the same when necessary.
|
||||||
|
*/
|
||||||
|
if (setSearchPathCommand != NULL)
|
||||||
|
{
|
||||||
|
SendCommandToWorkers(WORKERS_WITH_METADATA, setSearchPathCommand);
|
||||||
|
}
|
||||||
|
|
||||||
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
|
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3173,8 +3187,19 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
|
|
||||||
if (shouldSyncMetadata)
|
if (shouldSyncMetadata)
|
||||||
{
|
{
|
||||||
List *commandList = list_make2(DISABLE_DDL_PROPAGATION,
|
List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
|
||||||
(char *) ddlJob->commandString);
|
char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Given that we're relaying the query to the worker nodes directly,
|
||||||
|
* we should set the search path exactly the same when necessary.
|
||||||
|
*/
|
||||||
|
if (setSearchPathCommand != NULL)
|
||||||
|
{
|
||||||
|
commandList = lappend(commandList, setSearchPathCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
commandList = lappend(commandList, (char *) ddlJob->commandString);
|
||||||
|
|
||||||
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
|
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
|
||||||
}
|
}
|
||||||
|
@ -3193,6 +3218,77 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetSearchPathToCurrentSearchPathCommand generates a command which can
|
||||||
|
* set the search path to the exact same search path that the issueing node
|
||||||
|
* has.
|
||||||
|
*
|
||||||
|
* If the current search path is null (or doesn't have any valid schemas),
|
||||||
|
* the function returns NULL.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
SetSearchPathToCurrentSearchPathCommand(void)
|
||||||
|
{
|
||||||
|
StringInfo setCommand = NULL;
|
||||||
|
char *currentSearchPath = CurrentSearchPath();
|
||||||
|
|
||||||
|
if (currentSearchPath == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
setCommand = makeStringInfo();
|
||||||
|
appendStringInfo(setCommand, "SET search_path TO %s;", currentSearchPath);
|
||||||
|
|
||||||
|
return setCommand->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CurrentSearchPath is a C interface for calling current_schemas(bool) that
|
||||||
|
* PostgreSQL exports.
|
||||||
|
*
|
||||||
|
* CurrentSchemas returns all the schemas in the seach_path that are seperated
|
||||||
|
* with comma (,) sign. The returned string can be used to set the search_path.
|
||||||
|
*
|
||||||
|
* The function omits implicit schemas.
|
||||||
|
*
|
||||||
|
* The function returns NULL if there are no valid schemas in the search_path,
|
||||||
|
* mimicing current_schemas(false) function.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
CurrentSearchPath(void)
|
||||||
|
{
|
||||||
|
StringInfo currentSearchPath = makeStringInfo();
|
||||||
|
List *searchPathList = fetch_search_path(false);
|
||||||
|
ListCell *searchPathCell;
|
||||||
|
bool schemaAdded = false;
|
||||||
|
|
||||||
|
foreach(searchPathCell, searchPathList)
|
||||||
|
{
|
||||||
|
char *schemaName = get_namespace_name(lfirst_oid(searchPathCell));
|
||||||
|
|
||||||
|
/* watch out for deleted namespace */
|
||||||
|
if (schemaName)
|
||||||
|
{
|
||||||
|
if (schemaAdded)
|
||||||
|
{
|
||||||
|
appendStringInfoString(currentSearchPath, ",");
|
||||||
|
schemaAdded = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfoString(currentSearchPath, quote_identifier(schemaName));
|
||||||
|
schemaAdded = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* fetch_search_path() returns a palloc'd list that we should free now */
|
||||||
|
list_free(searchPathList);
|
||||||
|
|
||||||
|
return (currentSearchPath->len > 0 ? currentSearchPath->data : NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DDLTaskList builds a list of tasks to execute a DDL command on a
|
* DDLTaskList builds a list of tasks to execute a DDL command on a
|
||||||
* given list of shards.
|
* given list of shards.
|
||||||
|
|
|
@ -373,3 +373,81 @@ WHERE
|
||||||
|
|
||||||
-- set task_executor back to real-time
|
-- set task_executor back to real-time
|
||||||
SET citus.task_executor_type TO "real-time";
|
SET citus.task_executor_type TO "real-time";
|
||||||
|
-- connect to the master and do some test
|
||||||
|
-- regarding DDL support on schemas where
|
||||||
|
-- the search_path is set
|
||||||
|
\c - - - :master_port
|
||||||
|
CREATE SCHEMA mx_ddl_schema_1;
|
||||||
|
CREATE SCHEMA mx_ddl_schema_2;
|
||||||
|
CREATE SCHEMA "CiTuS.TeAeN";
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
-- in the first test make sure that we handle DDLs
|
||||||
|
-- when search path is set
|
||||||
|
SET search_path TO mx_ddl_schema_1;
|
||||||
|
CREATE TABLE table_1 (key int PRIMARY KEY, value text);
|
||||||
|
SELECT create_distributed_table('table_1', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE INDEX i1 ON table_1(value);
|
||||||
|
CREATE INDEX CONCURRENTLY i2 ON table_1(value);
|
||||||
|
-- now create a foriegn key on tables that are on seperate schemas
|
||||||
|
SET search_path TO mx_ddl_schema_1, mx_ddl_schema_2;
|
||||||
|
CREATE TABLE mx_ddl_schema_2.table_2 (key int PRIMARY KEY, value text);
|
||||||
|
SELECT create_distributed_table('mx_ddl_schema_2.table_2', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE table_2 ADD CONSTRAINT test_constraint FOREIGN KEY (key) REFERENCES table_1(key);
|
||||||
|
-- we can also handle schema/table names with quotation
|
||||||
|
SET search_path TO "CiTuS.TeAeN";
|
||||||
|
CREATE TABLE "TeeNTabLE.1!?!"(id int, "TeNANt_Id" int);
|
||||||
|
SELECT create_distributed_table('"TeeNTabLE.1!?!"', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE INDEX "MyTenantIndex" ON "CiTuS.TeAeN"."TeeNTabLE.1!?!"("TeNANt_Id");
|
||||||
|
SET search_path TO "CiTuS.TeAeN", mx_ddl_schema_1, mx_ddl_schema_2;
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD CONSTRAINT test_constraint_2 FOREIGN KEY (id) REFERENCES table_1(key);
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
-- same semantics with CREATE INDEX CONCURRENTLY such that
|
||||||
|
-- it uses a single connection to execute all the commands
|
||||||
|
SET citus.multi_shard_modify_mode TO 'sequential';
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
-- set it back to the default value
|
||||||
|
SET citus.multi_shard_modify_mode TO 'parallel';
|
||||||
|
-- test with a not existing schema is in the search path
|
||||||
|
SET search_path TO not_existing_schema, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
-- test with a public schema is in the search path
|
||||||
|
SET search_path TO public, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
-- make sure that we handle transaction blocks properly
|
||||||
|
BEGIN;
|
||||||
|
SET search_path TO public, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
SET search_path TO mx_ddl_schema_1;
|
||||||
|
CREATE INDEX i55 ON table_1(value);
|
||||||
|
SET search_path TO mx_ddl_schema_1, public, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
DROP INDEX i55;
|
||||||
|
COMMIT;
|
||||||
|
-- set the search_path to null
|
||||||
|
SET search_path TO '';
|
||||||
|
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
-- set the search_path to not existing schema
|
||||||
|
SET search_path TO not_existing_schema;
|
||||||
|
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
DETAIL: drop cascades to table "CiTuS.TeAeN"."TeeNTabLE.1!?!"
|
||||||
|
drop cascades to table mx_ddl_schema_2.table_2
|
||||||
|
drop cascades to table mx_ddl_schema_1.table_1
|
||||||
|
|
|
@ -220,3 +220,82 @@ WHERE
|
||||||
-- set task_executor back to real-time
|
-- set task_executor back to real-time
|
||||||
SET citus.task_executor_type TO "real-time";
|
SET citus.task_executor_type TO "real-time";
|
||||||
|
|
||||||
|
-- connect to the master and do some test
|
||||||
|
-- regarding DDL support on schemas where
|
||||||
|
-- the search_path is set
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
CREATE SCHEMA mx_ddl_schema_1;
|
||||||
|
CREATE SCHEMA mx_ddl_schema_2;
|
||||||
|
CREATE SCHEMA "CiTuS.TeAeN";
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
|
||||||
|
-- in the first test make sure that we handle DDLs
|
||||||
|
-- when search path is set
|
||||||
|
SET search_path TO mx_ddl_schema_1;
|
||||||
|
CREATE TABLE table_1 (key int PRIMARY KEY, value text);
|
||||||
|
SELECT create_distributed_table('table_1', 'key');
|
||||||
|
CREATE INDEX i1 ON table_1(value);
|
||||||
|
CREATE INDEX CONCURRENTLY i2 ON table_1(value);
|
||||||
|
|
||||||
|
|
||||||
|
-- now create a foriegn key on tables that are on seperate schemas
|
||||||
|
SET search_path TO mx_ddl_schema_1, mx_ddl_schema_2;
|
||||||
|
CREATE TABLE mx_ddl_schema_2.table_2 (key int PRIMARY KEY, value text);
|
||||||
|
SELECT create_distributed_table('mx_ddl_schema_2.table_2', 'key');
|
||||||
|
|
||||||
|
ALTER TABLE table_2 ADD CONSTRAINT test_constraint FOREIGN KEY (key) REFERENCES table_1(key);
|
||||||
|
|
||||||
|
-- we can also handle schema/table names with quotation
|
||||||
|
SET search_path TO "CiTuS.TeAeN";
|
||||||
|
CREATE TABLE "TeeNTabLE.1!?!"(id int, "TeNANt_Id" int);
|
||||||
|
SELECT create_distributed_table('"TeeNTabLE.1!?!"', 'id');
|
||||||
|
|
||||||
|
CREATE INDEX "MyTenantIndex" ON "CiTuS.TeAeN"."TeeNTabLE.1!?!"("TeNANt_Id");
|
||||||
|
SET search_path TO "CiTuS.TeAeN", mx_ddl_schema_1, mx_ddl_schema_2;
|
||||||
|
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD CONSTRAINT test_constraint_2 FOREIGN KEY (id) REFERENCES table_1(key);
|
||||||
|
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
|
||||||
|
-- same semantics with CREATE INDEX CONCURRENTLY such that
|
||||||
|
-- it uses a single connection to execute all the commands
|
||||||
|
SET citus.multi_shard_modify_mode TO 'sequential';
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
|
||||||
|
-- set it back to the default value
|
||||||
|
SET citus.multi_shard_modify_mode TO 'parallel';
|
||||||
|
|
||||||
|
-- test with a not existing schema is in the search path
|
||||||
|
SET search_path TO not_existing_schema, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
|
||||||
|
-- test with a public schema is in the search path
|
||||||
|
SET search_path TO public, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
|
||||||
|
-- make sure that we handle transaction blocks properly
|
||||||
|
BEGIN;
|
||||||
|
SET search_path TO public, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
|
||||||
|
SET search_path TO mx_ddl_schema_1;
|
||||||
|
CREATE INDEX i55 ON table_1(value);
|
||||||
|
|
||||||
|
SET search_path TO mx_ddl_schema_1, public, "CiTuS.TeAeN";
|
||||||
|
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
DROP INDEX i55;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- set the search_path to null
|
||||||
|
SET search_path TO '';
|
||||||
|
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" ADD COLUMN new_col INT;
|
||||||
|
|
||||||
|
-- set the search_path to not existing schema
|
||||||
|
SET search_path TO not_existing_schema;
|
||||||
|
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col;
|
||||||
|
|
||||||
|
DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue