Allow partitioned tables with replication factor > 1

With this commit, we all partitioned distributed tables with
replication factor > 1. However, we also have many restrictions.

In summary, we disallow all kinds of modifications (including DDLs)
on the partition tables. Instead, the user is allowed to run the
modifications over the parent table.

The necessity for such a restriction have two aspects:
   - We need to acquire shard resource locks appropriately
   - We need to handle marking partitions INVALID in case
     of any failures. Note that, in theory, the parent table
     should also become INVALID, which is too aggressive.
pull/2389/head
Onder Kalaci 2018-09-14 13:07:54 +03:00
parent 22f5af1bc3
commit c1b5a04f6e
18 changed files with 941 additions and 37 deletions

View File

@ -743,14 +743,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
"for hash-distributed tables")));
}
/* we currently don't support partitioned tables for replication factor > 1 */
if (ShardReplicationFactor > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables with replication "
"factor greater than 1 is not supported")));
}
/* we don't support distributing tables with multi-level partitioning */
if (PartitionTable(relationId))
{

View File

@ -64,6 +64,7 @@
#include "distributed/multi_copy.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
@ -199,6 +200,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = PartitionMethod(relationId);
/* disallow modifications to a partition table which have rep. factpr > 1 */
EnsurePartitionTableNotReplicated(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE)
{

View File

@ -3324,6 +3324,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
EnsureCoordinator();
EnsurePartitionTableNotReplicated(ddlJob->targetRelationId);
if (!ddlJob->concurrentIndexCmd)
{

View File

@ -37,6 +37,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/distributed_planner.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
@ -142,6 +143,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
rangeVar->schemaname = schemaName;
}
EnsurePartitionTableNotReplicated(relationId);
EnsureTablePermissions(relationId, ACL_TRUNCATE);
if (ShouldExecuteTruncateStmtSequential(truncateStatement))

View File

@ -28,6 +28,7 @@
#include "distributed/multi_master_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_shard_visibility.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
@ -61,6 +62,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);
static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
static void AssignRTEIdentities(Query *queryTree);
@ -584,8 +587,12 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
if (IsModifyCommand(originalQuery))
{
Oid targetRelationId = InvalidOid;
EnsureModificationsCanRun();
targetRelationId = ModifyQueryResultRelationId(query);
EnsurePartitionTableNotReplicated(targetRelationId);
if (InsertSelectIntoDistributedTable(originalQuery))
{
distributedPlan =
@ -764,6 +771,53 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
}
/*
* EnsurePartitionTableNotReplicated errors out if the infput relation is
* a partition table and the table has a replication factor greater than
* one.
*
* If the table is not a partition or replication factor is 1, the function
* becomes a no-op.
*/
void
EnsurePartitionTableNotReplicated(Oid relationId)
{
DeferredErrorMessage *deferredError =
DeferErrorIfPartitionTableNotSingleReplicated(relationId);
if (deferredError != NULL)
{
RaiseDeferredError(deferredError, ERROR);
}
}
/*
* DeferErrorIfPartitionTableNotSingleReplicated defers error if the input relation
* is a partition table with replication factor > 1. Otherwise, the function returns
* NULL.
*/
static DeferredErrorMessage *
DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId)
{
if (PartitionTableNoLock(relationId) && !SingleReplicatedTable(relationId))
{
Oid parentOid = PartitionParentOid(relationId);
char *parentRelationTest = get_rel_name(parentOid);
StringInfo errorHint = makeStringInfo();
appendStringInfo(errorHint, "Run the query on the parent table "
"\"%s\" instead.", parentRelationTest);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"modifications on partitions when replication "
"factor is greater than 1 is not supported",
NULL, errorHint->data);
}
return NULL;
}
/*
* ResolveExternalParams replaces the external parameters that appears
* in the query with the corresponding entries in the boundParams.

View File

@ -33,6 +33,7 @@
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
@ -472,6 +473,33 @@ ExtractSelectRangeTableEntry(Query *query)
}
/*
* ModifyQueryResultRelationId returns the result relation's Oid
* for the given modification query.
*
* The function errors out if the input query is not a
* modify query (e.g., INSERT, UPDATE or DELETE). So, this
* function is not expected to be called on SELECT queries.
*/
Oid
ModifyQueryResultRelationId(Query *query)
{
RangeTblEntry *resultRte = NULL;
/* only modify queries have result relations */
if (!IsModifyCommand(query))
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("input query is not a modification query")));
}
resultRte = ExtractInsertRangeTableEntry(query);
Assert(OidIsValid(resultRte->relid));
return resultRte->relid;
}
/*
* ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry.
* Note that the function expects and asserts that the input query be
@ -588,7 +616,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
if (rangeTableEntry->rtekind == RTE_RELATION)
{
if (!IsDistributedTable(rangeTableEntry->relid))
Oid relationId = rangeTableEntry->relid;
if (!IsDistributedTable(relationId))
{
StringInfo errorMessage = makeStringInfo();
char *relationName = get_rel_name(rangeTableEntry->relid);

View File

@ -414,19 +414,21 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
bool
SingleReplicatedTable(Oid relationId)
{
List *shardIntervalList = LoadShardList(relationId);
ListCell *shardIntervalCell = NULL;
List *shardList = LoadShardList(relationId);
List *shardPlacementList = NIL;
Oid shardId = INVALID_SHARD_ID;
foreach(shardIntervalCell, shardIntervalList)
if (list_length(shardList) <= 1)
{
uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell);
uint64 shardId = (*shardIdPointer);
List *shardPlacementList = ShardPlacementList(shardId);
return false;
}
if (list_length(shardPlacementList) != 1)
{
return false;
}
/* checking only for the first shard id should suffice */
shardId = (*(uint64 *) linitial(shardList));
shardPlacementList = ShardPlacementList(shardId);
if (list_length(shardPlacementList) != 1)
{
return false;
}
return true;

