From c594a056ae5e7b595de647bfc8c0cccc95410814 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 22 Jun 2017 12:39:40 +0300 Subject: [PATCH] First pass of creating distributed tables --- .../commands/create_distributed_table.c | 201 +++++++++++++----- .../distributed/master/master_create_shards.c | 12 +- .../distributed/master/master_node_protocol.c | 4 + .../master/master_stage_protocol.c | 50 ++++- .../distributed/utils/citus_ruleutils.c | 3 +- .../utils/multi_partitioning_utils.c | 20 ++ src/include/distributed/citus_ruleutils.h | 1 + src/include/distributed/master_protocol.h | 3 +- .../distributed/multi_partitioning_utils.h | 1 + .../regress/expected/multi_partitioning.out | 86 ++++++++ .../regress/expected/multi_partitioning_0.out | 88 ++++++++ .../expected/multi_reference_table.out | 2 +- src/test/regress/multi_schedule | 2 +- src/test/regress/sql/multi_partitioning.sql | 60 ++++++ 14 files changed, 477 insertions(+), 56 deletions(-) create mode 100644 src/test/regress/expected/multi_partitioning.out create mode 100644 src/test/regress/expected/multi_partitioning_0.out create mode 100644 src/test/regress/sql/multi_partitioning.sql diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index a97042bda..dfd701962 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -29,6 +29,7 @@ #include "commands/defrem.h" #include "commands/extension.h" #include "commands/trigger.h" +#include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" @@ -38,6 +39,7 @@ #include "distributed/multi_copy.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_utility.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_colocation.h" #include 
"distributed/pg_dist_partition.h" #include "distributed/reference_table_utils.h" @@ -80,7 +82,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN char *colocateWithTableName, int shardCount, int replicationFactor); static Oid ColumnType(Oid relationId, char *columnName); -static void CopyLocalDataIntoShards(Oid relationId); +static void CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List * + sourceLocalRelationList); static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); #if (PG_VERSION_NUM >= 100000) static bool RelationUsesIdentityColumns(TupleDesc relationDesc); @@ -280,7 +283,7 @@ CreateReferenceTable(Oid relationId) /* copy over data for regular relations */ if (relationKind == RELKIND_RELATION) { - CopyLocalDataIntoShards(relationId); + CopyLocalDataIntoShards(relationId, list_make1_oid(relationId)); } } @@ -339,9 +342,9 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "(OIDS) option in their definitions."))); } - /* verify target relation is either regular or foreign table */ relationKind = relation->rd_rel->relkind; - if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) + + if (!SupportedRelationKind(relation)) { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot distribute relation: %s", @@ -351,13 +354,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, } #if (PG_VERSION_NUM >= 100000) - if (relation->rd_rel->relispartition) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot distribute relation: %s", relationName), - errdetail("Distributing partition tables is unsupported."))); - } - if (RelationUsesIdentityColumns(relationDesc)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -367,6 +363,20 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, } #endif + /* partitions cannot be distributed unless thier parent is already partitioned */ + if 
(PartitionTable(relationId) && !IsDistributedTable(PartitionParentOid(relationId))) + { + char *parentRelationName = get_rel_name(PartitionParentOid(relationId)); + char *relationName = get_rel_name(relationId); + + ereport(ERROR, (errmsg("cannot distributed relation \"%s\" which is partition" + " of \"%s\"", relationName, parentRelationName), + errdetail("Citus does not support partitioning among local " + "tables and distributed tables"), + errhint("First distribute the partitioned table \"%s\"", + parentRelationName))); + } + /* check that table is empty if that is required */ if (requireEmpty && !LocalTableEmpty(relationId)) { @@ -664,7 +674,11 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, /* relax empty table requirement for regular (non-foreign) tables */ relationKind = get_rel_relkind(relationId); +#if (PG_VERSION_NUM >= 100000) + if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE) +#else if (relationKind == RELKIND_RELATION) +#endif { requireEmpty = false; } @@ -688,10 +702,53 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } - /* copy over data for regular relations */ - if (relationKind == RELKIND_RELATION) + /* + * Copy over data for regular relations. Note that here we skip the partitions + * and copy them via the partitioned table below. 
+ */ + if (relationKind == RELKIND_RELATION && !PartitionTable(relationId)) { - CopyLocalDataIntoShards(relationId); + CopyLocalDataIntoShards(relationId, list_make1_oid(relationId)); + } + + /* we also need to check for partitioned tables */ + if (PartitionedTable(relationId)) + { + List *partitionList = PartitionList(relationId); + char *parentRelationName = get_rel_name(relationId); + ListCell *partitionCell = NULL; + + /* partitioned distributed tables are only supported with a replication factor of 1 */ + if (replicationFactor != 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot distribute relation \"%s\"", + parentRelationName), + errdetail("Relation \"%s\" has partitions and " + "distributing partitioned tables " + "with shard replication factor greater " + "than 1 is not supported.", parentRelationName))); + } + + /* + * For each partition, create a co-located hash distributed table. Note that + * we pick the parent table's distribution key as the distribution key for + * the partitions. + */ + foreach(partitionCell, partitionList) + { + Oid partitionOid = lfirst_oid(partitionCell); + + CreateHashDistributedTable(partitionOid, distributionColumnName, + parentRelationName, shardCount, + replicationFactor); + } + + /* + * After converting all the partition tables into distributed tables, now let's + * copy the existing data from the local partitions. + */ + CopyLocalDataIntoShards(relationId, partitionList); } heap_close(pgDistColocation, NoLock); @@ -742,7 +799,7 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) /* - * CopyLocalDataIntoShards copies data from the local table, which is hidden + * CopyLocalDataIntoShards copies data from a list of local tables, which are hidden * after converting it to a distributed table, into the shards of the distributed * table. * @@ -758,33 +815,68 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) * want to read from the local table. 
To keep it simple, we perform a heap scan * directly on the table. * - * Any writes on the table that are started during this operation will be handled - * as distributed queries once the current transaction commits. SELECTs will - * continue to read from the local table until the current transaction commits, - * after which new SELECTs will be handled as distributed queries. + * Any writes on both the source tables and target table that are started + * during this operation will be handled as distributed queries once the + * current transaction commits. SELECTs will continue to read from the + * local table until the current transaction commits, after which new + * SELECTs will be handled as distributed queries. * * After copying local data into the distributed table, the local data remains * in place and should be truncated at a later time. + * + * Note that it is allowed that destinationDistributedRelationId also appears on + * sourceOidList. */ static void -CopyLocalDataIntoShards(Oid distributedRelationId) +CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List *sourceOidList) { DestReceiver *copyDest = NULL; List *columnNameList = NIL; Relation distributedRelation = NULL; + List *sourceRelationList = NULL; TupleDesc tupleDescriptor = NULL; bool stopOnFailure = true; EState *estate = NULL; - HeapScanDesc scan = NULL; HeapTuple tuple = NULL; ExprContext *econtext = NULL; MemoryContext oldContext = NULL; TupleTableSlot *slot = NULL; uint64 rowsCopied = 0; + ListCell *sourceOidCell = NULL; + ListCell *sourceRelationCell = NULL; + /* take an ExclusiveLock to block all operations except SELECT */ - distributedRelation = heap_open(distributedRelationId, ExclusiveLock); + distributedRelation = heap_open(destinationDistributedRelationId, ExclusiveLock); + + /* take ExclusiveLock on all source relations as well */ + foreach(sourceOidCell, sourceOidList) + { + Oid sourceRelationId = lfirst_oid(sourceOidCell); + Relation localRelation = 
heap_open(sourceRelationId, ExclusiveLock); + + /* + * This check currently cannot be exercised by any code path. However, + * it should be here as a precaution in case the support is expanded. + */ + if (list_length(sourceOidList) > 1 && + PartitionParentOid(sourceRelationId) != destinationDistributedRelationId) + { + NameData localRelationNameData = localRelation->rd_rel->relname; + char *localRelationName = pstrdup(NameStr(localRelationNameData)); + + NameData distributedRelationNameData = localRelation->rd_rel->relname; + char *distributedRelationName = pstrdup(NameStr(distributedRelationNameData)); + + ereport(ERROR, (errmsg("Local relation \"%s\" cannot be copied into " + "distributed relation \"%s\"", localRelationName, + distributedRelationName), + errdetail("Relations do not have partitioning hierarcy"))); + } + + sourceRelationList = lappend(sourceRelationList, localRelation); + } /* * All writes have finished, make sure that we can see them by using the @@ -808,41 +900,59 @@ CopyLocalDataIntoShards(Oid distributedRelationId) econtext->ecxt_scantuple = slot; copyDest = - (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, + (DestReceiver *) CreateCitusCopyDestReceiver(destinationDistributedRelationId, columnNameList, estate, stopOnFailure); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); - /* begin reading from local table */ - scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); - - oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + /* iterate on the local relations, copy each into the destination relation */ + foreach(sourceRelationCell, sourceRelationList) { - /* materialize tuple and send it to a shard */ - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - copyDest->receiveSlot(slot, copyDest); + Relation localRelation = (Relation) 
lfirst(sourceRelationCell); + bool printNoticeMessage = true; - /* clear tuple memory */ - ResetPerTupleExprContext(estate); + /* begin reading from local table */ + HeapScanDesc scan = heap_beginscan(localRelation, GetActiveSnapshot(), 0, NULL); - /* make sure we roll back on cancellation */ - CHECK_FOR_INTERRUPTS(); + oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - if (rowsCopied == 0) + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { - ereport(NOTICE, (errmsg("Copying data from local table..."))); + /* materialize tuple and send it to a shard */ + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + copyDest->receiveSlot(slot, copyDest); + + /* clear tuple memory */ + ResetPerTupleExprContext(estate); + + /* make sure we roll back on cancellation */ + CHECK_FOR_INTERRUPTS(); + + if (printNoticeMessage) + { + /* we only want to print this once per relation */ + printNoticeMessage = false; + + ereport(NOTICE, (errmsg("Copying data from local table..."))); + } + + rowsCopied++; + + if (rowsCopied % 1000000 == 0) + { + ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); + } } - rowsCopied++; + MemoryContextSwitchTo(oldContext); - if (rowsCopied % 1000000 == 0) - { - ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); - } + /* finish reading from the local table */ + heap_endscan(scan); + + /* keep the lock */ + heap_close(localRelation, NoLock); } if (rowsCopied % 1000000 != 0) @@ -850,11 +960,6 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); } - MemoryContextSwitchTo(oldContext); - - /* finish reading from the local table */ - heap_endscan(scan); - /* finish writing into the shards */ copyDest->rShutdown(copyDest); diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index a56f283bf..f2a670428 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ 
b/src/backend/distributed/master/master_create_shards.c @@ -31,6 +31,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/reference_table_utils.h" @@ -251,6 +252,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) List *targetTableForeignConstraintCommands = NIL; ListCell *sourceShardCell = NULL; bool includeSequenceDefaults = false; + char *alterTableAttachPartitionCommand = NULL; /* make sure that tables are hash partitioned */ CheckHashPartitionedTable(targetRelationId); @@ -288,6 +290,13 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) targetTableDDLEvents = GetTableDDLEvents(targetRelationId, includeSequenceDefaults); targetTableForeignConstraintCommands = GetTableForeignConstraintCommands( targetRelationId); + + if (PartitionTable(targetRelationId)) + { + alterTableAttachPartitionCommand = + GenerateAlterTableAttachPartitionCommand(targetRelationId); + } + targetShardStorageType = ShardStorageType(targetRelationId); foreach(sourceShardCell, sourceShardIntervalList) @@ -315,7 +324,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) sourceNodePort, sourceShardIndex, newShardId, targetTableRelationOwner, targetTableDDLEvents, - targetTableForeignConstraintCommands); + targetTableForeignConstraintCommands, + alterTableAttachPartitionCommand); if (created) { const RelayFileState shardState = FILE_FINALIZED; diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index a5c2f9d46..01567e4a5 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -736,7 +736,11 @@ ShardStorageType(Oid relationId) char shardStorageType = 0; char relationType = 
get_rel_relkind(relationId); +#if (PG_VERSION_NUM >= 100000) + if (relationType == RELKIND_RELATION || relationType == RELKIND_PARTITIONED_TABLE) +#else if (relationType == RELKIND_RELATION) +#endif { shardStorageType = SHARD_STORAGE_TABLE; } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index fae23b171..bd6ba6716 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -24,6 +24,9 @@ #include "commands/tablecmds.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#if (PG_VERSION_NUM >= 100000) +#include "catalog/partition.h" +#endif #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" @@ -31,6 +34,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/placement_connection.h" @@ -396,10 +400,12 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, relationId); int shardIndex = -1; /* not used in this code path */ bool created = false; + char *alterTableAttachPartitionCommand = NULL; created = WorkerCreateShard(relationId, nodeName, nodePort, shardIndex, shardId, newPlacementOwner, ddlEventList, - foreignConstraintCommandList); + foreignConstraintCommandList, + alterTableAttachPartitionCommand); if (created) { const RelayFileState shardState = FILE_FINALIZED; @@ -438,7 +444,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, int shardIndex, uint64 shardId, char *newShardOwner, - List *ddlCommandList, List *foreignConstraintCommandList) + List *ddlCommandList, List 
*foreignConstraintCommandList, + char *alterTableAttachPartitionCommand) { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); @@ -531,6 +538,45 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, } } + /* + * If the shard is created for a partition, send the command to create the + * partitioning hierarchy on the shard. + */ + if (alterTableAttachPartitionCommand != NULL) + { + Oid parentRelationId = PartitionParentOid(relationId); + uint64 correspondingParentShardId = InvalidOid; + StringInfo applyAttachPartitionCommand = makeStringInfo(); + List *queryResultList = NIL; + + Oid parentSchemaId = InvalidOid; + char *parentSchemaName = NULL; + char *escapedParentSchemaName = NULL; + char *escapedCommand = NULL; + + Assert(PartitionTable(relationId)); + + parentSchemaId = get_rel_namespace(parentRelationId); + parentSchemaName = get_namespace_name(parentSchemaId); + escapedParentSchemaName = quote_literal_cstr(parentSchemaName); + escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand); + + correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId, + shardIndex); + + appendStringInfo(applyAttachPartitionCommand, + WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId, + escapedParentSchemaName, shardId, escapedSchemaName, + escapedCommand); + + queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner, + applyAttachPartitionCommand); + if (queryResultList == NIL) + { + shardCreated = false; + } + } + return shardCreated; } diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index a543669e6..722af4637 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -59,7 +59,6 @@ static void AppendOptionListToString(StringInfo stringData, List *options); -static bool SupportedRelationKind(Relation relation); static const char * 
convert_aclright_to_string(int aclright); @@ -494,7 +493,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) * SupportedRelationKind returns true if the given relation is supported as a * distributed relation. */ -static bool +bool SupportedRelationKind(Relation relation) { char relationKind = relation->rd_rel->relkind; diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 23dcdde1e..4fd60c6b3 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -197,6 +197,26 @@ PartitionList(Oid parentRelationId) } +/* + * PartitionParentOid is a wrapper around get_partition_parent. + * + * Note: Because this function assumes that the relation whose OID is passed + * as an argument will have precisely one parent, it should only be called + * when it is known that the relation is a partition. + */ +Oid +PartitionParentOid(Oid partitionOid) +{ + Oid partitionParentOid = InvalidOid; + +#if (PG_VERSION_NUM >= 100000) + partitionParentOid = get_partition_parent(partitionOid); +#endif + + return partitionParentOid; +} + + /* * GenerateDetachPartitionCommand gets a partition table and returns * "ALTER TABLE parent_table DETACH PARTITION partitionName" command. 
diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 213dec2fa..d4a4e5a1c 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); +extern bool SupportedRelationKind(Relation relation); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 020b64754..106472ce8 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -116,7 +116,8 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId); extern void CreateReferenceTableShard(Oid distributedTableId); extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, int shardIndex, uint64 shardId, char *newShardOwner, - List *ddlCommandList, List *foreignConstraintCommadList); + List *ddlCommandList, List *foreignConstraintCommadList, + char *alterTableAttachPartitionCommand); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, diff --git a/src/include/distributed/multi_partitioning_utils.h b/src/include/distributed/multi_partitioning_utils.h index 4dc5f0616..5b95de830 100644 --- a/src/include/distributed/multi_partitioning_utils.h +++ b/src/include/distributed/multi_partitioning_utils.h @@ -16,6 +16,7 @@ extern bool PartitionTable(Oid relationId); extern bool 
IsChildTable(Oid relationId); extern bool IsParentTable(Oid relationId); extern List * PartitionList(Oid parentRelationId); +extern Oid PartitionParentOid(Oid partitionOid); extern char * GenerateDetachPartitionCommand(Oid partitionTableId); extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId); extern char * GeneratePartitioningInformation(Oid tableId); diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out new file mode 100644 index 000000000..e64447d8f --- /dev/null +++ b/src/test/regress/expected/multi_partitioning.out @@ -0,0 +1,86 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); + +-- create its partitions +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, '2009-06-06'); +INSERT INTO partitioning_test VALUES (2, '2010-07-07'); +INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); +INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +-- this should error out given that parent of the partition is not distributed +SELECT create_distributed_table('partitioning_test_2010', 'id'); +ERROR: cannot distributed relation "partitioning_test_2010" which is partition of "partitioning_test" +DETAIL: Citus does not support partitioning among local tables and distributed tables +HINT: First distribute the partitioned table "partitioning_test" +-- this should suceed +SELECT create_distributed_table('partitioning_test', 'id'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +-- check the data +SELECT * FROM partitioning_test ORDER BY 1; + id | time +----+------------ + 1 | 06-06-2009 + 2 | 07-07-2010 + 3 | 09-09-2009 + 4 | 03-03-2010 +(4 rows) + +-- check the metadata +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +ORDER BY 1; + logicalrelid | partmethod | partkey | colocationid | repmodel +------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- + partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c + partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c +(3 rows) + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | count +------------------------+------- + partitioning_test | 4 + partitioning_test_2009 | 4 + partitioning_test_2010 | 4 +(3 rows) + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + nodename | nodeport | count +-----------+----------+------- + localhost | 57637 | 6 + localhost | 57638 | 6 +(2 rows) + +-- dropping the parent should CASCADE to the children as well +DROP TABLE partitioning_test; +\d+ 
partitioning_test* diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out new file mode 100644 index 000000000..823fffc6f --- /dev/null +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -0,0 +1,88 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ... + ^ + +-- create its partitions +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin... + ^ +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin... 
+ ^ +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, '2009-06-06'); +ERROR: relation "partitioning_test" does not exist +LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06'); + ^ +INSERT INTO partitioning_test VALUES (2, '2010-07-07'); +ERROR: relation "partitioning_test" does not exist +LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07'); + ^ +INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); +ERROR: relation "partitioning_test_2009" does not exist +LINE 1: INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); + ^ +INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); +ERROR: relation "partitioning_test_2010" does not exist +LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); + ^ +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +-- this should error out given that parent of the partition is not distributed +SELECT create_distributed_table('partitioning_test_2010', 'id'); +ERROR: relation "partitioning_test_2010" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test_2010', 'i... + ^ +-- this should suceed +SELECT create_distributed_table('partitioning_test', 'id'); +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test', 'id'); + ^ +-- check the data +SELECT * FROM partitioning_test ORDER BY 1; +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT * FROM partitioning_test ORDER BY 1; + ^ +-- check the metadata +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +ORDER BY 1; +ERROR: relation "partitioning_test" does not exist +LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20... 
+ ^ +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +GROUP BY + logicalrelid +ORDER BY + 1,2; +ERROR: relation "partitioning_test" does not exist +LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t... + ^ +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; +ERROR: relation "partitioning_test" does not exist +LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition... + ^ +-- dropping the parent should CASCADE to the children as well +DROP TABLE partitioning_test; +ERROR: table "partitioning_test" does not exist +\d+ partitioning_test* diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index e54e4428e..0eb5f5af6 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -19,7 +19,7 @@ WHERE logicalrelid = 'reference_table_test'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 1 | t + n | t | 3 | t (1 row) -- now see that shard min/max values are NULL diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8e0e413c1..4e00b5f95 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -25,7 +25,7 @@ test: multi_metadata_access # --- # Tests for partitioning support # --- -test: multi_partitioning_utils +test: multi_partitioning_utils multi_partitioning # ---------- # The following distributed tests depend on creating a partitioned table and diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql new file 
mode 100644 index 000000000..18d9b5cd5 --- /dev/null +++ b/src/test/regress/sql/multi_partitioning.sql @@ -0,0 +1,60 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000; + +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); + +-- create its partitions +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); + +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, '2009-06-06'); +INSERT INTO partitioning_test VALUES (2, '2010-07-07'); + +INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09'); +INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03'); + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; + +-- this should error out given that parent of the partition is not distributed +SELECT create_distributed_table('partitioning_test_2010', 'id'); + +-- this should suceed +SELECT create_distributed_table('partitioning_test', 'id'); + +-- check the data +SELECT * FROM partitioning_test ORDER BY 1; + +-- check the metadata +SELECT + * +FROM + pg_dist_partition +WHERE + logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +ORDER BY 1; + +SELECT + logicalrelid, count(*) +FROM pg_dist_shard + WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') +GROUP BY + logicalrelid +ORDER BY + 1,2; + +SELECT + nodename, nodeport, count(*) +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ) +GROUP BY + nodename, nodeport +ORDER BY + 1,2,3; + +-- dropping the parent should CASCADE to the children as well +DROP TABLE partitioning_test; + +\d+ partitioning_test*