diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3e3c4ca56..76ea64958 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -743,14 +743,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, "for hash-distributed tables"))); } - /* we currently don't support partitioned tables for replication factor > 1 */ - if (ShardReplicationFactor > 1) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("distributing partitioned tables with replication " - "factor greater than 1 is not supported"))); - } - /* we don't support distributing tables with multi-level partitioning */ if (PartitionTable(relationId)) { diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 05bcf8432..c91db46f5 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -64,6 +64,7 @@ #include "distributed/multi_copy.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -199,6 +200,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); char partitionMethod = PartitionMethod(relationId); + /* disallow modifications to a partition table which have rep. factpr > 1 */ + EnsurePartitionTableNotReplicated(relationId); + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 91f02c3a6..acbfb3192 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -3324,6 +3324,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); EnsureCoordinator(); + EnsurePartitionTableNotReplicated(ddlJob->targetRelationId); if (!ddlJob->concurrentIndexCmd) { diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 162bfb0a6..f71e3b1d9 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -37,6 +37,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/distributed_planner.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" @@ -142,6 +143,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) rangeVar->schemaname = schemaName; } + EnsurePartitionTableNotReplicated(relationId); EnsureTablePermissions(relationId, ACL_TRUNCATE); if (ShouldExecuteTruncateStmtSequential(truncateStatement)) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 125c0236e..6ef0f5824 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -28,6 +28,7 @@ #include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/recursive_planning.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_shard_visibility.h" #include "executor/executor.h" #include "nodes/makefuncs.h" @@ -61,6 +62,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); +static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid + relationId); static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static void AssignRTEIdentities(Query *queryTree); @@ -584,8 +587,12 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (IsModifyCommand(originalQuery)) { + Oid targetRelationId = InvalidOid; EnsureModificationsCanRun(); + targetRelationId = ModifyQueryResultRelationId(query); + EnsurePartitionTableNotReplicated(targetRelationId); + if (InsertSelectIntoDistributedTable(originalQuery)) { distributedPlan = @@ -764,6 +771,53 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi } +/* + * EnsurePartitionTableNotReplicated errors out if the infput relation is + * a partition table and the table has a replication factor greater than + * one. + * + * If the table is not a partition or replication factor is 1, the function + * becomes a no-op. + */ +void +EnsurePartitionTableNotReplicated(Oid relationId) +{ + DeferredErrorMessage *deferredError = + DeferErrorIfPartitionTableNotSingleReplicated(relationId); + if (deferredError != NULL) + { + RaiseDeferredError(deferredError, ERROR); + } +} + + +/* + * DeferErrorIfPartitionTableNotSingleReplicated defers error if the input relation + * is a partition table with replication factor > 1. Otherwise, the function returns + * NULL. + */ +static DeferredErrorMessage * +DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId) +{ + if (PartitionTableNoLock(relationId) && !SingleReplicatedTable(relationId)) + { + Oid parentOid = PartitionParentOid(relationId); + char *parentRelationTest = get_rel_name(parentOid); + StringInfo errorHint = makeStringInfo(); + + appendStringInfo(errorHint, "Run the query on the parent table " + "\"%s\" instead.", parentRelationTest); + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "modifications on partitions when replication " + "factor is greater than 1 is not supported", + NULL, errorHint->data); + } + + return NULL; +} + + /* * ResolveExternalParams replaces the external parameters that appears * in the query with the corresponding entries in the boundParams. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 4269a2e24..a48d02fcf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -33,6 +33,7 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" @@ -472,6 +473,33 @@ ExtractSelectRangeTableEntry(Query *query) } +/* + * ModifyQueryResultRelationId returns the result relation's Oid + * for the given modification query. + * + * The function errors out if the input query is not a + * modify query (e.g., INSERT, UPDATE or DELETE). So, this + * function is not expected to be called on SELECT queries. + */ +Oid +ModifyQueryResultRelationId(Query *query) +{ + RangeTblEntry *resultRte = NULL; + + /* only modify queries have result relations */ + if (!IsModifyCommand(query)) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("input query is not a modification query"))); + } + + resultRte = ExtractInsertRangeTableEntry(query); + Assert(OidIsValid(resultRte->relid)); + + return resultRte->relid; +} + + /* * ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. * Note that the function expects and asserts that the input query be @@ -588,7 +616,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer if (rangeTableEntry->rtekind == RTE_RELATION) { - if (!IsDistributedTable(rangeTableEntry->relid)) + Oid relationId = rangeTableEntry->relid; + + if (!IsDistributedTable(relationId)) { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index a895b994b..d9fbd40ef 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -414,19 +414,21 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter bool SingleReplicatedTable(Oid relationId) { - List *shardIntervalList = LoadShardList(relationId); - ListCell *shardIntervalCell = NULL; + List *shardList = LoadShardList(relationId); + List *shardPlacementList = NIL; + Oid shardId = INVALID_SHARD_ID; - foreach(shardIntervalCell, shardIntervalList) + if (list_length(shardList) <= 1) { - uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell); - uint64 shardId = (*shardIdPointer); - List *shardPlacementList = ShardPlacementList(shardId); + return false; + } - if (list_length(shardPlacementList) != 1) - { - return false; - } + /* checking only for the first shard id should suffice */ + shardId = (*(uint64 *) linitial(shardList)); + shardPlacementList = ShardPlacementList(shardId); + if (list_length(shardPlacementList) != 1) + { + return false; } return true; diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f18934e28..03b27b8f5 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -96,6 +96,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root, extern bool IsModifyCommand(Query *query); extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); +extern void EnsurePartitionTableNotReplicated(Oid relationId); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index ff22d1335..833c0ce2f 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -59,6 +59,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext( extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); +extern Oid ModifyQueryResultRelationId(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern bool IsMultiRowInsert(Query *query); diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 450b5eadb..9c90376a8 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); ERROR: distributing partitioned tables in only supported for hash-distributed tables SELECT create_reference_table('partitioning_test_failure'); ERROR: distributing partitioned tables in only supported for hash-distributed tables --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: distributing partitioned tables with replication factor greater than 1 is not supported SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out index f54e3552f..b73fc52a7 100644 --- a/src/test/regress/expected/multi_partitioning_0.out +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); ERROR: distributing partitioned tables in only supported for hash-distributed tables SELECT create_reference_table('partitioning_test_failure'); ERROR: distributing partitioned tables in only supported for hash-distributed tables --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: distributing partitioned tables with replication factor greater than 1 is not supported SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/multi_partitioning_1.out b/src/test/regress/expected/multi_partitioning_1.out index 354087d8a..2d0fb4e5c 100644 --- a/src/test/regress/expected/multi_partitioning_1.out +++ b/src/test/regress/expected/multi_partitioning_1.out @@ -270,12 +270,6 @@ SELECT create_reference_table('partitioning_test_failure'); ERROR: relation "partitioning_test_failure" does not exist LINE 1: SELECT create_reference_table('partitioning_test_failure'); ^ --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: relation "partitioning_test_failure" does not exist -LINE 1: SELECT create_distributed_table('partitioning_test_failure',... - ^ SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/replicated_partitioned_table.out b/src/test/regress/expected/replicated_partitioned_table.out new file mode 100644 index 000000000..1cdc25c6a --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table.out @@ -0,0 +1,210 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 11 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +NOTICE: Copying data from local table... +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +---------------+----------------- + collections | 8 + collections_1 | 8 + collections_2 | 8 + collections_3 | 8 + collections_4 | 8 + collections_5 | 8 +(6 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 1 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +TRUNCATE collections, collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: cannot create foreign key constraint +DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; + count +------- + 1 +(1 row) + +SELECT count(*) FROM collections_1 WHERE key != 1; + count +------- + 1 +(1 row) + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table partitioned_table_replicated.collections +drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg diff --git a/src/test/regress/expected/replicated_partitioned_table_0.out b/src/test/regress/expected/replicated_partitioned_table_0.out new file mode 100644 index 000000000..31e080bae --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table_0.out @@ -0,0 +1,210 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 10 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +NOTICE: Copying data from local table... +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +---------------+----------------- + collections | 8 + collections_1 | 8 + collections_2 | 8 + collections_3 | 8 + collections_4 | 8 + collections_5 | 8 +(6 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 1 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +TRUNCATE collections, collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: cannot create foreign key constraint +DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; + count +------- + 1 +(1 row) + +SELECT count(*) FROM collections_1 WHERE key != 1; + count +------- + 1 +(1 row) + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table partitioned_table_replicated.collections +drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg diff --git a/src/test/regress/expected/replicated_partitioned_table_1.out b/src/test/regress/expected/replicated_partitioned_table_1.out new file mode 100644 index 000000000..c7ffc500b --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table_1.out @@ -0,0 +1,252 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 9 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +ERROR: syntax error at or near "PARTITION" +LINE 6: ) PARTITION BY LIST ( collection_id ); + ^ +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +ERROR: relation "collections" does not exist +LINE 1: SELECT create_distributed_table('collections', 'key'); + ^ +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE collections_3 PARTITION OF collections FOR VALU... + ^ +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +ERROR: relation "collections" does not exist +LINE 1: CREATE TABLE collections_4 AS SELECT * FROM collections LIMI... + ^ +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ERROR: relation "collections_4" does not exist +LINE 1: INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM ... + ^ +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_4 FOR V... + ^ +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +ERROR: relation "collections" does not exist +LINE 1: CREATE TABLE collections_5 AS SELECT * FROM collections LIMI... + ^ +SELECT create_distributed_table('collections_5', 'key'); +ERROR: relation "collections_5" does not exist +LINE 1: SELECT create_distributed_table('collections_5', 'key'); + ^ +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ERROR: relation "collections_5" does not exist +LINE 1: INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM ... + ^ +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_5 FOR V... + ^ +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +--------------+----------------- +(0 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 0 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: relation "collections_4" does not exist +LINE 1: INSERT INTO collections_4 (key, ts, collection_id, value) VA... + ^ +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1; + ^ +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: DELETE FROM collections_1 WHERE ts = now() AND key = 1; + ^ +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: relation "collections_1" does not exist +LINE 1: UPDATE collections_1 SET ts = now(); + ^ +DELETE FROM collections_1 WHERE ts = now(); +ERROR: relation "collections_1" does not exist +LINE 1: DELETE FROM collections_1 WHERE ts = now(); + ^ +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: relation "collections_1" does not exist +LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1; + ^ +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: relation "collections_1" does not exist +LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET... + ^ +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: relation "collections_1" does not exist +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: relation "collections_1" does not exist +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1; + ^ +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: relation "collections_1" does not exist +TRUNCATE collections, collections_1; +ERROR: relation "collections" does not exist +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: relation "collections_5" does not exist +LINE 3: DELETE FROM collections_5 RETURNING * + ^ +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: relation "collections_5" does not exist +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +ALTER TABLE collections DETACH PARTITION collections_6; +ERROR: syntax error at or near "DETACH" +LINE 1: ALTER TABLE collections DETACH PARTITION collections_6; + ^ +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_6 FOR V... + ^ +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: SELECT count(*) FROM collections_1 WHERE key = 1; + ^ +SELECT count(*) FROM collections_1 WHERE key != 1; +ERROR: relation "collections_1" does not exist +LINE 1: SELECT count(*) FROM collections_1 WHERE key != 1; + ^ +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +ERROR: relation "collections_1" does not exist +LINE 1: ...RT INTO collections_agg SELECT key, sum(key) FROM collection... + ^ +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +ERROR: relation "collections_1" does not exist +LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection... + ^ +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 367015ea5..68b3ccb8b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -37,7 +37,7 @@ test: multi_insert_select_window multi_shard_update_delete window_functions dml_ # ---------- # Tests for partitioning support # ---------- -test: multi_partitioning_utils multi_partitioning +test: multi_partitioning_utils multi_partitioning replicated_partitioned_table # ---------- diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index adb643932..8dbef8169 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -173,9 +173,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'append'); SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); SELECT create_reference_table('partitioning_test_failure'); --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; diff --git a/src/test/regress/sql/replicated_partitioned_table.sql b/src/test/regress/sql/replicated_partitioned_table.sql new file mode 100644 index 000000000..46f592c29 --- /dev/null +++ b/src/test/regress/sql/replicated_partitioned_table.sql @@ -0,0 +1,162 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; + +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; + +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); + + +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); + +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); + +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); + +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); + +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; + +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; + +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); + +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); + +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); + +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +DELETE FROM collections_1 WHERE ts = now() AND key = 1; + +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +DELETE FROM collections_1 WHERE ts = now(); + +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; + +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; + +-- COPY is not allowed +COPY collections_1 FROM STDIN; +\. + +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); + +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; + +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +TRUNCATE collections, collections_1; + +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; + +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; + +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); + +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); + +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; +SELECT count(*) FROM collections_1 WHERE key != 1; + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); + +SELECT create_distributed_table('collections_agg', 'key'); + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; + +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; + +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +