Add partitioning support to MX tables

Previously, we prevented creation of partitioned tables on Citus MX.
We decided to not focus on this feature until there is a need. Since
now there are requests for this feature, we are implementing support
for partitioned tables on Citus MX.
pull/2083/head
Burak Yucesoy 2018-04-05 11:35:49 +03:00
parent 86b733d14c
commit 0c283fa8a3
11 changed files with 863 additions and 48 deletions

View File

@ -369,6 +369,12 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
CreateReferenceTableShard(relationId);
}
if (ShouldSyncTableMetadata(relationId))
{
CreateTableMetadataOnWorkers(relationId);
}
/* if this table is partitioned table, distribute its partitions too */
if (PartitionedTable(relationId))
{
@ -398,11 +404,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
{
relation_close(colocatedRelation, NoLock);
}
if (ShouldSyncTableMetadata(relationId))
{
CreateTableMetadataOnWorkers(relationId);
}
}
@ -727,15 +728,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
"factor greater than 1 is not supported")));
}
/* we currently don't support MX tables to be distributed partitioned table */
if (replicationModel == REPLICATION_MODEL_STREAMING &&
CountPrimariesWithMetadata() > 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables is not supported "
"with Citus MX")));
}
/* we don't support distributing tables with multi-level partitioning */
if (PartitionTable(relationId))
{

View File

@ -170,6 +170,7 @@ static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid ol
static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static void PostProcessUtility(Node *parsetree);
static void ProcessDropTableStmt(DropStmt *dropTableStatement);
/*
@ -362,6 +363,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
{
ddlJobs = PlanDropIndexStmt(dropStatement, queryString);
}
if (dropStatement->removeType == OBJECT_TABLE)
{
ProcessDropTableStmt(dropStatement);
}
}
if (IsA(parsetree, AlterTableStmt))
@ -3770,3 +3776,58 @@ PlanGrantStmt(GrantStmt *grantStmt)
return ddlJobs;
}
/*
* ProcessDropTableStmt processes DROP TABLE commands for partitioned tables.
* If we are trying to DROP partitioned tables, we first need to go to MX nodes
* and DETACH partitions from their parents. Otherwise, we process DROP command
* multiple times in MX workers. For shards, we send DROP commands with IF EXISTS
* parameter which solves problem of processing same command multiple times.
* However, for distributed table itself, we directly remove related table from
* Postgres catalogs via performDeletion function, thus we need to be cautious
* about not processing same DROP command twice.
*/
static void
ProcessDropTableStmt(DropStmt *dropTableStatement)
{
ListCell *dropTableCell = NULL;
Assert(dropTableStatement->removeType == OBJECT_TABLE);
foreach(dropTableCell, dropTableStatement->objects)
{
List *tableNameList = (List *) lfirst(dropTableCell);
RangeVar *tableRangeVar = makeRangeVarFromNameList(tableNameList);
bool missingOK = true;
List *partitionList = NIL;
ListCell *partitionCell = NULL;
Oid relationId = RangeVarGetRelid(tableRangeVar, AccessShareLock, missingOK);
if (relationId == InvalidOid ||
!IsDistributedTable(relationId) ||
!ShouldSyncTableMetadata(relationId) ||
!PartitionedTable(relationId))
{
continue;
}
partitionList = PartitionList(relationId);
if (list_length(partitionList) == 0)
{
continue;
}
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
foreach(partitionCell, partitionList)
{
Oid partitionRelationId = lfirst_oid(partitionCell);
char *detachPartitionCommand =
GenerateDetachPartitionCommand(partitionRelationId);
SendCommandToWorkers(WORKERS_WITH_METADATA, detachPartitionCommand);
}
}
}

View File

@ -58,7 +58,7 @@ static Oid TypeOfColumn(Oid tableId, int16 columnId);
static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static List * DetachPartitionCommandList(void);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
@ -219,7 +219,8 @@ ShouldSyncTableMetadata(Oid relationId)
* following queries:
*
* (i) Query that populates pg_dist_node table
* (ii) Queries that create the clustered tables
* (ii) Queries that create the clustered tables (including foreign keys,
* partitioning hierarchy etc.)
* (iii) Queries that populate pg_dist_partition table referenced by (ii)
* (iv) Queries that populate pg_dist_shard table referenced by (iii)
* (v) Queries that populate pg_dist_placement table referenced by (iv)
@ -252,16 +253,6 @@ MetadataCreateCommands(void)
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
if (PartitionedTable(cacheEntry->relationId))
{
char *relationName = get_rel_name(cacheEntry->relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform metadata sync for "
"partitioned table \"%s\"",
relationName)));
}
}
}
@ -297,6 +288,22 @@ MetadataCreateCommands(void)
foreignConstraintCommands);
}
/* construct partitioning hierarchy after all tables are created */
foreach(distributedTableCell, propagatedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
if (PartitionTable(cacheEntry->relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
alterTableAttachPartitionCommands);
}
}
/* after all tables are created, create the metadata */
foreach(distributedTableCell, propagatedTableList)
{
@ -382,6 +389,14 @@ GetDistributedTableDDLEvents(Oid relationId)
foreignConstraintCommands = GetTableForeignConstraintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
/* commands to create partitioning hierarchy */
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
commandList = lappend(commandList, alterTableAttachPartitionCommands);
}
return commandList;
}
@ -391,19 +406,25 @@ GetDistributedTableDDLEvents(Oid relationId)
* drop all the metadata of the node that are related to clustered tables.
* The drop metadata snapshot commands includes the following queries:
*
* (i) Queries that delete all the rows from pg_dist_node table
* (ii) Queries that drop the clustered tables and remove its references from
* the pg_dist_partition. Note that distributed relation ids are gathered
* from the worker itself to prevent dropping any non-distributed tables
* with the same name.
* (iii) Queries that delete all the rows from pg_dist_shard table referenced by (ii)
* (iv) Queries that delete all the rows from pg_dist_placement table
* referenced by (iii)
* (i) Query to disable DDL propagation (necessary for (ii)
* (ii) Queries that DETACH all partitions of distributed tables
* (iii) Queries that delete all the rows from pg_dist_node table
* (iv) Queries that drop the clustered tables and remove its references from
* the pg_dist_partition. Note that distributed relation ids are gathered
* from the worker itself to prevent dropping any non-distributed tables
* with the same name.
* (v) Queries that delete all the rows from pg_dist_shard table referenced by (iv)
* (vi) Queries that delete all the rows from pg_dist_placement table
* referenced by (v)
*/
List *
MetadataDropCommands(void)
{
List *dropSnapshotCommandList = NIL;
List *detachPartitionCommandList = DetachPartitionCommandList();
dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
detachPartitionCommandList);
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
@ -1096,3 +1117,63 @@ CreateTableMetadataOnWorkers(Oid relationId)
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}
/*
* DetachPartitionCommandList returns list of DETACH commands to detach partitions
* of all distributed tables. This function is used for detaching partitions in MX
* workers before DROPping distributed partitioned tables in them. Thus, we are
* disabling DDL propagation to the beginning of the commands (we are also enabling
* DDL propagation at the end of command list to swtich back to original state). As
* an extra step, if there are no partitions to DETACH, this function simply returns
* empty list to not disable/enable DDL propagation for nothing.
*/
static List *
DetachPartitionCommandList(void)
{
List *detachPartitionCommandList = NIL;
List *distributedTableList = DistributedTableList();
ListCell *distributedTableCell = NULL;
/* we iterate over all distributed partitioned tables and DETACH their partitions */
foreach(distributedTableCell, distributedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
List *partitionList = NIL;
ListCell *partitionCell = NULL;
if (!PartitionedTable(cacheEntry->relationId))
{
continue;
}
partitionList = PartitionList(cacheEntry->relationId);
foreach(partitionCell, partitionList)
{
Oid partitionRelationId = lfirst_oid(partitionCell);
char *detachPartitionCommand =
GenerateDetachPartitionCommand(partitionRelationId);
detachPartitionCommandList = lappend(detachPartitionCommandList,
detachPartitionCommand);
}
}
if (list_length(detachPartitionCommandList) == 0)
{
return NIL;
}
detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList);
/*
* We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to swtich back to original state.
*/
detachPartitionCommandList = lappend(detachPartitionCommandList,
ENABLE_DDL_PROPAGATION);
return detachPartitionCommandList;
}

