From c1b5a04f6e463ba0707d89fe9caab71f48b196c3 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 14 Sep 2018 13:07:54 +0300
Subject: [PATCH 1/3] Allow partitioned tables with replication factor > 1

With this commit, we allow partitioned distributed tables with
replication factor > 1. However, we also add many restrictions. In
summary, we disallow all kinds of modifications (including DDLs) on the
partitions. Instead, the user is allowed to run the modifications over
the parent table.

The necessity for such a restriction has two aspects:
   - We need to acquire shard resource locks appropriately
   - We need to handle marking partitions INVALID in case of any
     failures. Note that, in theory, the parent table should also
     become INVALID, which is too aggressive.
---
 .../commands/create_distributed_table.c       |   8 -
 src/backend/distributed/commands/multi_copy.c |   4 +
 .../distributed/executor/multi_utility.c      |   1 +
 .../master/master_modify_multiple_shards.c    |   2 +
 .../distributed/planner/distributed_planner.c |  54 ++++
 .../planner/multi_router_planner.c            |  32 ++-
 .../distributed/utils/shardinterval_utils.c   |  22 +-
 src/include/distributed/distributed_planner.h |   1 +
 .../distributed/multi_router_planner.h        |   1 +
 .../regress/expected/multi_partitioning.out   |   4 -
 .../regress/expected/multi_partitioning_0.out |   4 -
 .../regress/expected/multi_partitioning_1.out |   6 -
 .../expected/replicated_partitioned_table.out | 210 +++++++++++++++
 .../replicated_partitioned_table_0.out        | 210 +++++++++++++++
 .../replicated_partitioned_table_1.out        | 252 ++++++++++++++++++
 src/test/regress/multi_schedule               |   2 +-
 src/test/regress/sql/multi_partitioning.sql   |   3 -
 .../sql/replicated_partitioned_table.sql      | 162 +++++++++++
 18 files changed, 941 insertions(+), 37 deletions(-)
 create mode 100644 src/test/regress/expected/replicated_partitioned_table.out
 create mode 100644 src/test/regress/expected/replicated_partitioned_table_0.out
 create mode 100644 src/test/regress/expected/replicated_partitioned_table_1.out
 create mode 100644 src/test/regress/sql/replicated_partitioned_table.sql

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 3e3c4ca56..76ea64958 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -743,14 +743,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
 								   "for hash-distributed tables")));
 	}
 
-	/* we currently don't support partitioned tables for replication factor > 1 */
-	if (ShardReplicationFactor > 1)
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("distributing partitioned tables with replication "
-							   "factor greater than 1 is not supported")));
-	}
-
 	/* we don't support distributing tables with multi-level partitioning */
 	if (PartitionTable(relationId))
 	{
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 05bcf8432..c91db46f5 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -64,6 +64,7 @@
 #include "distributed/multi_copy.h"
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/multi_router_planner.h"
 #include "distributed/multi_shard_transaction.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
@@ -199,6 +200,9 @@ CitusCopyFrom(CopyStmt *copyStatement,
char *completionTag) Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); char partitionMethod = PartitionMethod(relationId); + /* disallow modifications to a partition table which have rep. factpr > 1 */ + EnsurePartitionTableNotReplicated(relationId); + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 91f02c3a6..acbfb3192 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -3324,6 +3324,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); EnsureCoordinator(); + EnsurePartitionTableNotReplicated(ddlJob->targetRelationId); if (!ddlJob->concurrentIndexCmd) { diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 162bfb0a6..f71e3b1d9 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -37,6 +37,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/distributed_planner.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" @@ -142,6 +143,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) rangeVar->schemaname = schemaName; } + EnsurePartitionTableNotReplicated(relationId); EnsureTablePermissions(relationId, ACL_TRUNCATE); if (ShouldExecuteTruncateStmtSequential(truncateStatement)) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 125c0236e..6ef0f5824 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -28,6 +28,7 @@ #include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/recursive_planning.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_shard_visibility.h" #include "executor/executor.h" #include "nodes/makefuncs.h" @@ -61,6 +62,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); +static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid + relationId); static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static void AssignRTEIdentities(Query *queryTree); @@ -584,8 +587,12 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (IsModifyCommand(originalQuery)) { + Oid targetRelationId = InvalidOid; EnsureModificationsCanRun(); + targetRelationId = ModifyQueryResultRelationId(query); + EnsurePartitionTableNotReplicated(targetRelationId); + if (InsertSelectIntoDistributedTable(originalQuery)) { distributedPlan = @@ -764,6 +771,53 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi } +/* + * EnsurePartitionTableNotReplicated errors out if the infput relation is + * a partition table and the table has a replication factor greater than + * one. 
+ * + * If the table is not a partition or replication factor is 1, the function + * becomes a no-op. + */ +void +EnsurePartitionTableNotReplicated(Oid relationId) +{ + DeferredErrorMessage *deferredError = + DeferErrorIfPartitionTableNotSingleReplicated(relationId); + if (deferredError != NULL) + { + RaiseDeferredError(deferredError, ERROR); + } +} + + +/* + * DeferErrorIfPartitionTableNotSingleReplicated defers error if the input relation + * is a partition table with replication factor > 1. Otherwise, the function returns + * NULL. + */ +static DeferredErrorMessage * +DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId) +{ + if (PartitionTableNoLock(relationId) && !SingleReplicatedTable(relationId)) + { + Oid parentOid = PartitionParentOid(relationId); + char *parentRelationTest = get_rel_name(parentOid); + StringInfo errorHint = makeStringInfo(); + + appendStringInfo(errorHint, "Run the query on the parent table " + "\"%s\" instead.", parentRelationTest); + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "modifications on partitions when replication " + "factor is greater than 1 is not supported", + NULL, errorHint->data); + } + + return NULL; +} + + /* * ResolveExternalParams replaces the external parameters that appears * in the query with the corresponding entries in the boundParams. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 4269a2e24..a48d02fcf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -33,6 +33,7 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" @@ -472,6 +473,33 @@ ExtractSelectRangeTableEntry(Query *query) } +/* + * ModifyQueryResultRelationId returns the result relation's Oid + * for the given modification query. + * + * The function errors out if the input query is not a + * modify query (e.g., INSERT, UPDATE or DELETE). So, this + * function is not expected to be called on SELECT queries. + */ +Oid +ModifyQueryResultRelationId(Query *query) +{ + RangeTblEntry *resultRte = NULL; + + /* only modify queries have result relations */ + if (!IsModifyCommand(query)) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("input query is not a modification query"))); + } + + resultRte = ExtractInsertRangeTableEntry(query); + Assert(OidIsValid(resultRte->relid)); + + return resultRte->relid; +} + + /* * ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. 
* Note that the function expects and asserts that the input query be @@ -588,7 +616,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer if (rangeTableEntry->rtekind == RTE_RELATION) { - if (!IsDistributedTable(rangeTableEntry->relid)) + Oid relationId = rangeTableEntry->relid; + + if (!IsDistributedTable(relationId)) { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index a895b994b..d9fbd40ef 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -414,19 +414,21 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter bool SingleReplicatedTable(Oid relationId) { - List *shardIntervalList = LoadShardList(relationId); - ListCell *shardIntervalCell = NULL; + List *shardList = LoadShardList(relationId); + List *shardPlacementList = NIL; + Oid shardId = INVALID_SHARD_ID; - foreach(shardIntervalCell, shardIntervalList) + if (list_length(shardList) <= 1) { - uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell); - uint64 shardId = (*shardIdPointer); - List *shardPlacementList = ShardPlacementList(shardId); + return false; + } - if (list_length(shardPlacementList) != 1) - { - return false; - } + /* checking only for the first shard id should suffice */ + shardId = (*(uint64 *) linitial(shardList)); + shardPlacementList = ShardPlacementList(shardId); + if (list_length(shardPlacementList) != 1) + { + return false; } return true; diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f18934e28..03b27b8f5 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -96,6 +96,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root, extern bool IsModifyCommand(Query *query); extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); +extern void EnsurePartitionTableNotReplicated(Oid relationId); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index ff22d1335..833c0ce2f 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -59,6 +59,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext( extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); +extern Oid ModifyQueryResultRelationId(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern bool IsMultiRowInsert(Query *query); diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 450b5eadb..9c90376a8 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); ERROR: distributing partitioned tables in only 
supported for hash-distributed tables SELECT create_reference_table('partitioning_test_failure'); ERROR: distributing partitioned tables in only supported for hash-distributed tables --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: distributing partitioned tables with replication factor greater than 1 is not supported SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out index f54e3552f..b73fc52a7 100644 --- a/src/test/regress/expected/multi_partitioning_0.out +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); ERROR: distributing partitioned tables in only supported for hash-distributed tables SELECT create_reference_table('partitioning_test_failure'); ERROR: distributing partitioned tables in only supported for hash-distributed tables --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: distributing partitioned tables with replication factor greater than 1 is not supported SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/multi_partitioning_1.out b/src/test/regress/expected/multi_partitioning_1.out index 354087d8a..2d0fb4e5c 100644 --- a/src/test/regress/expected/multi_partitioning_1.out +++ b/src/test/regress/expected/multi_partitioning_1.out @@ -270,12 +270,6 @@ SELECT create_reference_table('partitioning_test_failure'); ERROR: relation "partitioning_test_failure" does not exist LINE 1: SELECT create_reference_table('partitioning_test_failure'); ^ --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: relation "partitioning_test_failure" does not exist -LINE 1: SELECT create_distributed_table('partitioning_test_failure',... 
- ^ SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/replicated_partitioned_table.out b/src/test/regress/expected/replicated_partitioned_table.out new file mode 100644 index 000000000..1cdc25c6a --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table.out @@ -0,0 +1,210 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 11 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +NOTICE: Copying data from local table... 
+-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +---------------+----------------- + collections | 8 + collections_1 | 8 + collections_2 | 8 + collections_3 | 8 + collections_4 | 8 + collections_5 | 8 +(6 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 1 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. 
+-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +TRUNCATE collections, collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: cannot create foreign key constraint +DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; + count +------- + 1 +(1 row) + +SELECT count(*) FROM collections_1 WHERE key != 1; + count +------- + 1 +(1 row) + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table partitioned_table_replicated.collections +drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg diff --git a/src/test/regress/expected/replicated_partitioned_table_0.out b/src/test/regress/expected/replicated_partitioned_table_0.out new file mode 100644 index 000000000..31e080bae --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table_0.out @@ -0,0 +1,210 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET 
citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 10 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +NOTICE: Copying data from local table... +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +---------------+----------------- + collections | 8 + collections_1 | 8 + collections_2 | 8 + collections_3 | 8 + collections_4 | 8 + collections_5 | 8 +(6 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 1 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. 
+DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +TRUNCATE collections, collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: cannot create foreign key constraint +DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. 
+-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; + count +------- + 1 +(1 row) + +SELECT count(*) FROM collections_1 WHERE key != 1; + count +------- + 1 +(1 row) + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table partitioned_table_replicated.collections +drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg diff --git a/src/test/regress/expected/replicated_partitioned_table_1.out b/src/test/regress/expected/replicated_partitioned_table_1.out new file mode 100644 index 000000000..c7ffc500b --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table_1.out @@ -0,0 +1,252 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 9 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +ERROR: syntax error at or near "PARTITION" +LINE 6: ) PARTITION BY LIST ( collection_id ); + ^ +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... 
+ ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +ERROR: relation "collections" does not exist +LINE 1: SELECT create_distributed_table('collections', 'key'); + ^ +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE collections_3 PARTITION OF collections FOR VALU... + ^ +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +ERROR: relation "collections" does not exist +LINE 1: CREATE TABLE collections_4 AS SELECT * FROM collections LIMI... + ^ +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ERROR: relation "collections_4" does not exist +LINE 1: INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM ... + ^ +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_4 FOR V... + ^ +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +ERROR: relation "collections" does not exist +LINE 1: CREATE TABLE collections_5 AS SELECT * FROM collections LIMI... + ^ +SELECT create_distributed_table('collections_5', 'key'); +ERROR: relation "collections_5" does not exist +LINE 1: SELECT create_distributed_table('collections_5', 'key'); + ^ +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ERROR: relation "collections_5" does not exist +LINE 1: INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM ... + ^ +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_5 FOR V... + ^ +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +--------------+----------------- +(0 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 0 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: relation "collections_4" does not exist +LINE 1: INSERT INTO collections_4 (key, ts, collection_id, value) VA... 
+ ^ +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1; + ^ +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: DELETE FROM collections_1 WHERE ts = now() AND key = 1; + ^ +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: relation "collections_1" does not exist +LINE 1: UPDATE collections_1 SET ts = now(); + ^ +DELETE FROM collections_1 WHERE ts = now(); +ERROR: relation "collections_1" does not exist +LINE 1: DELETE FROM collections_1 WHERE ts = now(); + ^ +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: relation "collections_1" does not exist +LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1; + ^ +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: relation "collections_1" does not exist +LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET... + ^ +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: relation "collections_1" does not exist +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: relation "collections_1" does not exist +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1; + ^ +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: relation "collections_1" does not exist +TRUNCATE collections, collections_1; +ERROR: relation "collections" does not exist +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: relation "collections_5" does not exist +LINE 3: DELETE FROM collections_5 RETURNING * + ^ +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: relation "collections_5" does not exist +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +ALTER TABLE collections DETACH PARTITION collections_6; +ERROR: syntax error at or near "DETACH" +LINE 1: ALTER TABLE collections DETACH PARTITION collections_6; + ^ +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_6 FOR V... 
+ ^ +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; +ERROR: relation "collections_1" does not exist +LINE 1: SELECT count(*) FROM collections_1 WHERE key = 1; + ^ +SELECT count(*) FROM collections_1 WHERE key != 1; +ERROR: relation "collections_1" does not exist +LINE 1: SELECT count(*) FROM collections_1 WHERE key != 1; + ^ +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +ERROR: relation "collections_1" does not exist +LINE 1: ...RT INTO collections_agg SELECT key, sum(key) FROM collection... + ^ +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +ERROR: relation "collections_1" does not exist +LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection... + ^ +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 367015ea5..68b3ccb8b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -37,7 +37,7 @@ test: multi_insert_select_window multi_shard_update_delete window_functions dml_ # ---------- # Tests for partitioning support # ---------- -test: multi_partitioning_utils multi_partitioning +test: multi_partitioning_utils multi_partitioning replicated_partitioned_table # ---------- diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index adb643932..8dbef8169 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -173,9 +173,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'append'); SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); SELECT create_reference_table('partitioning_test_failure'); --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; diff --git a/src/test/regress/sql/replicated_partitioned_table.sql b/src/test/regress/sql/replicated_partitioned_table.sql new file mode 100644 index 000000000..46f592c29 --- /dev/null +++ b/src/test/regress/sql/replicated_partitioned_table.sql @@ -0,0 +1,162 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; + +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; + +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); + + +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); 
+ +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); + +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); + +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); + +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; + +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; + +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); + +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); + +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); + +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +DELETE FROM collections_1 WHERE ts = now() AND key = 1; + +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +DELETE FROM collections_1 WHERE ts = now(); + +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; + +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; + +-- COPY is not allowed +COPY collections_1 FROM STDIN; +\. 
+ +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); + +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; + +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +TRUNCATE collections, collections_1; + +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; + +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; + +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); + +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); + +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; +SELECT count(*) FROM collections_1 WHERE key != 1; + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); + +SELECT create_distributed_table('collections_agg', 'key'); + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; + +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; + +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; + From 8520a5b432e19d9245fce0e5d40f7bf58cfb11e3 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 18 Sep 2018 15:15:29 +0300 Subject: [PATCH 2/3] worker_append_table_to_shard becomes aware of partitioned tables --- .../worker/worker_data_fetch_protocol.c | 23 ++++++++++++++++++- src/include/distributed/worker_protocol.h | 1 + 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index a986b7876..0b8415ea1 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -32,6 +32,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" #include "distributed/relay_utility.h" @@ -763,6 +764,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) uint64 shardId = INVALID_SHARD_ID; bool received = false; StringInfo queryString = NULL; + Oid sourceShardRelationId = InvalidOid; + Oid sourceSchemaId = InvalidOid; CheckCitusVersion(ERROR); @@ -787,7 +790,25 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName); sourceCopyCommand = makeStringInfo(); - appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); + + /* + * Partitioned tables do not support "COPY table TO STDOUT". Thus, we use + * "COPY (SELECT * FROM table) TO STDOUT" for partitioned tables. + * + * If the schema name is not explicitly set, we use the public schema. + */ + sourceSchemaName = sourceSchemaName ? 
sourceSchemaName : "public"; + sourceSchemaId = get_namespace_oid(sourceSchemaName, false); + sourceShardRelationId = get_relname_relid(sourceTableName, sourceSchemaId); + if (PartitionedTableNoLock(sourceShardRelationId)) + { + appendStringInfo(sourceCopyCommand, COPY_SELECT_ALL_OUT_COMMAND, + sourceQualifiedName); + } + else + { + appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); + } received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand, localFilePath); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index df4aef827..30bef0f68 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -46,6 +46,7 @@ /* the tablename in the overloaded COPY statement is the to-be-transferred file */ #define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" TO STDOUT WITH (format 'transmit')" #define COPY_OUT_COMMAND "COPY %s TO STDOUT" +#define COPY_SELECT_ALL_OUT_COMMAND "COPY (SELECT * FROM %s) TO STDOUT" #define COPY_IN_COMMAND "COPY %s FROM '%s'" /* Defines that relate to creating tables */ From abc443d7fa5ea8e280fa8b55d73df66c302f3371 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 18 Sep 2018 18:29:10 +0300 Subject: [PATCH 3/3] Make sure that shard repair considers replication factor --- .../distributed/executor/multi_utility.c | 64 +++++++-- .../master/master_modify_multiple_shards.c | 1 - .../distributed/master/master_repair_shards.c | 127 ++++++++++++++++-- .../distributed/utils/reference_table_utils.c | 4 +- .../distributed/utils/shardinterval_utils.c | 30 ++++- src/include/distributed/master_protocol.h | 2 +- .../expected/replicated_partitioned_table.out | 88 +++++++++++- .../replicated_partitioned_table_0.out | 91 ++++++++++++- .../replicated_partitioned_table_1.out | 106 +++++++++++++++ .../sql/replicated_partitioned_table.sql | 71 +++++++++- 10 files changed, 551 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index acbfb3192..6d8919041 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -152,6 +152,7 @@ static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement); static void ProcessTruncateStatement(TruncateStmt *truncateStatement); +static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement); static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement); static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode); static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); @@ -2931,7 +2932,51 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) /* - * ProcessTruncateStatement handles distributed locking + * ProcessTruncateStatement handles few things that should be + * done before standard process utility is called for truncate + * command. + */ +static void +ProcessTruncateStatement(TruncateStmt *truncateStatement) +{ + EnsurePartitionTableNotReplicatedForTruncate(truncateStatement); + LockTruncatedRelationMetadataInWorkers(truncateStatement); +} + + +/* + * EnsurePartitionTableNotReplicatedForTruncate a simple wrapper around + * EnsurePartitionTableNotReplicated for TRUNCATE command. 
+ */ +static void +EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement) +{ + ListCell *relationCell = NULL; + + foreach(relationCell, truncateStatement->relations) + { + RangeVar *relationRV = (RangeVar *) lfirst(relationCell); + Relation relation = heap_openrv(relationRV, NoLock); + Oid relationId = RelationGetRelid(relation); + + if (!IsDistributedTable(relationId)) + { + heap_close(relation, NoLock); + continue; + } + + EnsurePartitionTableNotReplicated(relationId); + + heap_close(relation, NoLock); + } +} + + +/* + * LockTruncatedRelationMetadataInWorkers determines if distributed + * lock is necessary for truncated relations, and acquire locks. + * + * LockTruncatedRelationMetadataInWorkers handles distributed locking * of truncated tables before standard utility takes over. * * Actual distributed truncation occurs inside truncate trigger. @@ -2941,17 +2986,6 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) * non-distributed and distributed relations. */ static void -ProcessTruncateStatement(TruncateStmt *truncateStatement) -{ - LockTruncatedRelationMetadataInWorkers(truncateStatement); -} - - -/* - * LockTruncatedRelationMetadataInWorkers determines if distributed - * lock is necessary for truncated relations, and acquire locks. - */ -static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement) { List *distributedRelationList = NIL; @@ -3316,7 +3350,11 @@ AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement, * in its default value of '1pc', then a notice message indicating that '2pc' might be * used for extra safety. In the commit protocol, a BEGIN is sent after connection to * each shard placement and COMMIT/ROLLBACK is handled by - * CompleteShardPlacementTransactions function. + * CoordinatedTransactionCallback function. + * + * The function errors out if the node is not the coordinator or if the DDL is on + * a partitioned table which has replication factor > 1. 
+ * */ static void ExecuteDistributedDDLJob(DDLJob *ddlJob) diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index f71e3b1d9..1668b5985 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -143,7 +143,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) rangeVar->schemaname = schemaName; } - EnsurePartitionTableNotReplicated(relationId); EnsureTablePermissions(relationId, ACL_TRUNCATE); if (ShouldExecuteTruncateStmtSequential(truncateStatement)) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 666505e61..2fc95d0eb 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -25,6 +25,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_router_executor.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" @@ -50,6 +51,9 @@ static char LookupShardTransferMode(Oid shardReplicationModeOid); static void RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); +static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, + char *sourceNodeName, + int32 sourceNodePort); static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); @@ -219,6 +223,8 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char relationKind = get_rel_relkind(distributedTableId); char *tableOwner = TableOwner(shardInterval->relationId); bool missingOk = false; + bool includeData = false; + bool partitionedTable = false; List *ddlCommandList = NIL; List *foreignConstraintCommandList = NIL; @@ -237,6 +243,18 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, "not supported.", relationName))); } + /* + * Let's not allow repairing partitions to prevent any edge cases. + * We're already not allowing any kind of modifications on the partitions + * so their placements are not likely to to be marked as INVALID. The only + * possible case to mark placement of a partition as invalid is + * "ALTER TABLE parent_table DETACH PARTITION partition_table". But, + * given that the table would become a regular distributed table if the + * command succeeds, we're OK since the regular distributed tables can + * be repaired later on. + */ + EnsurePartitionTableNotReplicated(distributedTableId); + /* * We take a lock on the referenced table if there is a foreign constraint * during the copy procedure. If we do not block DMLs on the referenced @@ -260,10 +278,46 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); + /* + * If the shard belongs to a partitioned table, we need to load the data after + * creating the partitions and the partitioning hierarcy. 
+ */ + partitionedTable = PartitionedTableNoLock(distributedTableId); + includeData = !partitionedTable; + /* we generate necessary commands to recreate the shard in target node */ - ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort); + ddlCommandList = + CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeData); foreignConstraintCommandList = CopyShardForeignConstraintCommandList(shardInterval); ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList); + + /* + * CopyShardCommandList() drops the table which cascades to partitions if the + * table is a partitioned table. This means that we need to create both parent + * table and its partitions. + * + * We also skipped copying the data, so include it here. + */ + if (partitionedTable) + { + List *partitionCommandList = NIL; + + char *shardName = ConstructQualifiedShardName(shardInterval); + StringInfo copyShardDataCommand = makeStringInfo(); + + partitionCommandList = + CopyPartitionShardsCommandList(shardInterval, sourceNodeName, sourceNodePort); + ddlCommandList = list_concat(ddlCommandList, partitionCommandList); + + /* finally copy the data as well */ + appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, + quote_literal_cstr(shardName), /* table to append */ + quote_literal_cstr(shardName), /* remote table name */ + quote_literal_cstr(sourceNodeName), /* remote host */ + sourceNodePort); /* remote port */ + ddlCommandList = lappend(ddlCommandList, copyShardDataCommand->data); + } + SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); @@ -275,6 +329,49 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, } +/* + * CopyPartitionShardsCommandList gets a shardInterval which is a shard that + * belongs to partitioned table (this is asserted). + * + * The function returns a list of commands which re-creates all the partitions + * of the input shardInterval. + */ +static List * +CopyPartitionShardsCommandList(ShardInterval *shardInterval, char *sourceNodeName, + int32 sourceNodePort) +{ + Oid distributedTableId = shardInterval->relationId; + List *partitionList = NIL; + ListCell *partitionOidCell = NULL; + List *ddlCommandList = NIL; + + Assert(PartitionedTableNoLock(distributedTableId)); + + partitionList = PartitionList(distributedTableId); + foreach(partitionOidCell, partitionList) + { + Oid partitionOid = lfirst_oid(partitionOidCell); + uint64 partitionShardId = + ColocatedShardIdInRelation(partitionOid, shardInterval->shardIndex); + ShardInterval *partitionShardInterval = LoadShardInterval(partitionShardId); + bool includeData = false; + List *copyCommandList = NIL; + char *attachPartitionCommand = NULL; + + copyCommandList = + CopyShardCommandList(partitionShardInterval, sourceNodeName, sourceNodePort, + includeData); + ddlCommandList = list_concat(ddlCommandList, copyCommandList); + + attachPartitionCommand = + GenerateAttachShardPartitionCommand(partitionShardInterval); + ddlCommandList = lappend(ddlCommandList, attachPartitionCommand); + } + + return ddlCommandList; +} + + /* * EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source * node and inactive node on the target node. @@ -350,11 +447,12 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node /* * CopyShardCommandList generates command list to copy the given shard placement - * from the source node to the target node. 
+ * from the source node to the target node. Caller could optionally skip copying + * the data by the flag includeDataCopy. */ List * -CopyShardCommandList(ShardInterval *shardInterval, - char *sourceNodeName, int32 sourceNodePort) +CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, + int32 sourceNodePort, bool includeDataCopy) { int64 shardId = shardInterval->shardId; char *shardName = ConstructQualifiedShardName(shardInterval); @@ -371,14 +469,21 @@ CopyShardCommandList(ShardInterval *shardInterval, copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, tableRecreationCommandList); - appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, - quote_literal_cstr(shardName), /* table to append */ - quote_literal_cstr(shardName), /* remote table name */ - quote_literal_cstr(sourceNodeName), /* remote host */ - sourceNodePort); /* remote port */ + /* + * The caller doesn't want to include the COPY command, perhaps using + * logical replication to copy the data. + */ + if (includeDataCopy) + { + appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, + quote_literal_cstr(shardName), /* table to append */ + quote_literal_cstr(shardName), /* remote table name */ + quote_literal_cstr(sourceNodeName), /* remote host */ + sourceNodePort); /* remote port */ - copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, - copyShardDataCommand->data); + copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, + copyShardDataCommand->data); + } indexCommandList = GetTableIndexAndConstraintCommands(relationId); indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 99a94ba19..ccefdae4b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -284,7 +284,9 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); char *srcNodeName = sourceShardPlacement->nodeName; uint32 srcNodePort = sourceShardPlacement->nodePort; - List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); + bool includeData = true; + List *ddlCommandList = + CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData); List *shardPlacementList = ShardPlacementList(shardId); bool missingWorkerOk = true; diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index d9fbd40ef..7825c7f40 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -16,6 +16,7 @@ #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_join_order.h" #include "distributed/distributed_planner.h" #include "distributed/shard_pruning.h" #include "distributed/shardinterval_utils.h" @@ -418,6 +419,7 @@ SingleReplicatedTable(Oid relationId) List *shardPlacementList = NIL; Oid shardId = INVALID_SHARD_ID; + /* we could have append/range distributed tables without shards */ if (list_length(shardList) <= 1) { return false; @@ -425,10 +427,32 @@ SingleReplicatedTable(Oid relationId) /* checking only for the first shard id should suffice */ shardId = (*(uint64 *) linitial(shardList)); - shardPlacementList = 
ShardPlacementList(shardId); - if (list_length(shardPlacementList) != 1) + + /* for hash distributed tables, it is sufficient to only check one shard */ + if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH) { - return false; + shardPlacementList = ShardPlacementList(shardId); + if (list_length(shardPlacementList) != 1) + { + return false; + } + } + else + { + List *shardIntervalList = LoadShardList(relationId); + ListCell *shardIntervalCell = NULL; + + foreach(shardIntervalCell, shardIntervalList) + { + uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell); + uint64 shardId = (*shardIdPointer); + List *shardPlacementList = ShardPlacementList(shardId); + + if (list_length(shardPlacementList) != 1) + { + return false; + } + } } return true; diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index f68051283..06c68e4ea 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -163,7 +163,7 @@ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); /* function declarations for shard copy functinality */ extern List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, - int32 sourceNodePort); + int32 sourceNodePort, bool includeData); extern List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval); extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval, List ** diff --git a/src/test/regress/expected/replicated_partitioned_table.out b/src/test/regress/expected/replicated_partitioned_table.out index 1cdc25c6a..df990cb02 100644 --- a/src/test/regress/expected/replicated_partitioned_table.out +++ b/src/test/regress/expected/replicated_partitioned_table.out @@ -202,9 +202,95 @@ SELECT create_distributed_table('collections_agg', 'key'); INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; -- coordinator roll-up INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +-- add some indexes +CREATE INDEX ON customer_engagements (id); +CREATE INDEX ON customer_engagements (event_id); +CREATE INDEX ON customer_engagements (id, event_id); +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS 
worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; + id | event_id | value +----+----------+------- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 1 + 2 | 2 | 1 +(4 rows) + +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; + count +------- + 5 +(1 row) + +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; -NOTICE: drop cascades to 3 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table partitioned_table_replicated.collections drop cascades to table partitioned_table_replicated.fkey_test drop cascades to table partitioned_table_replicated.collections_agg +drop cascades to table partitioned_table_replicated.customer_engagements diff --git a/src/test/regress/expected/replicated_partitioned_table_0.out b/src/test/regress/expected/replicated_partitioned_table_0.out index 31e080bae..015b644a7 100644 --- a/src/test/regress/expected/replicated_partitioned_table_0.out +++ b/src/test/regress/expected/replicated_partitioned_table_0.out @@ -202,9 +202,98 @@ SELECT create_distributed_table('collections_agg', 'key'); INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; -- coordinator roll-up INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +-- add some indexes +CREATE INDEX ON customer_engagements (id); +ERROR: cannot create index on partitioned table "customer_engagements" +CREATE INDEX ON customer_engagements (event_id); +ERROR: cannot create index on partitioned table "customer_engagements" +CREATE INDEX ON customer_engagements (id, event_id); +ERROR: cannot create index on partitioned table "customer_engagements" 
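The alternative output file here appears to correspond to PostgreSQL 10, where CREATE INDEX cannot be run on a partitioned parent table, so the three index statements in the test error out instead of succeeding as they do in the primary output file. On such a server the comparable indexes would have to be defined partition by partition; a minimal sketch, run before the table is distributed, assuming the customer_engagements_1 and customer_engagements_2 partitions created by the test:

-- indexes defined directly on each partition instead of on the parent
CREATE INDEX ON customer_engagements_1 (id);
CREATE INDEX ON customer_engagements_1 (event_id);
CREATE INDEX ON customer_engagements_1 (id, event_id);
CREATE INDEX ON customer_engagements_2 (id);
CREATE INDEX ON customer_engagements_2 (event_id);
CREATE INDEX ON customer_engagements_2 (id, event_id);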
+-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; + id | event_id | value +----+----------+------- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 1 + 2 | 2 | 1 +(4 rows) + +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; + count +------- + 5 +(1 row) + +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; -NOTICE: drop cascades to 3 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table partitioned_table_replicated.collections drop cascades to table partitioned_table_replicated.fkey_test drop cascades to table partitioned_table_replicated.collections_agg +drop cascades to table partitioned_table_replicated.customer_engagements diff --git a/src/test/regress/expected/replicated_partitioned_table_1.out b/src/test/regress/expected/replicated_partitioned_table_1.out index c7ffc500b..cc4405995 100644 --- a/src/test/regress/expected/replicated_partitioned_table_1.out +++ b/src/test/regress/expected/replicated_partitioned_table_1.out @@ -245,6 +245,112 @@ INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GR ERROR: relation 
"collections_1" does not exist LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection... ^ +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +ERROR: syntax error at or near "PARTITION" +LINE 1: ...E customer_engagements (id integer, event_id int) PARTITION ... + ^ +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF customer_engagements + ^ +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF customer_engagements + ^ +-- add some indexes +CREATE INDEX ON customer_engagements (id); +ERROR: relation "customer_engagements" does not exist +CREATE INDEX ON customer_engagements (event_id); +ERROR: relation "customer_engagements" does not exist +CREATE INDEX ON customer_engagements (id, event_id); +ERROR: relation "customer_engagements" does not exist +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); +ERROR: relation "customer_engagements" does not exist +LINE 1: SELECT create_distributed_table('customer_engagements', 'id'... + ^ +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (1, 1); + ^ +INSERT INTO customer_engagements VALUES (2, 1); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (2, 1); + ^ +INSERT INTO customer_engagements VALUES (1, 2); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (1, 2); + ^ +INSERT INTO customer_engagements VALUES (2, 2); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (2, 2); + ^ +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +ERROR: relation "customer_engagements" does not exist +LINE 1: ...ewshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_... + ^ +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +ERROR: syntax error at or near ":" +LINE 1: ...dist_placement SET shardstate = 3 WHERE shardid = :newshardi... 
+ ^ +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (1, 1); + ^ +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: syntax error at or near ":" +LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',... + ^ +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: syntax error at or near ":" +LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',... + ^ +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +ERROR: current transaction is aborted, commands ignored until end of transaction block +SELECT * FROM customer_engagements ORDER BY 1,2,3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: syntax error at or near ":" +LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',... + ^ +INSERT INTO customer_engagements VALUES (1, 1); +ERROR: current transaction is aborted, commands ignored until end of transaction block +SELECT count(*) FROM customer_engagements; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; +ERROR: relation "collections" does not exist SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; NOTICE: drop cascades to 2 other objects diff --git a/src/test/regress/sql/replicated_partitioned_table.sql b/src/test/regress/sql/replicated_partitioned_table.sql index 46f592c29..eba4b8d35 100644 --- a/src/test/regress/sql/replicated_partitioned_table.sql +++ b/src/test/regress/sql/replicated_partitioned_table.sql @@ -157,6 +157,75 @@ INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key -- coordinator roll-up INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); + +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); + +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); + +-- add some indexes +CREATE INDEX ON customer_engagements (id); +CREATE INDEX ON customer_engagements (event_id); +CREATE INDEX ON customer_engagements (id, event_id); + +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); + +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) 
do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement + +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset + +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset + +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; + +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ROLLBACK; + +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; +ROLLBACK; + +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; +ROLLBACK; + +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; + SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; -
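The COPY (SELECT * FROM %s) TO STDOUT form introduced as COPY_SELECT_ALL_OUT_COMMAND earlier in this series is needed because PostgreSQL rejects a plain COPY ... TO STDOUT when the source relation is a partitioned parent; only the query form reads through the partitioning hierarchy. A minimal sketch of the difference, using an illustrative shard name rather than one generated by the tests:

-- plain COPY TO is rejected for a partitioned source shard
COPY customer_engagements_102008 TO STDOUT;
-- ERROR:  cannot copy from partitioned table "customer_engagements_102008"
-- HINT:  Try the COPY (SELECT ...) TO variant.

-- the query form used by COPY_SELECT_ALL_OUT_COMMAND reads all partitions
COPY (SELECT * FROM customer_engagements_102008) TO STDOUT;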