mirror of https://github.com/citusdata/citus.git
Distributes partition-to-be table before ProcessUtility (#5191)
* Skip ALTER TABLE constraint checks while planning * Revert previous commit's solution, keep tests * Distribute partition-to-be table before ProcessUtility * Acquire locks in PreprocessAlterTableStmtAttachPartitionpull/5172/head
parent
889a2731cb
commit
4fb05efabb
|
@ -372,7 +372,7 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PostprocessAlterTableStmtAttachPartition takes AlterTableStmt object as
|
* PreprocessAlterTableStmtAttachPartition takes AlterTableStmt object as
|
||||||
* parameter but it only processes into ALTER TABLE ... ATTACH PARTITION
|
* parameter but it only processes into ALTER TABLE ... ATTACH PARTITION
|
||||||
* commands and distributes the partition if necessary. There are four cases
|
* commands and distributes the partition if necessary. There are four cases
|
||||||
* to consider;
|
* to consider;
|
||||||
|
@ -399,8 +399,8 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
|
||||||
* ATTACH PARTITION OF command.
|
* ATTACH PARTITION OF command.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
PreprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
const char *queryString)
|
const char *queryString)
|
||||||
{
|
{
|
||||||
List *commandList = alterTableStatement->cmds;
|
List *commandList = alterTableStatement->cmds;
|
||||||
AlterTableCmd *alterTableCommand = NULL;
|
AlterTableCmd *alterTableCommand = NULL;
|
||||||
|
@ -408,10 +408,15 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
{
|
{
|
||||||
if (alterTableCommand->subtype == AT_AttachPartition)
|
if (alterTableCommand->subtype == AT_AttachPartition)
|
||||||
{
|
{
|
||||||
Oid relationId = AlterTableLookupRelation(alterTableStatement, NoLock);
|
/*
|
||||||
|
* We acquire the lock on the parent and child as we are in the pre-process
|
||||||
|
* and want to ensure we acquire the locks in the same order with Postgres
|
||||||
|
*/
|
||||||
|
LOCKMODE lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
||||||
|
Oid relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||||
PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def;
|
PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def;
|
||||||
bool partitionMissingOk = false;
|
bool partitionMissingOk = false;
|
||||||
Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name, NoLock,
|
Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name, lockmode,
|
||||||
partitionMissingOk);
|
partitionMissingOk);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -434,6 +439,13 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
!IsCitusTable(partitionRelationId))
|
!IsCitusTable(partitionRelationId))
|
||||||
{
|
{
|
||||||
Var *distributionColumn = DistPartitionKeyOrError(relationId);
|
Var *distributionColumn = DistPartitionKeyOrError(relationId);
|
||||||
|
char *distributionColumnName = ColumnToColumnName(relationId,
|
||||||
|
nodeToString(
|
||||||
|
distributionColumn));
|
||||||
|
distributionColumn = FindColumnWithNameOnTargetRelation(relationId,
|
||||||
|
distributionColumnName,
|
||||||
|
partitionRelationId);
|
||||||
|
|
||||||
char distributionMethod = DISTRIBUTE_BY_HASH;
|
char distributionMethod = DISTRIBUTE_BY_HASH;
|
||||||
char *parentRelationName = generate_qualified_relation_name(relationId);
|
char *parentRelationName = generate_qualified_relation_name(relationId);
|
||||||
bool viaDeprecatedAPI = false;
|
bool viaDeprecatedAPI = false;
|
||||||
|
|
|
@ -443,6 +443,17 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
PreprocessTruncateStatement((TruncateStmt *) parsetree);
|
PreprocessTruncateStatement((TruncateStmt *) parsetree);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
||||||
|
* and distribute the partition if necessary.
|
||||||
|
*/
|
||||||
|
if (IsA(parsetree, AlterTableStmt))
|
||||||
|
{
|
||||||
|
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
|
||||||
|
|
||||||
|
PreprocessAlterTableStmtAttachPartition(alterTableStatement, queryString);
|
||||||
|
}
|
||||||
|
|
||||||
/* only generate worker DDLJobs if propagation is enabled */
|
/* only generate worker DDLJobs if propagation is enabled */
|
||||||
const DistributeObjectOps *ops = NULL;
|
const DistributeObjectOps *ops = NULL;
|
||||||
if (EnableDDLPropagation)
|
if (EnableDDLPropagation)
|
||||||
|
@ -611,17 +622,6 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
PostprocessCreateTableStmt(createStatement, queryString);
|
PostprocessCreateTableStmt(createStatement, queryString);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
|
||||||
* and distribute the partition if necessary.
|
|
||||||
*/
|
|
||||||
if (IsA(parsetree, AlterTableStmt))
|
|
||||||
{
|
|
||||||
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
|
|
||||||
|
|
||||||
PostprocessAlterTableStmtAttachPartition(alterTableStatement, queryString);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* after local command has completed, finish by executing worker DDLJobs, if any */
|
/* after local command has completed, finish by executing worker DDLJobs, if any */
|
||||||
if (ddlJobs != NIL)
|
if (ddlJobs != NIL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -399,9 +399,8 @@ extern List * PreprocessDropTableStmt(Node *stmt, const char *queryString,
|
||||||
extern void PostprocessCreateTableStmt(CreateStmt *createStatement,
|
extern void PostprocessCreateTableStmt(CreateStmt *createStatement,
|
||||||
const char *queryString);
|
const char *queryString);
|
||||||
extern bool ShouldEnableLocalReferenceForeignKeys(void);
|
extern bool ShouldEnableLocalReferenceForeignKeys(void);
|
||||||
extern List * PostprocessAlterTableStmtAttachPartition(
|
extern List * PreprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
AlterTableStmt *alterTableStatement,
|
const char *queryString);
|
||||||
const char *queryString);
|
|
||||||
extern List * PostprocessAlterTableSchemaStmt(Node *node, const char *queryString);
|
extern List * PostprocessAlterTableSchemaStmt(Node *node, const char *queryString);
|
||||||
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
||||||
ProcessUtilityContext processUtilityContext);
|
ProcessUtilityContext processUtilityContext);
|
||||||
|
|
|
@ -2210,8 +2210,42 @@ SELECT worker_partitioned_relation_total_size(oid) FROM pg_class WHERE relname L
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
DROP TABLE "events.Energy Added";
|
DROP TABLE "events.Energy Added";
|
||||||
|
-- test we skip the foreign key validation query on coordinator
|
||||||
|
-- that happens when attaching a non-distributed partition to a distributed-partitioned table
|
||||||
|
-- with a foreign key to another distributed table
|
||||||
|
SET search_path = partitioning_schema;
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
CREATE TABLE another_distributed_table (x int primary key, y int);
|
||||||
|
SELECT create_distributed_table('another_distributed_table','x');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE distributed_parent_table (
|
||||||
|
event_id serial NOT NULL REFERENCES another_distributed_table (x),
|
||||||
|
event_time timestamptz NOT NULL DEFAULT now())
|
||||||
|
PARTITION BY RANGE (event_time);
|
||||||
|
SELECT create_distributed_table('distributed_parent_table', 'event_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE non_distributed_child_1 (event_id int NOT NULL, event_time timestamptz NOT NULL DEFAULT now());
|
||||||
|
ALTER TABLE distributed_parent_table ATTACH PARTITION non_distributed_child_1 FOR VALUES FROM ('2021-06-30') TO ('2021-07-01');
|
||||||
|
-- check DEFAULT partition behaves as expected
|
||||||
|
CREATE TABLE non_distributed_child_2 (event_id int NOT NULL, event_time timestamptz NOT NULL DEFAULT now());
|
||||||
|
ALTER TABLE distributed_parent_table ATTACH PARTITION non_distributed_child_2 DEFAULT;
|
||||||
|
-- check adding another partition when default partition exists
|
||||||
|
CREATE TABLE non_distributed_child_3 (event_id int NOT NULL, event_time timestamptz NOT NULL DEFAULT now());
|
||||||
|
ALTER TABLE distributed_parent_table ATTACH PARTITION non_distributed_child_3 FOR VALUES FROM ('2021-07-30') TO ('2021-08-01');
|
||||||
DROP SCHEMA partitioning_schema CASCADE;
|
DROP SCHEMA partitioning_schema CASCADE;
|
||||||
NOTICE: drop cascades to table partitioning_schema."schema-test"
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
DETAIL: drop cascades to table "schema-test"
|
||||||
|
drop cascades to table another_distributed_table
|
||||||
|
drop cascades to table distributed_parent_table
|
||||||
|
RESET search_path;
|
||||||
DROP TABLE IF EXISTS
|
DROP TABLE IF EXISTS
|
||||||
partitioning_hash_test,
|
partitioning_hash_test,
|
||||||
partitioning_hash_join_test,
|
partitioning_hash_join_test,
|
||||||
|
|
|
@ -1303,7 +1303,29 @@ SELECT worker_partitioned_relation_total_size(oid) FROM pg_class WHERE relname L
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
DROP TABLE "events.Energy Added";
|
DROP TABLE "events.Energy Added";
|
||||||
|
|
||||||
|
-- test we skip the foreign key validation query on coordinator
|
||||||
|
-- that happens when attaching a non-distributed partition to a distributed-partitioned table
|
||||||
|
-- with a foreign key to another distributed table
|
||||||
|
SET search_path = partitioning_schema;
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
CREATE TABLE another_distributed_table (x int primary key, y int);
|
||||||
|
SELECT create_distributed_table('another_distributed_table','x');
|
||||||
|
CREATE TABLE distributed_parent_table (
|
||||||
|
event_id serial NOT NULL REFERENCES another_distributed_table (x),
|
||||||
|
event_time timestamptz NOT NULL DEFAULT now())
|
||||||
|
PARTITION BY RANGE (event_time);
|
||||||
|
SELECT create_distributed_table('distributed_parent_table', 'event_id');
|
||||||
|
CREATE TABLE non_distributed_child_1 (event_id int NOT NULL, event_time timestamptz NOT NULL DEFAULT now());
|
||||||
|
ALTER TABLE distributed_parent_table ATTACH PARTITION non_distributed_child_1 FOR VALUES FROM ('2021-06-30') TO ('2021-07-01');
|
||||||
|
-- check DEFAULT partition behaves as expected
|
||||||
|
CREATE TABLE non_distributed_child_2 (event_id int NOT NULL, event_time timestamptz NOT NULL DEFAULT now());
|
||||||
|
ALTER TABLE distributed_parent_table ATTACH PARTITION non_distributed_child_2 DEFAULT;
|
||||||
|
-- check adding another partition when default partition exists
|
||||||
|
CREATE TABLE non_distributed_child_3 (event_id int NOT NULL, event_time timestamptz NOT NULL DEFAULT now());
|
||||||
|
ALTER TABLE distributed_parent_table ATTACH PARTITION non_distributed_child_3 FOR VALUES FROM ('2021-07-30') TO ('2021-08-01');
|
||||||
|
|
||||||
DROP SCHEMA partitioning_schema CASCADE;
|
DROP SCHEMA partitioning_schema CASCADE;
|
||||||
|
RESET search_path;
|
||||||
DROP TABLE IF EXISTS
|
DROP TABLE IF EXISTS
|
||||||
partitioning_hash_test,
|
partitioning_hash_test,
|
||||||
partitioning_hash_join_test,
|
partitioning_hash_join_test,
|
||||||
|
|
Loading…
Reference in New Issue