View File

@ -96,6 +96,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
extern bool IsModifyCommand(Query *query);
extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan);
extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern void EnsurePartitionTableNotReplicated(Oid relationId);
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);

View File

@ -59,6 +59,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext(
extern Oid ExtractFirstDistributedTableId(Query *query);
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
extern Oid ModifyQueryResultRelationId(Query *query);
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
extern bool IsMultiRowInsert(Query *query);

View File

@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
SELECT create_reference_table('partitioning_test_failure');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: distributing partitioned tables with replication factor greater than 1 is not supported
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;
DROP TABLE partitioning_test_failure_2009;

View File

@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
SELECT create_reference_table('partitioning_test_failure');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: distributing partitioned tables with replication factor greater than 1 is not supported
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;
DROP TABLE partitioning_test_failure_2009;

View File

@ -270,12 +270,6 @@ SELECT create_reference_table('partitioning_test_failure');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_reference_table('partitioning_test_failure');
^
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure',...
^
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;
DROP TABLE partitioning_test_failure_2009;

View File

@ -0,0 +1,210 @@
--
-- Distributed Partitioned Table Tests
--
SET citus.next_shard_id TO 1760000;
CREATE SCHEMA partitioned_table_replicated;
SET search_path TO partitioned_table_replicated;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 2;
-- print major version number for version-specific tests
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
server_version
----------------
11
(1 row)
CREATE TABLE collections (
key bigint,
ts timestamptz,
collection_id integer,
value numeric
) PARTITION BY LIST ( collection_id );
CREATE TABLE collections_1
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 1 );
CREATE TABLE collections_2
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 2 );
-- load some data data
INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1);
INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2);
INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1);
INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
-- in the first case, we'll distributed the
-- already existing partitioninong hierarcy
SELECT create_distributed_table('collections', 'key');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- now create partition of a already distributed table
CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 );
-- now attaching non distributed table to a distributed table
CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0;
-- load some data
INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i;
ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 );
NOTICE: Copying data from local table...
-- finally attach a distributed table to a distributed table
CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0;
SELECT create_distributed_table('collections_5', 'key');
create_distributed_table
--------------------------
(1 row)
-- load some data
INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i;
ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 );
-- make sure that we've all the placements
SELECT
logicalrelid, count(*) as placement_count
FROM
pg_dist_shard, pg_dist_shard_placement
WHERE
logicalrelid::text LIKE '%collections%' AND
pg_dist_shard.shardid = pg_dist_shard_placement.shardid
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | placement_count
---------------+-----------------
collections | 8
collections_1 | 8
collections_2 | 8
collections_3 | 8
collections_4 | 8
collections_5 | 8
(6 rows)
-- and, make sure that all tables are colocated
SELECT
count(DISTINCT colocationid)
FROM
pg_dist_partition
WHERE
logicalrelid::text LIKE '%collections%';
count
-------
1
(1 row)
-- make sure that any kind of modification is disallowed on partitions
-- given that replication factor > 1
INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- single shard update/delete not allowed
UPDATE collections_1 SET ts = now() WHERE key = 1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
DELETE FROM collections_1 WHERE ts = now() AND key = 1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- multi shard update/delete are not allowed
UPDATE collections_1 SET ts = now();
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
DELETE FROM collections_1 WHERE ts = now();
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- insert..select pushdown
INSERT INTO collections_1 SELECT * FROM collections_1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- insert..select via coordinator
INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- COPY is not allowed
COPY collections_1 FROM STDIN;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
\.
invalid command \.
-- DDLs are not allowed
CREATE INDEX index_on_partition ON collections_1(key);
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- EXPLAIN with modifications is not allowed as well
UPDATE collections_1 SET ts = now() WHERE key = 1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- TRUNCATE is also not allowed
TRUNCATE collections_1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
TRUNCATE collections, collections_1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- modifying CTEs are also not allowed
WITH collections_5_cte AS
(
DELETE FROM collections_5 RETURNING *
)
SELECT * FROM collections_5_cte;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- foreign key creation is disallowed due to replication factor > 1
CREATE TABLE fkey_test (key bigint PRIMARY KEY);
SELECT create_distributed_table('fkey_test', 'key');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE
collections_5
ADD CONSTRAINT
fkey_delete FOREIGN KEY(key)
REFERENCES
fkey_test(key) ON DELETE CASCADE;
ERROR: cannot create foreign key constraint
DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1".
HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us.
-- we should be able to attach and detach partitions
-- given that those DDLs are on the parent table
CREATE TABLE collections_6
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 6 );
ALTER TABLE collections DETACH PARTITION collections_6;
ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 );
-- read queries works just fine
SELECT count(*) FROM collections_1 WHERE key = 1;
count
-------
1
(1 row)
SELECT count(*) FROM collections_1 WHERE key != 1;
count
-------
1
(1 row)
-- rollups SELECT'ing from partitions should work just fine
CREATE TABLE collections_agg (
key bigint,
sum_value numeric
);
SELECT create_distributed_table('collections_agg', 'key');
create_distributed_table
--------------------------
(1 row)
-- pushdown roll-up
INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table partitioned_table_replicated.collections
drop cascades to table partitioned_table_replicated.fkey_test
drop cascades to table partitioned_table_replicated.collections_agg