View File

@ -242,7 +242,8 @@ GenerateDetachPartitionCommand(Oid partitionTableId)
tableQualifiedName = generate_qualified_relation_name(partitionTableId);
parentTableQualifiedName = generate_qualified_relation_name(parentId);
appendStringInfo(detachPartitionCommand, "ALTER TABLE %s DETACH PARTITION %s;",
appendStringInfo(detachPartitionCommand,
"ALTER TABLE IF EXISTS %s DETACH PARTITION %s;",
parentTableQualifiedName, tableQualifiedName);
#endif

View File

@ -42,6 +42,7 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)"
#define UPSERT_PLACEMENT \
"INSERT INTO pg_dist_placement " \

View File

@ -466,16 +466,16 @@ FROM pg_dist_partition NATURAL JOIN shard_counts
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid | shard_count | partmethod | repmodel
--------------------------------------------------------+--------------+-------------+------------+----------
nation_hash | 2 | 16 | h | s
citus_mx_test_schema.nation_hash | 2 | 16 | h | s
citus_mx_test_schema_join_1.nation_hash | 3 | 4 | h | s
citus_mx_test_schema_join_1.nation_hash_2 | 3 | 4 | h | s
citus_mx_test_schema_join_2.nation_hash | 3 | 4 | h | s
citus_mx_test_schema.nation_hash_collation_search_path | 3 | 4 | h | s
citus_mx_test_schema.nation_hash_composite_types | 3 | 4 | h | s
mx_ddl_table | 3 | 4 | h | s
app_analytics_events_mx | 3 | 4 | h | s
company_employees_mx | 3 | 4 | h | s
citus_mx_test_schema_join_1.nation_hash | 2 | 4 | h | s
citus_mx_test_schema_join_1.nation_hash_2 | 2 | 4 | h | s
citus_mx_test_schema_join_2.nation_hash | 2 | 4 | h | s
citus_mx_test_schema.nation_hash_collation_search_path | 2 | 4 | h | s
citus_mx_test_schema.nation_hash_composite_types | 2 | 4 | h | s
mx_ddl_table | 2 | 4 | h | s
app_analytics_events_mx | 2 | 4 | h | s
company_employees_mx | 2 | 4 | h | s
nation_hash | 3 | 16 | h | s
citus_mx_test_schema.nation_hash | 3 | 16 | h | s
lineitem_mx | 4 | 16 | h | s
orders_mx | 4 | 16 | h | s
customer_mx | 5 | 1 | n | t

