diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 224abc7cc..b646738b5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -728,11 +728,12 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, } /* we currently don't support MX tables to be distributed partitioned table */ - if (replicationModel == REPLICATION_MODEL_STREAMING) + if (replicationModel == REPLICATION_MODEL_STREAMING && + CountPrimariesWithMetadata() > 0) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("distributing partitioned tables which uses " - "streaming replication is not supported"))); + errmsg("distributing partitioned tables is not supported " + "with Citus MX"))); } /* we don't support distributing tables with multi-level partitioning */ diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 7d51d08f9..e7af6f158 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -35,6 +35,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_node.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -251,6 +252,16 @@ MetadataCreateCommands(void) if (ShouldSyncTableMetadata(cacheEntry->relationId)) { propagatedTableList = lappend(propagatedTableList, cacheEntry); + + if (PartitionedTable(cacheEntry->relationId)) + { + char *relationName = get_rel_name(cacheEntry->relationId); + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform metadata sync for " + "partitioned table \"%s\"", + relationName))); + } } } diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 164db6ae6..d13350023 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -58,7 +58,6 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists); -static uint32 CountPrimariesWithMetadata(); static void SetNodeState(char *nodeName, int32 nodePort, bool isActive); static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); static Datum GenerateNodeTuple(WorkerNode *workerNode); @@ -861,8 +860,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) /* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */ -static uint32 -CountPrimariesWithMetadata() +uint32 +CountPrimariesWithMetadata(void) { uint32 primariesWithMetadata = 0; WorkerNode *workerNode = NULL; diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 4e6d017b5..cc7860935 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -75,6 +75,7 @@ extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes extern bool WorkerNodeIsPrimary(WorkerNode *worker); extern bool WorkerNodeIsSecondary(WorkerNode *worker); extern bool WorkerNodeIsReadable(WorkerNode *worker); +extern uint32 CountPrimariesWithMetadata(void); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index b0893e0ec..0b2afe3b9 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -1237,13 +1237,21 @@ ORDER BY COMMIT; DROP TABLE IF EXISTS - partitioning_test_2012, - partitioning_test_2013, + partitioning_test_2009, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning, partitioning_locks, partitioning_locks_for_select; -NOTICE: table "partitioning_test_2012" does not exist, skipping -NOTICE: table "partitioning_test_2013" does not exist, skipping +-- make sure we can create a partitioned table with streaming replication +SET citus.replication_model TO 'streaming'; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +SELECT create_distributed_table('partitioning_test', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +DROP TABLE partitioning_test; diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out index 056ef12a7..3026b668e 100644 --- a/src/test/regress/expected/multi_partitioning_0.out +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -1114,16 +1114,32 @@ ERROR: current transaction is aborted, commands ignored until end of transactio COMMIT; DROP TABLE IF EXISTS - partitioning_test_2012, - partitioning_test_2013, + partitioning_test_2009, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning, partitioning_locks, partitioning_locks_for_select; +NOTICE: table "partitioning_test_2009" does not exist, skipping NOTICE: table "partitioned_events_table" does not exist, skipping NOTICE: table "partitioned_users_table" does not exist, skipping NOTICE: table "list_partitioned_events_table" does not exist, skipping NOTICE: table "multi_column_partitioning" does not exist, skipping NOTICE: table "partitioning_locks" does not exist, skipping +-- make sure we can create a partitioned table with streaming replication +SET citus.replication_model TO 'streaming'; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ... + ^ +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin... + ^ +SELECT create_distributed_table('partitioning_test', 'id'); +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test', 'id'); + ^ +DROP TABLE partitioning_test; +ERROR: table "partitioning_test" does not exist diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index 2b122cf61..d55e52394 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -806,11 +806,17 @@ COMMIT; DROP TABLE IF EXISTS - partitioning_test_2012, - partitioning_test_2013, + partitioning_test_2009, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning, partitioning_locks, partitioning_locks_for_select; + +-- make sure we can create a partitioned table with streaming replication +SET citus.replication_model TO 'streaming'; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +SELECT create_distributed_table('partitioning_test', 'id'); +DROP TABLE partitioning_test;