From fddf9b3fcc5a6f744c5786d496f2de2987d9eb50 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Thu, 27 Jul 2017 14:59:12 +0300 Subject: [PATCH] Add distributed partitioned table support distributed table creation With this PR, Citus starts to support all possible ways to create distributed partitioned tables. These are; - Distributing already created partitioning hierarchy - CREATE TABLE ... PARTITION OF a distributed_table - ALTER TABLE distributed_table ATTACH PARTITION non_distributed_table - ALTER TABLE distributed_table ATTACH PARTITION distributed_table We also support DETACHing partitions from partitioned tables and propogating TRUNCATE and DDL commands to distributed partitioned tables. This PR also refactors some parts of distributed table creation logic. --- .../commands/create_distributed_table.c | 111 +++++++- .../distributed/executor/multi_utility.c | 269 +++++++++++++++++- .../master/master_expire_table_cache.c | 3 +- .../distributed/master/master_repair_shards.c | 3 +- .../master/master_stage_protocol.c | 55 +++- .../distributed/relay/relay_event_utility.c | 10 +- .../utils/multi_partitioning_utils.c | 22 +- src/include/distributed/master_protocol.h | 1 + .../distributed/multi_partitioning_utils.h | 1 + .../expected/multi_reference_table.out | 2 +- .../multi_alter_table_statements.source | 6 +- 11 files changed, 451 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0a783f49b..f601237de 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -38,6 +38,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_copy.h" #include "distributed/multi_logical_planner.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_utility.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" @@ -262,7 +263,6 @@ create_reference_table(PG_FUNCTION_ARGS) errdetail("There are no active worker nodes."))); } - CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE, colocateWithTableName, viaDeprecatedAPI); @@ -277,7 +277,8 @@ create_reference_table(PG_FUNCTION_ARGS) * This functions contains all necessary logic to create distributed tables. It * perform necessary checks to ensure distributing the table is safe. If it is * safe to distribute the table, this function creates distributed table metadata, - * creates shards and copies local data to shards. + * creates shards and copies local data to shards. This function also handles + * partitioned tables by distributing its partitions as well. * * viaDeprecatedAPI boolean flag is not optimal way to implement this function, * but it helps reducing code duplication a lot. We hope to remove that flag one @@ -355,6 +356,21 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio CreateReferenceTableShard(relationId); } + /* if this table is partitioned table, distribute its partitions too */ + if (PartitionedTable(relationId)) + { + List *partitionList = PartitionList(relationId); + ListCell *partitionCell = NULL; + + foreach(partitionCell, partitionList) + { + Oid partitionRelationId = lfirst_oid(partitionCell); + CreateDistributedTable(partitionRelationId, distributionColumn, + distributionMethod, colocateWithTableName, + viaDeprecatedAPI); + } + } + /* copy over data for hash distributed and reference tables */ if (distributionMethod == DISTRIBUTE_BY_HASH || distributionMethod == DISTRIBUTE_BY_NONE) @@ -566,6 +582,7 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; + Oid parentRelationId = InvalidOid; EnsureTableOwner(relationId); EnsureTableNotDistributed(relationId); @@ -626,6 +643,81 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, } } + if (PartitionTable(relationId)) + { + parentRelationId = PartitionParentOid(relationId); + } + + /* partitions cannot be distributed if their parent is not distributed */ + if (PartitionTable(relationId) && !IsDistributedTable(parentRelationId)) + { + char *relationName = get_rel_name(relationId); + char *parentRelationName = get_rel_name(parentRelationId); + + ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of " + "\"%s\"", relationName, parentRelationName), + errdetail("Citus does not support distributing partitions " + "if their parent is not distributed table."), + errhint("Distribute the partitioned table \"%s\" instead.", + parentRelationName))); + } + + /* + * These checks are mostly for partitioned tables not partitions because we prevent + * distributing partitions directly in the above check. However, partitions can still + * reach this point because, we call CreateDistributedTable for partitions if their + * parent table is distributed. + */ + if (PartitionedTable(relationId)) + { + /* we cannot distribute partitioned tables with master_create_distributed_table */ + if (viaDeprecatedAPI) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributing partitioned tables in only supported " + "with create_distributed_table UDF"))); + } + + /* distributing partitioned tables in only supported for hash-distribution */ + if (distributionMethod != DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributing partitioned tables in only supported " + "for hash-distributed tables"))); + } + + /* we currently don't support partitioned tables for replication factor > 1 */ + if (ShardReplicationFactor > 1) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributing partitioned tables with replication " + "factor greater than 1 is not supported"))); + } + + /* we currently don't support MX tables to be distributed partitioned table */ + if (replicationModel == REPLICATION_MODEL_STREAMING) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributing partitioned tables which uses " + "streaming replication is not supported"))); + } + + /* we don't support distributing tables with multi-level partitioning */ + if (PartitionTable(relationId)) + { + char *relationName = get_rel_name(relationId); + Oid parentRelationId = PartitionParentOid(relationId); + char *parentRelationName = get_rel_name(parentRelationId); + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributing multi-level partitioned tables " + "is not supported"), + errdetail("Relation \"%s\" is partitioned table itself and " + "it is also partition of relation \"%s\".", + relationName, parentRelationName))); + } + } + ErrorIfUnsupportedConstraint(relation, distributionMethod, distributionColumn, colocationId); @@ -1015,7 +1107,9 @@ RegularTable(Oid relationId) /* * CopyLocalDataIntoShards copies data from the local table, which is hidden * after converting it to a distributed table, into the shards of the distributed - * table. + * table. For partitioned tables, this functions returns without copying the data + * because we call this function for both partitioned tables and its partitions. + * Returning early saves us from copying data to workers twice. * * This function uses CitusCopyDestReceiver to invoke the distributed COPY logic. * We cannot use a regular COPY here since that cannot read from a table. Instead @@ -1057,6 +1151,17 @@ CopyLocalDataIntoShards(Oid distributedRelationId) /* take an ExclusiveLock to block all operations except SELECT */ distributedRelation = heap_open(distributedRelationId, ExclusiveLock); + /* + * Skip copying from partitioned tables, we will copy the data from + * partition to partition's shards. + */ + if (PartitionedTable(distributedRelationId)) + { + heap_close(distributedRelation, NoLock); + + return; + } + /* * All writes have finished, make sure that we can see them by using the * latest snapshot. We use GetLatestSnapshot instead of diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 94a13ff01..74a2a4ded 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -42,6 +42,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_copy.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" @@ -108,6 +109,8 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); +static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement); +static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement); static List * PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand); static List * PlanDropIndexStmt(DropStmt *dropIndexStatement, @@ -154,8 +157,8 @@ static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); -static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, - const char *commandString); +static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, + const char *commandString); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); static void CheckCopyPermissions(CopyStmt *copyStatement); @@ -467,6 +470,29 @@ multi_ProcessUtility(PlannedStmt *pstmt, params, dest, completionTag); #endif + + /* + * We only process CREATE TABLE ... PARTITION OF commands in the function below + * to handle the case when user creates a table as a partition of distributed table. + */ + if (IsA(parsetree, CreateStmt)) + { + CreateStmt *createStatement = (CreateStmt *) parsetree; + + ProcessCreateTableStmtPartitionOf(createStatement); + } + + /* + * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below + * and distribute the partition if necessary. + */ + if (IsA(parsetree, AlterTableStmt)) + { + AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree; + + ProcessAlterTableStmtAttachPartition(alterTableStatement); + } + /* don't run post-process code for local commands */ if (ddlJobs != NIL) { @@ -776,6 +802,135 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR } +/* + * ProcessCreateTableStmtPartitionOf takes CreateStmt object as a parameter but + * it only processes CREATE TABLE ... PARTITION OF statements and it checks if + * user creates the table as a partition of a distributed table. In that case, + * it distributes partition as well. Since the table itself is a partition, + * CreateDistributedTable will attach it to its parent table automatically after + * distributing it. + * + * This function does nothing if PostgreSQL's version is less then 10 and given + * CreateStmt is not a CREATE TABLE ... PARTITION OF command. + */ +static void +ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement) +{ +#if (PG_VERSION_NUM >= 100000) + if (createStatement->inhRelations != NIL && createStatement->partbound != NULL) + { + RangeVar *parentRelation = linitial(createStatement->inhRelations); + bool parentMissingOk = false; + Oid parentRelationId = RangeVarGetRelid(parentRelation, NoLock, + parentMissingOk); + + /* a partition can only inherit from single parent table */ + Assert(list_length(createStatement->inhRelations) == 1); + + Assert(parentRelationId != InvalidOid); + + /* + * If a partition is being created and if its parent is a distributed + * table, we will distribute this table as well. + */ + if (IsDistributedTable(parentRelationId)) + { + bool missingOk = false; + Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock, + missingOk); + Var *parentDistributionColumn = DistPartitionKey(parentRelationId); + char parentDistributionMethod = DISTRIBUTE_BY_HASH; + char *parentRelationName = get_rel_name(parentRelationId); + bool viaDeprecatedAPI = false; + + CreateDistributedTable(relationId, parentDistributionColumn, + parentDistributionMethod, parentRelationName, + viaDeprecatedAPI); + } + } +#endif +} + + +/* + * ProcessAlterTableStmtAttachPartition takes AlterTableStmt object as parameter + * but it only processes into ALTER TABLE ... ATTACH PARTITION commands and + * distributes the partition if necessary. There are four cases to consider; + * + * Parent is not distributed, partition is not distributed: We do not need to + * do anything in this case. + * + * Parent is not distributed, partition is distributed: This can happen if + * user first distributes a table and tries to attach it to a non-distributed + * table. Non-distributed tables cannot have distributed partitions, thus we + * simply error out in this case. + * + * Parent is distributed, partition is not distributed: We should distribute + * the table and attach it to its parent in workers. CreateDistributedTable + * perform both of these operations. Thus, we will not propagate ALTER TABLE + * ... ATTACH PARTITION command to workers. + * + * Parent is distributed, partition is distributed: Partition is already + * distributed, we only need to attach it to its parent in workers. Attaching + * operation will be performed via propagating this ALTER TABLE ... ATTACH + * PARTITION command to workers. + * + * This function does nothing if PostgreSQL's version is less then 10 and given + * CreateStmt is not a ALTER TABLE ... ATTACH PARTITION OF command. + */ +static void +ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement) +{ +#if (PG_VERSION_NUM >= 100000) + List *commandList = alterTableStatement->cmds; + ListCell *commandCell = NULL; + + foreach(commandCell, commandList) + { + AlterTableCmd *alterTableCommand = (AlterTableCmd *) lfirst(commandCell); + + if (alterTableCommand->subtype == AT_AttachPartition) + { + Oid relationId = AlterTableLookupRelation(alterTableStatement, NoLock); + PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def; + bool partitionMissingOk = false; + Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, + partitionMissingOk); + + /* + * If user first distributes the table then tries to attach it to non + * distributed table, we error out. + */ + if (!IsDistributedTable(relationId) && + IsDistributedTable(partitionRelationId)) + { + char *parentRelationName = get_rel_name(partitionRelationId); + + ereport(ERROR, (errmsg("non-distributed tables cannot have " + "distributed partitions"), + errhint("Distribute the partitioned table \"%s\" " + "instead", parentRelationName))); + } + + /* if parent of this table is distributed, distribute this table too */ + if (IsDistributedTable(relationId) && + !IsDistributedTable(partitionRelationId)) + { + Var *distributionColumn = DistPartitionKey(relationId); + char distributionMethod = DISTRIBUTE_BY_HASH; + char *relationName = get_rel_name(relationId); + bool viaDeprecatedAPI = false; + + CreateDistributedTable(partitionRelationId, distributionColumn, + distributionMethod, relationName, + viaDeprecatedAPI); + } + } + } +#endif +} + + /* * PlanIndexStmt determines whether a given CREATE INDEX statement involves * a distributed table. If so (and if the statement does not use unsupported @@ -1041,6 +1196,42 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo constraint->skip_validation = true; } } +#if (PG_VERSION_NUM >= 100000) + else if (alterTableType == AT_AttachPartition) + { + PartitionCmd *partitionCommand = (PartitionCmd *) command->def; + + /* + * We only support ALTER TABLE ATTACH PARTITION, if it is only subcommand of + * ALTER TABLE. It was already checked in ErrorIfUnsupportedAlterTableStmt. + */ + Assert(list_length(commandList) <= 1); + + rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false); + + /* + * Do not generate tasks if relation is distributed and the partition + * is not distributed. Because, we'll manually convert the partition into + * distributed table and co-locate with its parent. + */ + if (!IsDistributedTable(rightRelationId)) + { + return NIL; + } + } + else if (alterTableType == AT_DetachPartition) + { + PartitionCmd *partitionCommand = (PartitionCmd *) command->def; + + /* + * We only support ALTER TABLE DETACH PARTITION, if it is only subcommand of + * ALTER TABLE. It was already checked in ErrorIfUnsupportedAlterTableStmt. + */ + Assert(list_length(commandList) <= 1); + + rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false); + } +#endif } ddlJob = palloc0(sizeof(DDLJob)); @@ -1051,8 +1242,8 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo if (rightRelationId) { /* if foreign key related, use specialized task list function ... */ - ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, - alterTableCommand); + ddlJob->taskList = InterShardDDLTaskList(leftRelationId, rightRelationId, + alterTableCommand); } else { @@ -1759,6 +1950,54 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) break; } +#if (PG_VERSION_NUM >= 100000) + case AT_AttachPartition: + { + Oid relationId = AlterTableLookupRelation(alterTableStatement, + NoLock); + PartitionCmd *partitionCommand = (PartitionCmd *) command->def; + bool missingOK = false; + Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name, + NoLock, missingOK); + + /* we only allow partitioning commands if they are only subcommand */ + if (commandList->length > 1) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute ATTACH PARTITION command " + "with other subcommands"), + errhint("You can issue each subcommand " + "separately."))); + } + + if (IsDistributedTable(partitionRelationId) && + !TablesColocated(relationId, partitionRelationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributed tables cannot have " + "non-colocated distributed tables as a " + "partition "))); + } + + break; + } + + case AT_DetachPartition: + { + /* we only allow partitioning commands if they are only subcommand */ + if (commandList->length > 1) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute DETACH PARTITION command " + "with other subcommands"), + errhint("You can issue each subcommand " + "separately."))); + } + + break; + } + +#endif case AT_SetNotNull: case AT_DropConstraint: case AT_EnableTrigAll: @@ -1776,9 +2015,10 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("alter table command is currently unsupported"), - errdetail("Only ADD|DROP COLUMN, SET|DROP NOT NULL," - " SET|DROP DEFAULT, ADD|DROP CONSTRAINT and " - "TYPE subcommands are supported."))); + errdetail("Only ADD|DROP COLUMN, SET|DROP NOT NULL, " + "SET|DROP DEFAULT, ADD|DROP CONSTRAINT, " + "ATTACH|DETACH PARTITION and TYPE subcommands " + "are supported."))); } } } @@ -2786,16 +3026,17 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) /* - * ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a - * shards of given list of distributed table. + * InterShardDDLTaskList builds a list of tasks to execute a inter shard DDL command on a + * shards of given list of distributed table. At the moment this function is used to run + * foreign key and partitioning command on worker node. * - * leftRelationId is the relation id of actual distributed table which given foreign key - * command is applied. rightRelationId is the relation id of distributed table which - * foreign key refers to. + * leftRelationId is the relation id of actual distributed table which given command is + * applied. rightRelationId is the relation id of distributed table which given command + * refers to. */ static List * -ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, - const char *commandString) +InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, + const char *commandString) { List *taskList = NIL; diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index 3ef550891..4ddc31656 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -194,7 +194,8 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval else { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("expire target is not a regular or foreign table"))); + errmsg("expire target is not a regular, foreign or partitioned " + "table"))); } connection = GetNodeConnection(connectionFlag, workerNode->workerName, diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 1de50bdab..6348b1444 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -402,7 +402,8 @@ RecreateTableDDLCommandList(Oid relationId) else { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("repair target is not a regular or foreign table"))); + errmsg("repair target is not a regular, foreign or partitioned " + "table"))); } dropCommandList = list_make1(dropCommand->data); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 423b5e551..6297b4da1 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -24,6 +24,9 @@ #include "commands/tablecmds.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#if (PG_VERSION_NUM >= 100000) +#include "catalog/partition.h" +#endif #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" @@ -31,6 +34,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/placement_connection.h" @@ -368,6 +372,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, int placementsCreated = 0; int attemptNumber = 0; List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId); + char *alterTableAttachPartitionCommand = NULL; bool includeSequenceDefaults = false; List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); uint32 connectionFlag = FOR_DDL; @@ -402,7 +407,8 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, } WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList, - foreignConstraintCommandList, connection); + foreignConstraintCommandList, alterTableAttachPartitionCommand, + connection); InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, nodeGroupId); @@ -483,12 +489,20 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, ListCell *connectionCell = NULL; ListCell *shardPlacementCell = NULL; int connectionFlags = FOR_DDL; + char *alterTableAttachPartitionCommand = NULL; if (useExclusiveConnection) { connectionFlags |= CONNECTION_PER_PLACEMENT; } + + if (PartitionTable(distributedRelationId)) + { + alterTableAttachPartitionCommand = + GenerateAlterTableAttachPartitionCommand(distributedRelationId); + } + BeginOrContinueCoordinatedTransaction(); foreach(shardPlacementCell, shardPlacements) @@ -517,7 +531,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, WorkerCreateShard(distributedRelationId, shardIndex, shardId, ddlCommandList, foreignConstraintCommandList, - connection); + alterTableAttachPartitionCommand, connection); } /* @@ -540,7 +554,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, */ void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, - List *foreignConstraintCommandList, MultiConnection *connection) + List *foreignConstraintCommandList, + char *alterTableAttachPartitionCommand, MultiConnection *connection) { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); @@ -618,6 +633,40 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data); } + + /* + * If the shard is created for a partition, send the command to create the + * partitioning hierarcy on the shard. + */ + if (alterTableAttachPartitionCommand != NULL) + { + Oid parentRelationId = PartitionParentOid(relationId); + uint64 correspondingParentShardId = InvalidOid; + StringInfo applyAttachPartitionCommand = makeStringInfo(); + + Oid parentSchemaId = InvalidOid; + char *parentSchemaName = NULL; + char *escapedParentSchemaName = NULL; + char *escapedCommand = NULL; + + Assert(PartitionTable(relationId)); + + parentSchemaId = get_rel_namespace(parentRelationId); + parentSchemaName = get_namespace_name(parentSchemaId); + escapedParentSchemaName = quote_literal_cstr(parentSchemaName); + escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand); + + correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId, + shardIndex); + + appendStringInfo(applyAttachPartitionCommand, + WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId, + escapedParentSchemaName, shardId, escapedSchemaName, + escapedCommand); + + + ExecuteCriticalRemoteCommand(connection, applyAttachPartitionCommand->data); + } } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index de2a75da6..b09e20df7 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -451,8 +451,8 @@ RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId, } } #if (PG_VERSION_NUM >= 100000) - else if (command->subtype == AT_AttachPartition || command->subtype == - AT_DetachPartition) + else if (command->subtype == AT_AttachPartition || + command->subtype == AT_DetachPartition) { PartitionCmd *partitionCommand = (PartitionCmd *) command->def; @@ -469,9 +469,8 @@ RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId, SetSchemaNameIfNotExist(relationSchemaName, rightShardSchemaName); /* - * We will not append shard id to referencing table name or - * constraint name. They will be handled when we drop into - * RelayEventExtendNames. + * We will not append shard id to left shard name. This will be + * handled when we drop into RelayEventExtendNames. */ AppendShardIdToName(referencedTableName, rightShardId); } @@ -622,6 +621,7 @@ AppendShardIdToName(char **name, uint64 shardId) { snprintf(extendedName, NAMEDATALEN, "%s%s", (*name), shardIdAndSeparator); } + /* * Otherwise, we need to truncate the name further to accommodate * a sufficient hash value. The resulting name will avoid collision diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 23dcdde1e..867426c6b 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -17,7 +17,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_inherits_fn.h" #include "distributed/citus_ruleutils.h" -#include +#include "distributed/multi_partitioning_utils.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "utils/builtins.h" @@ -159,6 +159,26 @@ IsParentTable(Oid relationId) } +/* + * Wrapper around get_partition_parent + * + * Note: Because this function assumes that the relation whose OID is passed + * as an argument will have precisely one parent, it should only be called + * when it is known that the relation is a partition. + */ +Oid +PartitionParentOid(Oid partitionOid) +{ + Oid partitionParentOid = InvalidOid; + +#if (PG_VERSION_NUM >= 100000) + partitionParentOid = get_partition_parent(partitionOid); +#endif + + return partitionParentOid; +} + + /* * Takes a parent relation and returns Oid list of its partitions. The * function errors out if the given relation is not a parent. diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index bb45f2da8..905c5bbbd 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -125,6 +125,7 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, extern void CreateReferenceTableShard(Oid distributedTableId); extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, List *foreignConstraintCommandList, + char *alterTableAttachPartitionCommand, MultiConnection *connection); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); diff --git a/src/include/distributed/multi_partitioning_utils.h b/src/include/distributed/multi_partitioning_utils.h index 4dc5f0616..4a169e421 100644 --- a/src/include/distributed/multi_partitioning_utils.h +++ b/src/include/distributed/multi_partitioning_utils.h @@ -15,6 +15,7 @@ extern bool PartitionedTable(Oid relationId); extern bool PartitionTable(Oid relationId); extern bool IsChildTable(Oid relationId); extern bool IsParentTable(Oid relationId); +extern Oid PartitionParentOid(Oid partitionOid); extern List * PartitionList(Oid parentRelationId); extern char * GenerateDetachPartitionCommand(Oid partitionTableId); extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId); diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index c83912132..bcf1c9574 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1393,7 +1393,7 @@ ALTER TABLE reference_table_ddl RENAME TO reference_table_ddl_test; ERROR: renaming distributed tables is currently unsupported ALTER TABLE reference_table_ddl SET WITH OIDS; ERROR: alter table command is currently unsupported -DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported. +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported. -- now test reference tables against some helper UDFs that Citus provides -- cannot delete / drop shards from a reference table SELECT master_apply_delete_command('DELETE FROM reference_table_ddl'); diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 1f0fe5af2..561093627 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -332,7 +332,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.lineite ALTER TABLE lineitem_alter ADD COLUMN int_column3 INTEGER, ALTER COLUMN int_column1 SET STATISTICS 10; ERROR: alter table command is currently unsupported -DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported. +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported. ALTER TABLE lineitem_alter DROP COLUMN int_column1, DROP COLUMN int_column2; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.lineitem_alter'::regclass; Column | Type | Modifiers @@ -364,12 +364,12 @@ ERROR: cannot execute ALTER TABLE command involving partition column -- Verify that we error out on unsupported statement types ALTER TABLE lineitem_alter ALTER COLUMN l_orderkey SET STATISTICS 100; ERROR: alter table command is currently unsupported -DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported. +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported. ALTER TABLE lineitem_alter DROP CONSTRAINT IF EXISTS non_existent_contraint; NOTICE: constraint "non_existent_contraint" of relation "lineitem_alter" does not exist, skipping ALTER TABLE lineitem_alter SET WITHOUT OIDS; ERROR: alter table command is currently unsupported -DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported. +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported. -- Verify that we error out in case of postgres errors on supported statement -- types ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;