View File

@ -0,0 +1,210 @@
--
-- Distributed Partitioned Table Tests
--
SET citus.next_shard_id TO 1760000;
CREATE SCHEMA partitioned_table_replicated;
SET search_path TO partitioned_table_replicated;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 2;
-- print major version number for version-specific tests
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
server_version
----------------
10
(1 row)
CREATE TABLE collections (
key bigint,
ts timestamptz,
collection_id integer,
value numeric
) PARTITION BY LIST ( collection_id );
CREATE TABLE collections_1
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 1 );
CREATE TABLE collections_2
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 2 );
-- load some data data
INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1);
INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2);
INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1);
INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
-- in the first case, we'll distributed the
-- already existing partitioninong hierarcy
SELECT create_distributed_table('collections', 'key');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- now create partition of a already distributed table
CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 );
-- now attaching non distributed table to a distributed table
CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0;
-- load some data
INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i;
ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 );
NOTICE: Copying data from local table...
-- finally attach a distributed table to a distributed table
CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0;
SELECT create_distributed_table('collections_5', 'key');
create_distributed_table
--------------------------
(1 row)
-- load some data
INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i;
ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 );
-- make sure that we've all the placements
SELECT
logicalrelid, count(*) as placement_count
FROM
pg_dist_shard, pg_dist_shard_placement
WHERE
logicalrelid::text LIKE '%collections%' AND
pg_dist_shard.shardid = pg_dist_shard_placement.shardid
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | placement_count
---------------+-----------------
collections | 8
collections_1 | 8
collections_2 | 8
collections_3 | 8
collections_4 | 8
collections_5 | 8
(6 rows)
-- and, make sure that all tables are colocated
SELECT
count(DISTINCT colocationid)
FROM
pg_dist_partition
WHERE
logicalrelid::text LIKE '%collections%';
count
-------
1
(1 row)
-- make sure that any kind of modification is disallowed on partitions
-- given that replication factor > 1
INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- single shard update/delete not allowed
UPDATE collections_1 SET ts = now() WHERE key = 1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
DELETE FROM collections_1 WHERE ts = now() AND key = 1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- multi shard update/delete are not allowed
UPDATE collections_1 SET ts = now();
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
DELETE FROM collections_1 WHERE ts = now();
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- insert..select pushdown
INSERT INTO collections_1 SELECT * FROM collections_1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- insert..select via coordinator
INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- COPY is not allowed
COPY collections_1 FROM STDIN;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
\.
invalid command \.
-- DDLs are not allowed
CREATE INDEX index_on_partition ON collections_1(key);
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- EXPLAIN with modifications is not allowed as well
UPDATE collections_1 SET ts = now() WHERE key = 1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- TRUNCATE is also not allowed
TRUNCATE collections_1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
TRUNCATE collections, collections_1;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- modifying CTEs are also not allowed
WITH collections_5_cte AS
(
DELETE FROM collections_5 RETURNING *
)
SELECT * FROM collections_5_cte;
ERROR: modifications on partitions when replication factor is greater than 1 is not supported
HINT: Run the query on the parent table "collections" instead.
-- foreign key creation is disallowed due to replication factor > 1
CREATE TABLE fkey_test (key bigint PRIMARY KEY);
SELECT create_distributed_table('fkey_test', 'key');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE
collections_5
ADD CONSTRAINT
fkey_delete FOREIGN KEY(key)
REFERENCES
fkey_test(key) ON DELETE CASCADE;
ERROR: cannot create foreign key constraint
DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1".
HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us.
-- we should be able to attach and detach partitions
-- given that those DDLs are on the parent table
CREATE TABLE collections_6
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 6 );
ALTER TABLE collections DETACH PARTITION collections_6;
ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 );
-- read queries works just fine
SELECT count(*) FROM collections_1 WHERE key = 1;
count
-------
1
(1 row)
SELECT count(*) FROM collections_1 WHERE key != 1;
count
-------
1
(1 row)
-- rollups SELECT'ing from partitions should work just fine
CREATE TABLE collections_agg (
key bigint,
sum_value numeric
);
SELECT create_distributed_table('collections_agg', 'key');
create_distributed_table
--------------------------
(1 row)
-- pushdown roll-up
INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table partitioned_table_replicated.collections
drop cascades to table partitioned_table_replicated.fkey_test
drop cascades to table partitioned_table_replicated.collections_agg

