diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index a97042bda..673108c3f 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -29,6 +29,7 @@ #include "commands/defrem.h" #include "commands/extension.h" #include "commands/trigger.h" +#include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" @@ -38,6 +39,7 @@ #include "distributed/multi_copy.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_utility.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/reference_table_utils.h" @@ -76,11 +78,9 @@ static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); static bool LocalTableEmpty(Oid tableId); -static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, - char *colocateWithTableName, - int shardCount, int replicationFactor); static Oid ColumnType(Oid relationId, char *columnName); -static void CopyLocalDataIntoShards(Oid relationId); +static void CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List * + sourceLocalRelationList); static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); #if (PG_VERSION_NUM >= 100000) static bool RelationUsesIdentityColumns(TupleDesc relationDesc); @@ -280,7 +280,7 @@ CreateReferenceTable(Oid relationId) /* copy over data for regular relations */ if (relationKind == RELKIND_RELATION) { - CopyLocalDataIntoShards(relationId); + CopyLocalDataIntoShards(relationId, list_make1_oid(relationId)); } } @@ -339,9 +339,9 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "(OIDS) option in their definitions."))); } - /* verify target relation is either regular or foreign table */ relationKind = relation->rd_rel->relkind; - if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) + + if (!SupportedRelationKind(relation)) { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot distribute relation: %s", @@ -351,13 +351,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, } #if (PG_VERSION_NUM >= 100000) - if (relation->rd_rel->relispartition) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot distribute relation: %s", relationName), - errdetail("Distributing partition tables is unsupported."))); - } - if (RelationUsesIdentityColumns(relationDesc)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -367,6 +360,20 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, } #endif + /* partitions cannot be distributed unless thier parent is already partitioned */ + if (PartitionTable(relationId) && !IsDistributedTable(PartitionParentOid(relationId))) + { + char *parentRelationName = get_rel_name(PartitionParentOid(relationId)); + char *relationName = get_rel_name(relationId); + + ereport(ERROR, (errmsg("cannot distributed relation \"%s\" which is partition" + " of \"%s\"", relationName, parentRelationName), + errdetail("Citus does not support partitioning among local " + "tables and distributed tables"), + errhint("First distribute the partitioned table \"%s\"", + parentRelationName))); + } + /* check that table is empty if that is required */ if (requireEmpty && !LocalTableEmpty(relationId)) { @@ -608,7 +615,7 @@ CreateTruncateTrigger(Oid relationId) /* * CreateHashDistributedTable creates a hash distributed table. */ -static void +void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, char *colocateWithTableName, int shardCount, int replicationFactor) @@ -664,7 +671,11 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, /* relax empty table requirement for regular (non-foreign) tables */ relationKind = get_rel_relkind(relationId); +#if (PG_VERSION_NUM >= 100000) + if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE) +#else if (relationKind == RELKIND_RELATION) +#endif { requireEmpty = false; } @@ -688,10 +699,53 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } - /* copy over data for regular relations */ - if (relationKind == RELKIND_RELATION) + /* + * Copy over data for regular relations. Note that here we skip the partitions + * and copy them via the partitioned table below. + */ + if (relationKind == RELKIND_RELATION && !PartitionTable(relationId)) { - CopyLocalDataIntoShards(relationId); + CopyLocalDataIntoShards(relationId, list_make1_oid(relationId)); + } + + /* we also need to check for partitioned tables */ + if (PartitionedTable(relationId)) + { + List *partitionList = PartitionList(relationId); + char *parentRelationName = get_rel_name(relationId); + ListCell *partitionCell = NULL; + + /* partitioned distributed tables are only supported with replication */ + if (replicationFactor != 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot distribute relation \"%s\"", + parentRelationName), + errdetail("Relation \"%s\" has partitions and " + "distributing partitioned tables " + "with shard replication factor greater " + "than 1 is not supported.", parentRelationName))); + } + + /* + * For each partition, create a co-located hash distributed table. Note that + * we pick the parent tables distribution key as the distribution key for + * the partitions. + */ + foreach(partitionCell, partitionList) + { + Oid partitionOid = lfirst_oid(partitionCell); + + CreateHashDistributedTable(partitionOid, distributionColumnName, + parentRelationName, shardCount, + replicationFactor); + } + + /* + * After converting all the partition tables into distributed tables, now lets + * copy the existing data from the local partitions. + */ + CopyLocalDataIntoShards(relationId, partitionList); } heap_close(pgDistColocation, NoLock); @@ -742,7 +796,7 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) /* - * CopyLocalDataIntoShards copies data from the local table, which is hidden + * CopyLocalDataIntoShards copies data from list of local tables, which are hidden * after converting it to a distributed table, into the shards of the distributed * table. * @@ -758,33 +812,68 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) * want to read from the local table. To keep it simple, we perform a heap scan * directly on the table. * - * Any writes on the table that are started during this operation will be handled - * as distributed queries once the current transaction commits. SELECTs will - * continue to read from the local table until the current transaction commits, - * after which new SELECTs will be handled as distributed queries. + * Any writes on both the source tables and target table that are started + * during this operation will be handled as distributed queries once the + * current transaction commits. SELECTs will continue to read from the + * local table until the current transaction commits, after which new + * SELECTs will be handled as distributed queries. * * After copying local data into the distributed table, the local data remains * in place and should be truncated at a later time. + * + * Note that it is allowed that destinationDistributedRelationId also appears on + * sourceOidList. */ static void -CopyLocalDataIntoShards(Oid distributedRelationId) +CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List *sourceOidList) { DestReceiver *copyDest = NULL; List *columnNameList = NIL; Relation distributedRelation = NULL; + List *sourceRelationList = NULL; TupleDesc tupleDescriptor = NULL; bool stopOnFailure = true; EState *estate = NULL; - HeapScanDesc scan = NULL; HeapTuple tuple = NULL; ExprContext *econtext = NULL; MemoryContext oldContext = NULL; TupleTableSlot *slot = NULL; uint64 rowsCopied = 0; + ListCell *sourceOidCell = NULL; + ListCell *sourceRelationCell = NULL; + /* take an ExclusiveLock to block all operations except SELECT */ - distributedRelation = heap_open(distributedRelationId, ExclusiveLock); + distributedRelation = heap_open(destinationDistributedRelationId, ExclusiveLock); + + /* take ExclusiveLock on all source relations as well */ + foreach(sourceOidCell, sourceOidList) + { + Oid sourceRelationId = lfirst_oid(sourceOidCell); + Relation localRelation = heap_open(sourceRelationId, ExclusiveLock); + + /* + * This check currently cannot be exercised by any code path. However, + * it should be here as a precaution in case the support is expanded. + */ + if (list_length(sourceOidList) > 1 && + PartitionParentOid(sourceRelationId) != destinationDistributedRelationId) + { + NameData localRelationNameData = localRelation->rd_rel->relname; + char *localRelationName = pstrdup(NameStr(localRelationNameData)); + + NameData distributedRelationNameData = localRelation->rd_rel->relname; + char *distributedRelationName = pstrdup(NameStr(distributedRelationNameData)); + + ereport(ERROR, (errmsg("Local relation \"%s\" cannot be copied into " + "distributed relation \"%s\"", localRelationName, + distributedRelationName), + errdetail("Relations do not have partitioning hierarcy"))); + } + + sourceRelationList = lappend(sourceRelationList, localRelation); + } /* * All writes have finished, make sure that we can see them by using the @@ -808,41 +897,59 @@ CopyLocalDataIntoShards(Oid distributedRelationId) econtext->ecxt_scantuple = slot; copyDest = - (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, + (DestReceiver *) CreateCitusCopyDestReceiver(destinationDistributedRelationId, columnNameList, estate, stopOnFailure); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); - /* begin reading from local table */ - scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); - - oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + /* iterate on the local relations, copy each into the destination relation */ + foreach(sourceRelationCell, sourceRelationList) { - /* materialize tuple and send it to a shard */ - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - copyDest->receiveSlot(slot, copyDest); + Relation localRelation = (Relation) lfirst(sourceRelationCell); + bool printNoticeMessage = true; - /* clear tuple memory */ - ResetPerTupleExprContext(estate); + /* begin reading from local table */ + HeapScanDesc scan = heap_beginscan(localRelation, GetActiveSnapshot(), 0, NULL); - /* make sure we roll back on cancellation */ - CHECK_FOR_INTERRUPTS(); + oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - if (rowsCopied == 0) + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { - ereport(NOTICE, (errmsg("Copying data from local table..."))); + /* materialize tuple and send it to a shard */ + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + copyDest->receiveSlot(slot, copyDest); + + /* clear tuple memory */ + ResetPerTupleExprContext(estate); + + /* make sure we roll back on cancellation */ + CHECK_FOR_INTERRUPTS(); + + if (printNoticeMessage) + { + /* we only want to print this once per relation */ + printNoticeMessage = false; + + ereport(NOTICE, (errmsg("Copying data from local table..."))); + } + + rowsCopied++; + + if (rowsCopied % 1000000 == 0) + { + ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); + } } - rowsCopied++; + MemoryContextSwitchTo(oldContext); - if (rowsCopied % 1000000 == 0) - { - ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); - } + /* finish reading from the local table */ + heap_endscan(scan); + + /* keep the lock */ + heap_close(localRelation, NoLock); } if (rowsCopied % 1000000 != 0) @@ -850,11 +957,6 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); } - MemoryContextSwitchTo(oldContext); - - /* finish reading from the local table */ - heap_endscan(scan); - /* finish writing into the shards */ copyDest->rShutdown(copyDest); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 30390dc06..6941b5f68 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -42,6 +42,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_copy.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" @@ -512,6 +513,80 @@ multi_ProcessUtility(PlannedStmt *pstmt, } } +#if (PG_VERSION_NUM >= 100000) + if (IsA(parsetree, CreateStmt)) + { + CreateStmt *createStatement = (CreateStmt *) parsetree; + + /* if a partition is being created */ + if (createStatement->inhRelations != NIL && createStatement->partbound != NULL) + { + RangeVar *parentRelation = linitial(createStatement->inhRelations); + char *parentSchemaName = parentRelation->schemaname ? + parentRelation->schemaname : "public"; + + Oid parentId = get_relname_relid(parentRelation->relname, get_namespace_oid( + parentSchemaName, false)); + char *schemaName = createStatement->relation->schemaname ? + createStatement->relation->schemaname : "public"; + + Oid relationId = get_relname_relid(createStatement->relation->relname, + get_namespace_oid(schemaName, false)); + + /* if the table is being attached to a distribtued table, it should be distributed as well */ + if (IsDistributedTable(parentId)) + { + Var *parentPartitionKey = DistPartitionKey(parentId); + char *parentPartitionKeyStr = + get_relid_attribute_name(parentId, + parentPartitionKey->varattno); + + CreateHashDistributedTable(relationId, parentPartitionKeyStr, + get_rel_name(parentId), 0, 0); + } + } + } + else if (IsA(parsetree, AlterTableStmt)) + { + AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree; + List *commandList = alterTableStatement->cmds; + ListCell *commandCell = NULL; + + foreach(commandCell, commandList) + { + AlterTableCmd *alterTableCommand = (AlterTableCmd *) lfirst(commandCell); + + AlterTableType alterTableType = alterTableCommand->subtype; + + if (alterTableType == AT_AttachPartition) + { + PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def; + + char *relationName = partitionCommand->name->relname; + char *schemaName = partitionCommand->name->schemaname ? + partitionCommand->name->schemaname : "public"; + + Oid relationId = get_relname_relid(relationName, + get_namespace_oid(schemaName, false)); + Oid parentId = PartitionParentOid(relationId); + + if (IsDistributedTable(parentId)) + { + Var *parentPartitionKey = DistPartitionKey(parentId); + char *parentPartitionKeyStr = + get_relid_attribute_name(parentId, + parentPartitionKey->varattno); + + CreateHashDistributedTable(relationId, parentPartitionKeyStr, + get_rel_name(parentId), 0, 0); + } + } + } + } + + +#endif + /* TODO: fold VACUUM's processing into the above block */ if (IsA(parsetree, VacuumStmt)) { @@ -1035,6 +1110,30 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo constraint->skip_validation = true; } } +#if (PG_VERSION_NUM >= 100000) + else if (alterTableType == AT_AttachPartition) + { + PartitionCmd *partitionCommand = (PartitionCmd *) command->def; + + char *relationName = partitionCommand->name->relname; + char *schemaName = partitionCommand->name->schemaname ? + partitionCommand->name->schemaname : "public"; + + Oid relationId = get_relname_relid(relationName, + get_namespace_oid(schemaName, false)); + Oid parentId = leftRelationId; + + /* + * Do not generate tasks if relation is not distributed and the parent + * is distributed. Because, we'll manually convert the relation into + * distribtued relation and co-locate with its parent. + */ + if (!IsDistributedTable(relationId) && IsDistributedTable(parentId)) + { + return NIL; + } + } +#endif } ddlJob = palloc0(sizeof(DDLJob)); @@ -1783,14 +1882,17 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) case AT_DropConstraint: case AT_EnableTrigAll: case AT_DisableTrigAll: - { - /* - * We will not perform any special check for ALTER TABLE DROP CONSTRAINT - * , ALTER TABLE .. ALTER COLUMN .. SET NOT NULL and ALTER TABLE ENABLE/ - * DISABLE TRIGGER ALL - */ - break; - } +#if (PG_VERSION_NUM >= 100000) + case AT_AttachPartition: +#endif + { + /* + * We will not perform any special check for ALTER TABLE DROP CONSTRAINT + * , ALTER TABLE .. ALTER COLUMN .. SET NOT NULL and ALTER TABLE ENABLE/ + * DISABLE TRIGGER ALL + */ + break; + } default: { diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 709123089..5b0dc5bc1 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -31,6 +31,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/reference_table_utils.h" @@ -251,6 +252,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) List *targetTableForeignConstraintCommands = NIL; ListCell *sourceShardCell = NULL; bool includeSequenceDefaults = false; + char *alterTableAttachPartitionCommand = NULL; /* make sure that tables are hash partitioned */ CheckHashPartitionedTable(targetRelationId); @@ -288,6 +290,13 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) targetTableDDLEvents = GetTableDDLEvents(targetRelationId, includeSequenceDefaults); targetTableForeignConstraintCommands = GetTableForeignConstraintCommands( targetRelationId); + + if (PartitionTable(targetRelationId)) + { + alterTableAttachPartitionCommand = + GenerateAlterTableAttachPartitionCommand(targetRelationId); + } + targetShardStorageType = ShardStorageType(targetRelationId); foreach(sourceShardCell, sourceShardIntervalList) @@ -315,7 +324,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) sourceNodePort, sourceShardIndex, newShardId, targetTableRelationOwner, targetTableDDLEvents, - targetTableForeignConstraintCommands); + targetTableForeignConstraintCommands, + alterTableAttachPartitionCommand); if (created) { const RelayFileState shardState = FILE_FINALIZED; diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index a5c2f9d46..01567e4a5 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -736,7 +736,11 @@ ShardStorageType(Oid relationId) char shardStorageType = 0; char relationType = get_rel_relkind(relationId); +#if (PG_VERSION_NUM >= 100000) + if (relationType == RELKIND_RELATION || relationType == RELKIND_PARTITIONED_TABLE) +#else if (relationType == RELKIND_RELATION) +#endif { shardStorageType = SHARD_STORAGE_TABLE; } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index d932b92f2..1dccb01df 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -24,6 +24,9 @@ #include "commands/tablecmds.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#if (PG_VERSION_NUM >= 100000) +#include "catalog/partition.h" +#endif #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" @@ -31,6 +34,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/placement_connection.h" @@ -396,10 +400,12 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, relationId); int shardIndex = -1; /* not used in this code path */ bool created = false; + char *alterTableAttachPartitionCommand = NULL; created = WorkerCreateShard(relationId, nodeName, nodePort, shardIndex, shardId, newPlacementOwner, ddlEventList, - foreignConstraintCommandList); + foreignConstraintCommandList, + alterTableAttachPartitionCommand); if (created) { const RelayFileState shardState = FILE_FINALIZED; @@ -438,7 +444,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, int shardIndex, uint64 shardId, char *newShardOwner, - List *ddlCommandList, List *foreignConstraintCommandList) + List *ddlCommandList, List *foreignConstraintCommandList, + char *alterTableAttachPartitionCommand) { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); @@ -531,6 +538,45 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, } } + /* + * If the shard is created for a partition, send the command to create the + * partitioning hierarcy on the shard. + */ + if (alterTableAttachPartitionCommand != NULL) + { + Oid parentRelationId = PartitionParentOid(relationId); + uint64 correspondingParentShardId = InvalidOid; + StringInfo applyAttachPartitionCommand = makeStringInfo(); + List *queryResultList = NIL; + + Oid parentSchemaId = InvalidOid; + char *parentSchemaName = NULL; + char *escapedParentSchemaName = NULL; + char *escapedCommand = NULL; + + Assert(PartitionTable(relationId)); + + parentSchemaId = get_rel_namespace(parentRelationId); + parentSchemaName = get_namespace_name(parentSchemaId); + escapedParentSchemaName = quote_literal_cstr(parentSchemaName); + escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand); + + correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId, + shardIndex); + + appendStringInfo(applyAttachPartitionCommand, + WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId, + escapedParentSchemaName, shardId, escapedSchemaName, + escapedCommand); + + queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner, + applyAttachPartitionCommand); + if (queryResultList == NIL) + { + shardCreated = false; + } + } + return shardCreated; } diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index a543669e6..722af4637 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -59,7 +59,6 @@ static void AppendOptionListToString(StringInfo stringData, List *options); -static bool SupportedRelationKind(Relation relation); static const char * convert_aclright_to_string(int aclright); @@ -494,7 +493,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) * SupportedRelationKind returns true if the given relation is supported as a * distributed relation. */ -static bool +bool SupportedRelationKind(Relation relation) { char relationKind = relation->rd_rel->relkind; diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 23dcdde1e..4fd60c6b3 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -197,6 +197,26 @@ PartitionList(Oid parentRelationId) } +/* + * Wrapper around get_partition_parent + * * + * Note: Because this function assumes that the relation whose OID is passed + * as an argument will have precisely one parent, it should only be called + * when it is known that the relation is a partition. + */ +Oid +PartitionParentOid(Oid partitionOid) +{ + Oid partitionParentOid = InvalidOid; + +#if (PG_VERSION_NUM >= 100000) + partitionParentOid = get_partition_parent(partitionOid); +#endif + + return partitionParentOid; +} + + /* * GenerateDetachPartitionCommand gets a partition table and returns * "ALTER TABLE parent_table DETACH PARTITION partitionName" command. diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 213dec2fa..2135d691a 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); +extern bool SupportedRelationKind(Relation relation); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); @@ -48,5 +49,9 @@ extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_qualified_relation_name(Oid relid); +/* TODO: THIS SHOULD NOT BE HERE */ +extern void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, + int shardCount, int replicationFactor); #endif /* CITUS_RULEUTILS_H */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 3f1d102fd..0d4e3e8b9 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -116,7 +116,8 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId); extern void CreateReferenceTableShard(Oid distributedTableId); extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, int shardIndex, uint64 shardId, char *newShardOwner, - List *ddlCommandList, List *foreignConstraintCommadList); + List *ddlCommandList, List *foreignConstraintCommadList, + char *alterTableAttachPartitionCommand); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, diff --git a/src/include/distributed/multi_partitioning_utils.h b/src/include/distributed/multi_partitioning_utils.h index 4dc5f0616..5b95de830 100644 --- a/src/include/distributed/multi_partitioning_utils.h +++ b/src/include/distributed/multi_partitioning_utils.h @@ -16,6 +16,7 @@ extern bool PartitionTable(Oid relationId); extern bool IsChildTable(Oid relationId); extern bool IsParentTable(Oid relationId); extern List * PartitionList(Oid parentRelationId); +extern Oid PartitionParentOid(Oid partitionOid); extern char * GenerateDetachPartitionCommand(Oid partitionTableId); extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId); extern char * GeneratePartitioningInformation(Oid tableId); diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out new file mode 100644 index 000000000..157994e04 --- /dev/null +++ b/src/test/regress/expected/multi_partitioning.out @@ -0,0 +1,191 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); + +-- create its partitions +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, '2009-06-06'); +INSERT INTO partitioning_test VALUES (2, '2010-07-07'); +INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); +INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +-- this should error out given that parent of the partition is not distributed +SELECT create_distributed_table('partitioning_test_2010', 'id'); +ERROR: cannot distributed relation "partitioning_test_2010" which is partition of "partitioning_test" +DETAIL: Citus does not support partitioning among local tables and distributed tables +HINT: First distribute the partitioned table "partitioning_test" +-- this should suceed +SELECT create_distributed_table('partitioning_test', 'id'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- check the data +SELECT * FROM partitioning_test ORDER BY 1; + id | time +----+------------ + 1 | 06-06-2009 + 2 | 07-07-2010 + 3 | 09-09-2009 + 4 | 03-03-2010 +(4 rows) + +-- check the metadata +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +ORDER BY 1; + logicalrelid | partmethod | partkey | colocationid | repmodel +------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- + partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c +(3 rows) + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | count +------------------------+------- + partitioning_test | 4 + partitioning_test_2009 | 4 + partitioning_test_2010 | 4 +(3 rows) + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + nodename | nodeport | count +-----------+----------+------- + localhost | 57637 | 6 + localhost | 57638 | 6 +(2 rows) + +-- now create a partition and see that it also becomes a distributed table +CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01'); +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') +ORDER BY 1; + logicalrelid | partmethod | partkey | colocationid | repmodel +------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- + partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2011 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c +(4 rows) + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | count +------------------------+------- + partitioning_test | 4 + partitioning_test_2009 | 4 + partitioning_test_2010 | 4 + partitioning_test_2011 | 4 +(4 rows) + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + nodename | nodeport | count +-----------+----------+------- + localhost | 57637 | 8 + localhost | 57638 | 8 +(2 rows) + +-- citus can also support ALTER TABLE .. ATTACH PARTITION +-- even if the partition is not distributed +CREATE TABLE partitioning_test_2012(id int, time date); +ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01'); +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') +ORDER BY 1; + logicalrelid | partmethod | partkey | colocationid | repmodel +------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- + partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2011 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2012 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c +(5 rows) + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | count +------------------------+------- + partitioning_test | 4 + partitioning_test_2009 | 4 + partitioning_test_2010 | 4 + partitioning_test_2011 | 4 + partitioning_test_2012 | 4 +(5 rows) + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + nodename | nodeport | count +-----------+----------+------- + localhost | 57637 | 10 + localhost | 57638 | 10 +(2 rows) + +-- dropping the parent should CASCADE to the children as well +DROP TABLE partitioning_test; +\d+ partitioning_test* +-- set the colocationid sequence back to 1 to make sure +-- that this file does not break other tests +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out new file mode 100644 index 000000000..0531616af --- /dev/null +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -0,0 +1,177 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ... + ^ + +-- create its partitions +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin... + ^ +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin... + ^ +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, '2009-06-06'); +ERROR: relation "partitioning_test" does not exist +LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06'); + ^ +INSERT INTO partitioning_test VALUES (2, '2010-07-07'); +ERROR: relation "partitioning_test" does not exist +LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07'); + ^ +INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); +ERROR: relation "partitioning_test_2009" does not exist +LINE 1: INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); + ^ +INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); +ERROR: relation "partitioning_test_2010" does not exist +LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); + ^ +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +-- this should error out given that parent of the partition is not distributed +SELECT create_distributed_table('partitioning_test_2010', 'id'); +ERROR: relation "partitioning_test_2010" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test_2010', 'i... + ^ +-- this should suceed +SELECT create_distributed_table('partitioning_test', 'id'); +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test', 'id'); + ^ +-- check the data +SELECT * FROM partitioning_test ORDER BY 1; +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT * FROM partitioning_test ORDER BY 1; + ^ +-- check the metadata +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +ORDER BY 1; +ERROR: relation "partitioning_test" does not exist +LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20... + ^ +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +GROUP BY + logicalrelid +ORDER BY + 1,2; +ERROR: relation "partitioning_test" does not exist +LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t... + ^ +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; +ERROR: relation "partitioning_test" does not exist +LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition... + ^ +-- now create a partition and see that it also becomes a distributed table +CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2011 PARTITION OF partitionin... + ^ +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') +ORDER BY 1; +ERROR: relation "partitioning_test" does not exist +LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20... + ^ +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') +GROUP BY + logicalrelid +ORDER BY + 1,2; +ERROR: relation "partitioning_test" does not exist +LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t... + ^ +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; +ERROR: relation "partitioning_test" does not exist +LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition... + ^ +-- citus can also support ALTER TABLE .. ATTACH PARTITION +-- even if the partition is not distributed +CREATE TABLE partitioning_test_2012(id int, time date); +ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01'); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_... + ^ +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') +ORDER BY 1; +ERROR: relation "partitioning_test" does not exist +LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20... + ^ +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') +GROUP BY + logicalrelid +ORDER BY + 1,2; +ERROR: relation "partitioning_test" does not exist +LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t... + ^ +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; +ERROR: relation "partitioning_test" does not exist +LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition... + ^ +-- dropping the parent should CASCADE to the children as well +DROP TABLE partitioning_test; +ERROR: table "partitioning_test" does not exist +\d+ partitioning_test* + Table "public.partitioning_test_2012" + Column | Type | Modifiers | Storage | Stats target | Description +--------+---------+-----------+---------+--------------+------------- + id | integer | | plain | | + time | date | | plain | | + +-- set the colocationid sequence back to 1 to make sure +-- that this file does not break other tests +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 935e6cefd..fb7289ee5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -26,7 +26,7 @@ test: multi_metadata_access # --- # Tests for partitioning support # --- -test: multi_partitioning_utils +test: multi_partitioning_utils multi_partitioning # ---------- # The following distributed tests depend on creating a partitioned table and diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql new file mode 100644 index 000000000..cb1241c37 --- /dev/null +++ b/src/test/regress/sql/multi_partitioning.sql @@ -0,0 +1,130 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000; + +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); + +-- create its partitions +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); + +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, '2009-06-06'); +INSERT INTO partitioning_test VALUES (2, '2010-07-07'); + +INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); +INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; + +-- this should error out given that parent of the partition is not distributed +SELECT create_distributed_table('partitioning_test_2010', 'id'); + +-- this should suceed +SELECT create_distributed_table('partitioning_test', 'id'); + +-- check the data +SELECT * FROM partitioning_test ORDER BY 1; + +-- check the metadata +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +ORDER BY 1; + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +GROUP BY + logicalrelid +ORDER BY + 1,2; + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + +-- now create a partition and see that it also becomes a distributed table +CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01'); + +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') +ORDER BY 1; + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') +GROUP BY + logicalrelid +ORDER BY + 1,2; + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + + +-- citus can also support ALTER TABLE .. ATTACH PARTITION +-- even if the partition is not distributed +CREATE TABLE partitioning_test_2012(id int, time date); +ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01'); + +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') +ORDER BY 1; + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') +GROUP BY + logicalrelid +ORDER BY + 1,2; + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + + +-- dropping the parent should CASCADE to the children as well +DROP TABLE partitioning_test; + +\d+ partitioning_test* + +-- set the colocationid sequence back to 1 to make sure +-- that this file does not break other tests +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;