View File

@ -0,0 +1,270 @@
--
-- Distributed Partitioned Table MX Tests
--
SET citus.next_shard_id TO 1700000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- make sure wen can create partitioning tables in MX
SET citus.replication_model TO 'streaming';
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- 1-) Distributing partitioned table
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
-- distribute partitioned table
SELECT create_distributed_table('partitioning_test', 'id');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- see from MX node, the data is loaded to shards
\c - - - :worker_1_port
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
(4 rows)
-- see from MX node, partitioned table and its partitions are distributed
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
logicalrelid
------------------------
partitioning_test
partitioning_test_2009
partitioning_test_2010
(3 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2009 | 4
partitioning_test_2010 | 4
(3 rows)
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
inhrelid
------------------------
partitioning_test_2009
partitioning_test_2010
(2 rows)
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 2-) Creating partition of a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
-- see from MX node, new partition is automatically distributed as well
\c - - - :worker_1_port
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
ORDER BY 1;
logicalrelid
------------------------
partitioning_test
partitioning_test_2011
(2 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2011 | 4
(2 rows)
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
inhrelid
------------------------
partitioning_test_2009
partitioning_test_2010
partitioning_test_2011
(3 rows)
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 3-) Attaching non distributed table to a distributed table
CREATE TABLE partitioning_test_2012(id int, time date);
-- load some data
INSERT INTO partitioning_test_2012 VALUES (5, '2012-06-06');
INSERT INTO partitioning_test_2012 VALUES (6, '2012-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
NOTICE: Copying data from local table...
-- see from MX node, attached partition is distributed as well
\c - - - :worker_1_port
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
ORDER BY 1;
logicalrelid
------------------------
partitioning_test
partitioning_test_2012
(2 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2012 | 4
(2 rows)
-- see from MX node, see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
5 | 06-06-2012
6 | 07-07-2012
(6 rows)
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
inhrelid
------------------------
partitioning_test_2009
partitioning_test_2010
partitioning_test_2011
partitioning_test_2012
(4 rows)
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 4-) Attaching distributed table to distributed table
CREATE TABLE partitioning_test_2013(id int, time date);
SELECT create_distributed_table('partitioning_test_2013', 'id');
create_distributed_table
--------------------------
(1 row)
-- load some data
INSERT INTO partitioning_test_2013 VALUES (7, '2013-06-06');
INSERT INTO partitioning_test_2013 VALUES (8, '2013-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2013 FOR VALUES FROM ('2013-01-01') TO ('2014-01-01');
-- see from MX node, see the data is loaded to shards
\c - - - :worker_1_port
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
5 | 06-06-2012
6 | 07-07-2012
7 | 06-06-2013
8 | 07-07-2013
(8 rows)
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
inhrelid
------------------------
partitioning_test_2009
partitioning_test_2010
partitioning_test_2011
partitioning_test_2012
partitioning_test_2013
(5 rows)
\c - - - :master_port
-- 5-) Detaching partition of the partitioned table
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2009;
-- see from MX node, partitioning hierarchy is built
\c - - - :worker_1_port
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
inhrelid
------------------------
partitioning_test_2010
partitioning_test_2011
partitioning_test_2012
partitioning_test_2013
(4 rows)
\c - - - :master_port
-- make sure we can repeatedly call start_metadata_sync_to_node
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- make sure we can drop partitions
DROP TABLE partitioning_test_2009;
DROP TABLE partitioning_test_2010;
-- make sure we can drop partitioned table
DROP TABLE partitioning_test;
DROP TABLE IF EXISTS partitioning_test_2013;
NOTICE: table "partitioning_test_2013" does not exist, skipping

View File

@ -0,0 +1,236 @@
--
-- Distributed Partitioned Table MX Tests
--
SET citus.next_shard_id TO 1700000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- make sure wen can create partitioning tables in MX
SET citus.replication_model TO 'streaming';
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- 1-) Distributing partitioned table
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
^
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
^
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
^
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
^
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
^
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
^
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
^
-- distribute partitioned table
SELECT create_distributed_table('partitioning_test', 'id');
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
^
-- see from MX node, the data is loaded to shards
\c - - - :worker_1_port
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- see from MX node, partitioned table and its partitions are distributed
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...elid::regclass FROM pg_inherits WHERE inhparent = 'partition...
^
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 2-) Creating partition of a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2011 PARTITION OF partitionin...
^
-- see from MX node, new partition is automatically distributed as well
\c - - - :worker_1_port
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...elid::regclass FROM pg_inherits WHERE inhparent = 'partition...
^
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 3-) Attaching non distributed table to a distributed table
CREATE TABLE partitioning_test_2012(id int, time date);
-- load some data
INSERT INTO partitioning_test_2012 VALUES (5, '2012-06-06');
INSERT INTO partitioning_test_2012 VALUES (6, '2012-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
-- see from MX node, attached partition is distributed as well
\c - - - :worker_1_port
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
-- see from MX node, see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...elid::regclass FROM pg_inherits WHERE inhparent = 'partition...
^
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 4-) Attaching distributed table to distributed table
CREATE TABLE partitioning_test_2013(id int, time date);
SELECT create_distributed_table('partitioning_test_2013', 'id');
create_distributed_table
--------------------------
(1 row)
-- load some data
INSERT INTO partitioning_test_2013 VALUES (7, '2013-06-06');
INSERT INTO partitioning_test_2013 VALUES (8, '2013-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2013 FOR VALUES FROM ('2013-01-01') TO ('2014-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
-- see from MX node, see the data is loaded to shards
\c - - - :worker_1_port
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...elid::regclass FROM pg_inherits WHERE inhparent = 'partition...
^
\c - - - :master_port
-- 5-) Detaching partition of the partitioned table
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2009;
ERROR: syntax error at or near "DETACH"
LINE 1: ALTER TABLE partitioning_test DETACH PARTITION partitioning_...
^
-- see from MX node, partitioning hierarchy is built
\c - - - :worker_1_port
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...elid::regclass FROM pg_inherits WHERE inhparent = 'partition...
^
\c - - - :master_port
-- make sure we can repeatedly call start_metadata_sync_to_node
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- make sure we can drop partitions
DROP TABLE partitioning_test_2009;
ERROR: table "partitioning_test_2009" does not exist
DROP TABLE partitioning_test_2010;
ERROR: table "partitioning_test_2010" does not exist
-- make sure we can drop partitioned table
DROP TABLE partitioning_test;
ERROR: table "partitioning_test" does not exist
DROP TABLE IF EXISTS partitioning_test_2013;