View File

@ -0,0 +1,252 @@
--
-- Distributed Partitioned Table Tests
--
SET citus.next_shard_id TO 1760000;
CREATE SCHEMA partitioned_table_replicated;
SET search_path TO partitioned_table_replicated;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 2;
-- print major version number for version-specific tests
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
server_version
----------------
9
(1 row)
CREATE TABLE collections (
key bigint,
ts timestamptz,
collection_id integer,
value numeric
) PARTITION BY LIST ( collection_id );
ERROR: syntax error at or near "PARTITION"
LINE 6: ) PARTITION BY LIST ( collection_id );
^
CREATE TABLE collections_1
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 1 );
ERROR: syntax error at or near "PARTITION"
LINE 2: PARTITION OF collections (key, ts, collection_id, value)
^
CREATE TABLE collections_2
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 2 );
ERROR: syntax error at or near "PARTITION"
LINE 2: PARTITION OF collections (key, ts, collection_id, value)
^
-- load some data data
INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1);
ERROR: relation "collections" does not exist
LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU...
^
INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2);
ERROR: relation "collections" does not exist
LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU...
^
INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1);
ERROR: relation "collections" does not exist
LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU...
^
INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
ERROR: relation "collections" does not exist
LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU...
^
-- in the first case, we'll distributed the
-- already existing partitioninong hierarcy
SELECT create_distributed_table('collections', 'key');
ERROR: relation "collections" does not exist
LINE 1: SELECT create_distributed_table('collections', 'key');
^
-- now create partition of a already distributed table
CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 );
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE collections_3 PARTITION OF collections FOR VALU...
^
-- now attaching non distributed table to a distributed table
CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0;
ERROR: relation "collections" does not exist
LINE 1: CREATE TABLE collections_4 AS SELECT * FROM collections LIMI...
^
-- load some data
INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i;
ERROR: relation "collections_4" does not exist
LINE 1: INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM ...
^
ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 );
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE collections ATTACH PARTITION collections_4 FOR V...
^
-- finally attach a distributed table to a distributed table
CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0;
ERROR: relation "collections" does not exist
LINE 1: CREATE TABLE collections_5 AS SELECT * FROM collections LIMI...
^
SELECT create_distributed_table('collections_5', 'key');
ERROR: relation "collections_5" does not exist
LINE 1: SELECT create_distributed_table('collections_5', 'key');
^
-- load some data
INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i;
ERROR: relation "collections_5" does not exist
LINE 1: INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM ...
^
ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 );
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE collections ATTACH PARTITION collections_5 FOR V...
^
-- make sure that we've all the placements
SELECT
logicalrelid, count(*) as placement_count
FROM
pg_dist_shard, pg_dist_shard_placement
WHERE
logicalrelid::text LIKE '%collections%' AND
pg_dist_shard.shardid = pg_dist_shard_placement.shardid
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | placement_count
--------------+-----------------
(0 rows)
-- and, make sure that all tables are colocated
SELECT
count(DISTINCT colocationid)
FROM
pg_dist_partition
WHERE
logicalrelid::text LIKE '%collections%';
count
-------
0
(1 row)
-- make sure that any kind of modification is disallowed on partitions
-- given that replication factor > 1
INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
ERROR: relation "collections_4" does not exist
LINE 1: INSERT INTO collections_4 (key, ts, collection_id, value) VA...
^
-- single shard update/delete not allowed
UPDATE collections_1 SET ts = now() WHERE key = 1;
ERROR: relation "collections_1" does not exist
LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1;
^
DELETE FROM collections_1 WHERE ts = now() AND key = 1;
ERROR: relation "collections_1" does not exist
LINE 1: DELETE FROM collections_1 WHERE ts = now() AND key = 1;
^
-- multi shard update/delete are not allowed
UPDATE collections_1 SET ts = now();
ERROR: relation "collections_1" does not exist
LINE 1: UPDATE collections_1 SET ts = now();
^
DELETE FROM collections_1 WHERE ts = now();
ERROR: relation "collections_1" does not exist
LINE 1: DELETE FROM collections_1 WHERE ts = now();
^
-- insert..select pushdown
INSERT INTO collections_1 SELECT * FROM collections_1;
ERROR: relation "collections_1" does not exist
LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1;
^
-- insert..select via coordinator
INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0;
ERROR: relation "collections_1" does not exist
LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET...
^
-- COPY is not allowed
COPY collections_1 FROM STDIN;
ERROR: relation "collections_1" does not exist
\.
invalid command \.
-- DDLs are not allowed
CREATE INDEX index_on_partition ON collections_1(key);
ERROR: relation "collections_1" does not exist
-- EXPLAIN with modifications is not allowed as well
UPDATE collections_1 SET ts = now() WHERE key = 1;
ERROR: relation "collections_1" does not exist
LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1;
^
-- TRUNCATE is also not allowed
TRUNCATE collections_1;
ERROR: relation "collections_1" does not exist
TRUNCATE collections, collections_1;
ERROR: relation "collections" does not exist
-- modifying CTEs are also not allowed
WITH collections_5_cte AS
(
DELETE FROM collections_5 RETURNING *
)
SELECT * FROM collections_5_cte;
ERROR: relation "collections_5" does not exist
LINE 3: DELETE FROM collections_5 RETURNING *
^
-- foreign key creation is disallowed due to replication factor > 1
CREATE TABLE fkey_test (key bigint PRIMARY KEY);
SELECT create_distributed_table('fkey_test', 'key');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE
collections_5
ADD CONSTRAINT
fkey_delete FOREIGN KEY(key)
REFERENCES
fkey_test(key) ON DELETE CASCADE;
ERROR: relation "collections_5" does not exist
-- we should be able to attach and detach partitions
-- given that those DDLs are on the parent table
CREATE TABLE collections_6
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 6 );
ERROR: syntax error at or near "PARTITION"
LINE 2: PARTITION OF collections (key, ts, collection_id, value)
^
ALTER TABLE collections DETACH PARTITION collections_6;
ERROR: syntax error at or near "DETACH"
LINE 1: ALTER TABLE collections DETACH PARTITION collections_6;
^
ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 );
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE collections ATTACH PARTITION collections_6 FOR V...
^
-- read queries works just fine
SELECT count(*) FROM collections_1 WHERE key = 1;
ERROR: relation "collections_1" does not exist
LINE 1: SELECT count(*) FROM collections_1 WHERE key = 1;
^
SELECT count(*) FROM collections_1 WHERE key != 1;
ERROR: relation "collections_1" does not exist
LINE 1: SELECT count(*) FROM collections_1 WHERE key != 1;
^
-- rollups SELECT'ing from partitions should work just fine
CREATE TABLE collections_agg (
key bigint,
sum_value numeric
);
SELECT create_distributed_table('collections_agg', 'key');
create_distributed_table
--------------------------
(1 row)
-- pushdown roll-up
INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
ERROR: relation "collections_1" does not exist
LINE 1: ...RT INTO collections_agg SELECT key, sum(key) FROM collection...
^
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
ERROR: relation "collections_1" does not exist
LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection...
^
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table partitioned_table_replicated.fkey_test
drop cascades to table partitioned_table_replicated.collections_agg

