mirror of https://github.com/citusdata/citus.git
Allow distributed partitioned table creation in Cloud
parent
6219186683
commit
6883a09cdd
|
@ -728,11 +728,12 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we currently don't support MX tables to be distributed partitioned table */
|
/* we currently don't support MX tables to be distributed partitioned table */
|
||||||
if (replicationModel == REPLICATION_MODEL_STREAMING)
|
if (replicationModel == REPLICATION_MODEL_STREAMING &&
|
||||||
|
CountPrimariesWithMetadata() > 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("distributing partitioned tables which uses "
|
errmsg("distributing partitioned tables is not supported "
|
||||||
"streaming replication is not supported")));
|
"with Citus MX")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we don't support distributing tables with multi-level partitioning */
|
/* we don't support distributing tables with multi-level partitioning */
|
||||||
|
|
|
@ -35,6 +35,7 @@
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/metadata_sync.h"
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/pg_dist_node.h"
|
#include "distributed/pg_dist_node.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
@ -251,6 +252,16 @@ MetadataCreateCommands(void)
|
||||||
if (ShouldSyncTableMetadata(cacheEntry->relationId))
|
if (ShouldSyncTableMetadata(cacheEntry->relationId))
|
||||||
{
|
{
|
||||||
propagatedTableList = lappend(propagatedTableList, cacheEntry);
|
propagatedTableList = lappend(propagatedTableList, cacheEntry);
|
||||||
|
|
||||||
|
if (PartitionedTable(cacheEntry->relationId))
|
||||||
|
{
|
||||||
|
char *relationName = get_rel_name(cacheEntry->relationId);
|
||||||
|
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform metadata sync for "
|
||||||
|
"partitioned table \"%s\"",
|
||||||
|
relationName)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,6 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
|
||||||
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
|
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
|
||||||
char *nodeRack, bool hasMetadata, bool isActive,
|
char *nodeRack, bool hasMetadata, bool isActive,
|
||||||
Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists);
|
Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists);
|
||||||
static uint32 CountPrimariesWithMetadata();
|
|
||||||
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
|
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
|
||||||
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
|
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
|
||||||
static Datum GenerateNodeTuple(WorkerNode *workerNode);
|
static Datum GenerateNodeTuple(WorkerNode *workerNode);
|
||||||
|
@ -861,8 +860,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
|
||||||
|
|
||||||
|
|
||||||
/* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
|
/* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
|
||||||
static uint32
|
uint32
|
||||||
CountPrimariesWithMetadata()
|
CountPrimariesWithMetadata(void)
|
||||||
{
|
{
|
||||||
uint32 primariesWithMetadata = 0;
|
uint32 primariesWithMetadata = 0;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
|
|
|
@ -75,6 +75,7 @@ extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes
|
||||||
extern bool WorkerNodeIsPrimary(WorkerNode *worker);
|
extern bool WorkerNodeIsPrimary(WorkerNode *worker);
|
||||||
extern bool WorkerNodeIsSecondary(WorkerNode *worker);
|
extern bool WorkerNodeIsSecondary(WorkerNode *worker);
|
||||||
extern bool WorkerNodeIsReadable(WorkerNode *worker);
|
extern bool WorkerNodeIsReadable(WorkerNode *worker);
|
||||||
|
extern uint32 CountPrimariesWithMetadata(void);
|
||||||
|
|
||||||
/* Function declarations for worker node utilities */
|
/* Function declarations for worker node utilities */
|
||||||
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
|
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
|
||||||
|
|
|
@ -1237,13 +1237,21 @@ ORDER BY
|
||||||
COMMIT;
|
COMMIT;
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
IF EXISTS
|
IF EXISTS
|
||||||
partitioning_test_2012,
|
partitioning_test_2009,
|
||||||
partitioning_test_2013,
|
|
||||||
partitioned_events_table,
|
partitioned_events_table,
|
||||||
partitioned_users_table,
|
partitioned_users_table,
|
||||||
list_partitioned_events_table,
|
list_partitioned_events_table,
|
||||||
multi_column_partitioning,
|
multi_column_partitioning,
|
||||||
partitioning_locks,
|
partitioning_locks,
|
||||||
partitioning_locks_for_select;
|
partitioning_locks_for_select;
|
||||||
NOTICE: table "partitioning_test_2012" does not exist, skipping
|
-- make sure we can create a partitioned table with streaming replication
|
||||||
NOTICE: table "partitioning_test_2013" does not exist, skipping
|
SET citus.replication_model TO 'streaming';
|
||||||
|
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||||
|
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE partitioning_test;
|
||||||
|
|
|
@ -1114,16 +1114,32 @@ ERROR: current transaction is aborted, commands ignored until end of transactio
|
||||||
COMMIT;
|
COMMIT;
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
IF EXISTS
|
IF EXISTS
|
||||||
partitioning_test_2012,
|
partitioning_test_2009,
|
||||||
partitioning_test_2013,
|
|
||||||
partitioned_events_table,
|
partitioned_events_table,
|
||||||
partitioned_users_table,
|
partitioned_users_table,
|
||||||
list_partitioned_events_table,
|
list_partitioned_events_table,
|
||||||
multi_column_partitioning,
|
multi_column_partitioning,
|
||||||
partitioning_locks,
|
partitioning_locks,
|
||||||
partitioning_locks_for_select;
|
partitioning_locks_for_select;
|
||||||
|
NOTICE: table "partitioning_test_2009" does not exist, skipping
|
||||||
NOTICE: table "partitioned_events_table" does not exist, skipping
|
NOTICE: table "partitioned_events_table" does not exist, skipping
|
||||||
NOTICE: table "partitioned_users_table" does not exist, skipping
|
NOTICE: table "partitioned_users_table" does not exist, skipping
|
||||||
NOTICE: table "list_partitioned_events_table" does not exist, skipping
|
NOTICE: table "list_partitioned_events_table" does not exist, skipping
|
||||||
NOTICE: table "multi_column_partitioning" does not exist, skipping
|
NOTICE: table "multi_column_partitioning" does not exist, skipping
|
||||||
NOTICE: table "partitioning_locks" does not exist, skipping
|
NOTICE: table "partitioning_locks" does not exist, skipping
|
||||||
|
-- make sure we can create a partitioned table with streaming replication
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
|
||||||
|
^
|
||||||
|
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
|
||||||
|
^
|
||||||
|
SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
ERROR: relation "partitioning_test" does not exist
|
||||||
|
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
^
|
||||||
|
DROP TABLE partitioning_test;
|
||||||
|
ERROR: table "partitioning_test" does not exist
|
||||||
|
|
|
@ -806,11 +806,17 @@ COMMIT;
|
||||||
|
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
IF EXISTS
|
IF EXISTS
|
||||||
partitioning_test_2012,
|
partitioning_test_2009,
|
||||||
partitioning_test_2013,
|
|
||||||
partitioned_events_table,
|
partitioned_events_table,
|
||||||
partitioned_users_table,
|
partitioned_users_table,
|
||||||
list_partitioned_events_table,
|
list_partitioned_events_table,
|
||||||
multi_column_partitioning,
|
multi_column_partitioning,
|
||||||
partitioning_locks,
|
partitioning_locks,
|
||||||
partitioning_locks_for_select;
|
partitioning_locks_for_select;
|
||||||
|
|
||||||
|
-- make sure we can create a partitioned table with streaming replication
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||||
|
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
SELECT create_distributed_table('partitioning_test', 'id');
|
||||||
|
DROP TABLE partitioning_test;
|
||||||
|
|
Loading…
Reference in New Issue