Fix DDL execution problem on MX when search_path is used

Make sure that the coordinator sends the commands when the search
path synchronised with the coordinator's search_path. This is only
important when Citus sends the commands that are directly relayed
to the worker nodes. For example, the deparsed DLL commands or
queries always adds schema qualifications to the queries. So, they
do not require this change.
pull/2335/head
Onder Kalaci 2018-08-09 11:46:04 +03:00
parent 537d80abdb
commit 85d418412d
3 changed files with 255 additions and 2 deletions

View File

@ -158,6 +158,8 @@ static bool IsIndexRenameStmt(RenameStmt *renameStmt);
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *command);
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static char * SetSearchPathToCurrentSearchPathCommand(void);
static char * CurrentSearchPath(void);
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
@ -3146,7 +3148,19 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
{
if (shouldSyncMetadata)
{
char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/*
* Given that we're relaying the query to the worker nodes directly,
* we should set the search path exactly the same when necessary.
*/
if (setSearchPathCommand != NULL)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, setSearchPathCommand);
}
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
@ -3173,8 +3187,19 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
if (shouldSyncMetadata)
{
List *commandList = list_make2(DISABLE_DDL_PROPAGATION,
(char *) ddlJob->commandString);
List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
/*
* Given that we're relaying the query to the worker nodes directly,
* we should set the search path exactly the same when necessary.
*/
if (setSearchPathCommand != NULL)
{
commandList = lappend(commandList, setSearchPathCommand);
}
commandList = lappend(commandList, (char *) ddlJob->commandString);
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
}
@ -3193,6 +3218,77 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
}
/*
* SetSearchPathToCurrentSearchPathCommand generates a command which can
* set the search path to the exact same search path that the issueing node
* has.
*
* If the current search path is null (or doesn't have any valid schemas),
* the function returns NULL.
*/
static char *
SetSearchPathToCurrentSearchPathCommand(void)
{
StringInfo setCommand = NULL;
char *currentSearchPath = CurrentSearchPath();
if (currentSearchPath == NULL)
{
return NULL;
}
setCommand = makeStringInfo();
appendStringInfo(setCommand, "SET search_path TO %s;", currentSearchPath);
return setCommand->data;
}
/*
* CurrentSearchPath is a C interface for calling current_schemas(bool) that
* PostgreSQL exports.
*
* CurrentSchemas returns all the schemas in the seach_path that are seperated
* with comma (,) sign. The returned string can be used to set the search_path.
*
* The function omits implicit schemas.
*
* The function returns NULL if there are no valid schemas in the search_path,
* mimicing current_schemas(false) function.
*/
static char *
CurrentSearchPath(void)
{
StringInfo currentSearchPath = makeStringInfo();
List *searchPathList = fetch_search_path(false);
ListCell *searchPathCell;
bool schemaAdded = false;
foreach(searchPathCell, searchPathList)
{
char *schemaName = get_namespace_name(lfirst_oid(searchPathCell));
/* watch out for deleted namespace */
if (schemaName)
{
if (schemaAdded)
{
appendStringInfoString(currentSearchPath, ",");
schemaAdded = false;
}
appendStringInfoString(currentSearchPath, quote_identifier(schemaName));
schemaAdded = true;
}
}
/* fetch_search_path() returns a palloc'd list that we should free now */
list_free(searchPathList);
return (currentSearchPath->len > 0 ? currentSearchPath->data : NULL);
}
/*
* DDLTaskList builds a list of tasks to execute a DDL command on a
* given list of shards.

View File

@ -373,3 +373,81 @@ WHERE
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";
-- connect to the master and do some test
-- regarding DDL support on schemas where
-- the search_path is set
\c - - - :master_port
CREATE SCHEMA mx_ddl_schema_1;
CREATE SCHEMA mx_ddl_schema_2;
CREATE SCHEMA "CiTuS.TeAeN";
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
-- in the first test make sure that we handle DDLs
-- when search path is set
SET search_path TO mx_ddl_schema_1;
CREATE TABLE table_1 (key int PRIMARY KEY, value text);
SELECT create_distributed_table('table_1', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE INDEX i1 ON table_1(value);
CREATE INDEX CONCURRENTLY i2 ON table_1(value);
-- now create a foriegn key on tables that are on seperate schemas
SET search_path TO mx_ddl_schema_1, mx_ddl_schema_2;
CREATE TABLE mx_ddl_schema_2.table_2 (key int PRIMARY KEY, value text);
SELECT create_distributed_table('mx_ddl_schema_2.table_2', 'key');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE table_2 ADD CONSTRAINT test_constraint FOREIGN KEY (key) REFERENCES table_1(key);
-- we can also handle schema/table names with quotation
SET search_path TO "CiTuS.TeAeN";
CREATE TABLE "TeeNTabLE.1!?!"(id int, "TeNANt_Id" int);
SELECT create_distributed_table('"TeeNTabLE.1!?!"', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE INDEX "MyTenantIndex" ON "CiTuS.TeAeN"."TeeNTabLE.1!?!"("TeNANt_Id");
SET search_path TO "CiTuS.TeAeN", mx_ddl_schema_1, mx_ddl_schema_2;
ALTER TABLE "TeeNTabLE.1!?!" ADD CONSTRAINT test_constraint_2 FOREIGN KEY (id) REFERENCES table_1(key);
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
-- same semantics with CREATE INDEX CONCURRENTLY such that
-- it uses a single connection to execute all the commands
SET citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
-- set it back to the default value
SET citus.multi_shard_modify_mode TO 'parallel';
-- test with a not existing schema is in the search path
SET search_path TO not_existing_schema, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
-- test with a public schema is in the search path
SET search_path TO public, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
-- make sure that we handle transaction blocks properly
BEGIN;
SET search_path TO public, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
SET search_path TO mx_ddl_schema_1;
CREATE INDEX i55 ON table_1(value);
SET search_path TO mx_ddl_schema_1, public, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
DROP INDEX i55;
COMMIT;
-- set the search_path to null
SET search_path TO '';
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" ADD COLUMN new_col INT;
-- set the search_path to not existing schema
SET search_path TO not_existing_schema;
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col;
DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table "CiTuS.TeAeN"."TeeNTabLE.1!?!"
drop cascades to table mx_ddl_schema_2.table_2
drop cascades to table mx_ddl_schema_1.table_1

View File

@ -220,3 +220,82 @@ WHERE
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";
-- connect to the master and do some test
-- regarding DDL support on schemas where
-- the search_path is set
\c - - - :master_port
CREATE SCHEMA mx_ddl_schema_1;
CREATE SCHEMA mx_ddl_schema_2;
CREATE SCHEMA "CiTuS.TeAeN";
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
-- in the first test make sure that we handle DDLs
-- when search path is set
SET search_path TO mx_ddl_schema_1;
CREATE TABLE table_1 (key int PRIMARY KEY, value text);
SELECT create_distributed_table('table_1', 'key');
CREATE INDEX i1 ON table_1(value);
CREATE INDEX CONCURRENTLY i2 ON table_1(value);
-- now create a foriegn key on tables that are on seperate schemas
SET search_path TO mx_ddl_schema_1, mx_ddl_schema_2;
CREATE TABLE mx_ddl_schema_2.table_2 (key int PRIMARY KEY, value text);
SELECT create_distributed_table('mx_ddl_schema_2.table_2', 'key');
ALTER TABLE table_2 ADD CONSTRAINT test_constraint FOREIGN KEY (key) REFERENCES table_1(key);
-- we can also handle schema/table names with quotation
SET search_path TO "CiTuS.TeAeN";
CREATE TABLE "TeeNTabLE.1!?!"(id int, "TeNANt_Id" int);
SELECT create_distributed_table('"TeeNTabLE.1!?!"', 'id');
CREATE INDEX "MyTenantIndex" ON "CiTuS.TeAeN"."TeeNTabLE.1!?!"("TeNANt_Id");
SET search_path TO "CiTuS.TeAeN", mx_ddl_schema_1, mx_ddl_schema_2;
ALTER TABLE "TeeNTabLE.1!?!" ADD CONSTRAINT test_constraint_2 FOREIGN KEY (id) REFERENCES table_1(key);
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
-- same semantics with CREATE INDEX CONCURRENTLY such that
-- it uses a single connection to execute all the commands
SET citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
-- set it back to the default value
SET citus.multi_shard_modify_mode TO 'parallel';
-- test with a not existing schema is in the search path
SET search_path TO not_existing_schema, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
-- test with a public schema is in the search path
SET search_path TO public, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
-- make sure that we handle transaction blocks properly
BEGIN;
SET search_path TO public, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" ADD COLUMN new_col INT;
SET search_path TO mx_ddl_schema_1;
CREATE INDEX i55 ON table_1(value);
SET search_path TO mx_ddl_schema_1, public, "CiTuS.TeAeN";
ALTER TABLE "TeeNTabLE.1!?!" DROP COLUMN new_col;
DROP INDEX i55;
COMMIT;
-- set the search_path to null
SET search_path TO '';
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" ADD COLUMN new_col INT;
-- set the search_path to not existing schema
SET search_path TO not_existing_schema;
ALTER TABLE "CiTuS.TeAeN"."TeeNTabLE.1!?!" DROP COLUMN new_col;
DROP SCHEMA mx_ddl_schema_1, mx_ddl_schema_2, "CiTuS.TeAeN" CASCADE;