View File

@ -342,9 +342,9 @@ SELECT generate_alter_table_attach_partition_command('multi_column_partition_2')
(1 row)
SELECT generate_alter_table_detach_partition_command('multi_column_partition_2');
generate_alter_table_detach_partition_command
-----------------------------------------------------------------------------------------------
ALTER TABLE public.multi_column_partitioned DETACH PARTITION public.multi_column_partition_2;
generate_alter_table_detach_partition_command
---------------------------------------------------------------------------------------------------------
ALTER TABLE IF EXISTS public.multi_column_partitioned DETACH PARTITION public.multi_column_partition_2;
(1 row)
-- finally a test with LIST partitioning

View File

@ -17,6 +17,7 @@ test: multi_extension
test: multi_cluster_management
test: multi_test_helpers
test: multi_mx_partitioning
test: multi_mx_create_table
test: multi_mx_copy_data multi_mx_router_planner
test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10

View File

@ -0,0 +1,172 @@
--
-- Distributed Partitioned Table MX Tests
--
SET citus.next_shard_id TO 1700000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- make sure wen can create partitioning tables in MX
SET citus.replication_model TO 'streaming';
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- 1-) Distributing partitioned table
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
-- distribute partitioned table
SELECT create_distributed_table('partitioning_test', 'id');
-- see from MX node, the data is loaded to shards
\c - - - :worker_1_port
SELECT * FROM partitioning_test ORDER BY 1;
-- see from MX node, partitioned table and its partitions are distributed
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 2-) Creating partition of a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
-- see from MX node, new partition is automatically distributed as well
\c - - - :worker_1_port
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 3-) Attaching non distributed table to a distributed table
CREATE TABLE partitioning_test_2012(id int, time date);
-- load some data
INSERT INTO partitioning_test_2012 VALUES (5, '2012-06-06');
INSERT INTO partitioning_test_2012 VALUES (6, '2012-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
-- see from MX node, attached partition is distributed as well
\c - - - :worker_1_port
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
-- see from MX node, see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- 4-) Attaching distributed table to distributed table
CREATE TABLE partitioning_test_2013(id int, time date);
SELECT create_distributed_table('partitioning_test_2013', 'id');
-- load some data
INSERT INTO partitioning_test_2013 VALUES (7, '2013-06-06');
INSERT INTO partitioning_test_2013 VALUES (8, '2013-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2013 FOR VALUES FROM ('2013-01-01') TO ('2014-01-01');
-- see from MX node, see the data is loaded to shards
\c - - - :worker_1_port
SELECT * FROM partitioning_test ORDER BY 1;
-- see from MX node, partitioning hierarchy is built
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
\c - - - :master_port
-- 5-) Detaching partition of the partitioned table
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2009;
-- see from MX node, partitioning hierarchy is built
\c - - - :worker_1_port
SELECT inhrelid::regclass FROM pg_inherits WHERE inhparent = 'partitioning_test'::regclass;
\c - - - :master_port
-- make sure we can repeatedly call start_metadata_sync_to_node
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- make sure we can drop partitions
DROP TABLE partitioning_test_2009;
DROP TABLE partitioning_test_2010;
-- make sure we can drop partitioned table
DROP TABLE partitioning_test;
DROP TABLE IF EXISTS partitioning_test_2013;