View File

@ -37,7 +37,7 @@ test: multi_insert_select_window multi_shard_update_delete window_functions dml_
# ----------
# Tests for partitioning support
# ----------
test: multi_partitioning_utils multi_partitioning
test: multi_partitioning_utils multi_partitioning replicated_partitioned_table
# ----------

View File

@ -173,9 +173,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'append');
SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
SELECT create_reference_table('partitioning_test_failure');
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;

View File

@ -0,0 +1,162 @@
--
-- Distributed Partitioned Table Tests
--
SET citus.next_shard_id TO 1760000;
CREATE SCHEMA partitioned_table_replicated;
SET search_path TO partitioned_table_replicated;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 2;
-- print major version number for version-specific tests
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
CREATE TABLE collections (
key bigint,
ts timestamptz,
collection_id integer,
value numeric
) PARTITION BY LIST ( collection_id );
CREATE TABLE collections_1
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 1 );
CREATE TABLE collections_2
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 2 );
-- load some data data
INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1);
INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2);
INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1);
INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
-- in the first case, we'll distributed the
-- already existing partitioninong hierarcy
SELECT create_distributed_table('collections', 'key');
-- now create partition of a already distributed table
CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 );
-- now attaching non distributed table to a distributed table
CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0;
-- load some data
INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i;
ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 );
-- finally attach a distributed table to a distributed table
CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0;
SELECT create_distributed_table('collections_5', 'key');
-- load some data
INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i;
ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 );
-- make sure that we've all the placements
SELECT
logicalrelid, count(*) as placement_count
FROM
pg_dist_shard, pg_dist_shard_placement
WHERE
logicalrelid::text LIKE '%collections%' AND
pg_dist_shard.shardid = pg_dist_shard_placement.shardid
GROUP BY
logicalrelid
ORDER BY
1,2;
-- and, make sure that all tables are colocated
SELECT
count(DISTINCT colocationid)
FROM
pg_dist_partition
WHERE
logicalrelid::text LIKE '%collections%';
-- make sure that any kind of modification is disallowed on partitions
-- given that replication factor > 1
INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
-- single shard update/delete not allowed
UPDATE collections_1 SET ts = now() WHERE key = 1;
DELETE FROM collections_1 WHERE ts = now() AND key = 1;
-- multi shard update/delete are not allowed
UPDATE collections_1 SET ts = now();
DELETE FROM collections_1 WHERE ts = now();
-- insert..select pushdown
INSERT INTO collections_1 SELECT * FROM collections_1;
-- insert..select via coordinator
INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0;
-- COPY is not allowed
COPY collections_1 FROM STDIN;
\.
-- DDLs are not allowed
CREATE INDEX index_on_partition ON collections_1(key);
-- EXPLAIN with modifications is not allowed as well
UPDATE collections_1 SET ts = now() WHERE key = 1;
-- TRUNCATE is also not allowed
TRUNCATE collections_1;
TRUNCATE collections, collections_1;
-- modifying CTEs are also not allowed
WITH collections_5_cte AS
(
DELETE FROM collections_5 RETURNING *
)
SELECT * FROM collections_5_cte;
-- foreign key creation is disallowed due to replication factor > 1
CREATE TABLE fkey_test (key bigint PRIMARY KEY);
SELECT create_distributed_table('fkey_test', 'key');
ALTER TABLE
collections_5
ADD CONSTRAINT
fkey_delete FOREIGN KEY(key)
REFERENCES
fkey_test(key) ON DELETE CASCADE;
-- we should be able to attach and detach partitions
-- given that those DDLs are on the parent table
CREATE TABLE collections_6
PARTITION OF collections (key, ts, collection_id, value)
FOR VALUES IN ( 6 );
ALTER TABLE collections DETACH PARTITION collections_6;
ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 );
-- read queries works just fine
SELECT count(*) FROM collections_1 WHERE key = 1;
SELECT count(*) FROM collections_1 WHERE key != 1;
-- rollups SELECT'ing from partitions should work just fine
CREATE TABLE collections_agg (
key bigint,
sum_value numeric
);
SELECT create_distributed_table('collections_agg', 'key');
-- pushdown roll-up
INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;