mirror of https://github.com/citusdata/citus.git
Enforce statement based replication on old APIs and non-hash tables
This change ignores `citus.replication_model` setting and uses the statement based replication in - Tables distributed via the old `master_create_distributed_table` function - Append and range partitioned tables, even if created via `create_distributed_table` function This seems like the easiest solution to #1191, without changing the existing behavior and harming existing users with custom scripts. This change also prevents RF>1 on streaming replicated tables on `master_create_worker_shards` Prior to this change, `master_create_worker_shards` command was not checking the replication model of the target table, thus allowing RF>1 with streaming replicated tables. With this change, `master_create_worker_shards` errors out on the case.pull/1206/head
parent
1ba078caea
commit
df9cf346ee
|
@ -67,7 +67,8 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static void CreateReferenceTable(Oid relationId);
|
static void CreateReferenceTable(Oid relationId);
|
||||||
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
char distributionMethod, uint32 colocationId);
|
char distributionMethod, char replicationModel,
|
||||||
|
uint32 colocationId);
|
||||||
static char LookupDistributionMethod(Oid distributionMethodOid);
|
static char LookupDistributionMethod(Oid distributionMethodOid);
|
||||||
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
||||||
int16 supportFunctionNumber);
|
int16 supportFunctionNumber);
|
||||||
|
@ -108,8 +109,19 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
|
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
|
||||||
|
{
|
||||||
|
ereport(NOTICE, (errmsg("using statement-based replication"),
|
||||||
|
errdetail("The current replication_model setting is "
|
||||||
|
"'streaming', which is not supported by "
|
||||||
|
"master_create_distributed_table."),
|
||||||
|
errhint("Use create_distributed_table to use the streaming "
|
||||||
|
"replication model.")));
|
||||||
|
}
|
||||||
|
|
||||||
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
|
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
|
||||||
distributionMethod, INVALID_COLOCATION_ID);
|
distributionMethod, REPLICATION_MODEL_COORDINATOR,
|
||||||
|
INVALID_COLOCATION_ID);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -165,8 +177,16 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
/* if distribution method is not hash, just create partition metadata */
|
/* if distribution method is not hash, just create partition metadata */
|
||||||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
|
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
|
||||||
|
{
|
||||||
|
ereport(NOTICE, (errmsg("using statement-based replication"),
|
||||||
|
errdetail("Streaming replication is supported only for "
|
||||||
|
"hash-distributed tables.")));
|
||||||
|
}
|
||||||
|
|
||||||
ConvertToDistributedTable(relationId, distributionColumnName,
|
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||||
distributionMethod, INVALID_COLOCATION_ID);
|
distributionMethod, REPLICATION_MODEL_COORDINATOR,
|
||||||
|
INVALID_COLOCATION_ID);
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,7 +249,7 @@ CreateReferenceTable(Oid relationId)
|
||||||
|
|
||||||
/* first, convert the relation into distributed relation */
|
/* first, convert the relation into distributed relation */
|
||||||
ConvertToDistributedTable(relationId, distributionColumnName,
|
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||||
DISTRIBUTE_BY_NONE, colocationId);
|
DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId);
|
||||||
|
|
||||||
/* now, create the single shard replicated to all nodes */
|
/* now, create the single shard replicated to all nodes */
|
||||||
CreateReferenceTableShard(relationId);
|
CreateReferenceTableShard(relationId);
|
||||||
|
@ -251,27 +271,17 @@ CreateReferenceTable(Oid relationId)
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
char distributionMethod, uint32 colocationId)
|
char distributionMethod, char replicationModel,
|
||||||
|
uint32 colocationId)
|
||||||
{
|
{
|
||||||
Relation relation = NULL;
|
Relation relation = NULL;
|
||||||
TupleDesc relationDesc = NULL;
|
TupleDesc relationDesc = NULL;
|
||||||
char *relationName = NULL;
|
char *relationName = NULL;
|
||||||
char relationKind = 0;
|
char relationKind = 0;
|
||||||
Var *distributionColumn = NULL;
|
Var *distributionColumn = NULL;
|
||||||
char replicationModel = REPLICATION_MODEL_INVALID;
|
|
||||||
|
|
||||||
/* check global replication settings before continuing */
|
/* check global replication settings before continuing */
|
||||||
EnsureReplicationSettings(InvalidOid);
|
EnsureReplicationSettings(InvalidOid, replicationModel);
|
||||||
|
|
||||||
/* distribute by none tables use 2PC replication; otherwise use GUC setting */
|
|
||||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
|
||||||
replicationModel = REPLICATION_MODEL_2PC;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
replicationModel = ReplicationModel;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Lock target relation with an exclusive lock - there's no way to make
|
* Lock target relation with an exclusive lock - there's no way to make
|
||||||
|
@ -949,7 +959,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
|
||||||
/* create distributed table metadata */
|
/* create distributed table metadata */
|
||||||
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
|
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
|
||||||
colocationId);
|
ReplicationModel, colocationId);
|
||||||
|
|
||||||
/* create shards */
|
/* create shards */
|
||||||
if (sourceRelationId != InvalidOid)
|
if (sourceRelationId != InvalidOid)
|
||||||
|
@ -991,15 +1001,13 @@ ColumnType(Oid relationId, char *columnName)
|
||||||
* is detected.
|
* is detected.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
EnsureReplicationSettings(Oid relationId)
|
EnsureReplicationSettings(Oid relationId, char replicationModel)
|
||||||
{
|
{
|
||||||
char replicationModel = (char) ReplicationModel;
|
|
||||||
char *msgSuffix = "the streaming replication model";
|
char *msgSuffix = "the streaming replication model";
|
||||||
char *extraHint = " or setting \"citus.replication_model\" to \"statement\"";
|
char *extraHint = " or setting \"citus.replication_model\" to \"statement\"";
|
||||||
|
|
||||||
if (relationId != InvalidOid)
|
if (relationId != InvalidOid)
|
||||||
{
|
{
|
||||||
replicationModel = TableReplicationModel(relationId);
|
|
||||||
msgSuffix = "tables which use the streaming replication model";
|
msgSuffix = "tables which use the streaming replication model";
|
||||||
extraHint = "";
|
extraHint = "";
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,6 +97,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
uint64 hashTokenIncrement = 0;
|
uint64 hashTokenIncrement = 0;
|
||||||
List *existingShardList = NIL;
|
List *existingShardList = NIL;
|
||||||
int64 shardIndex = 0;
|
int64 shardIndex = 0;
|
||||||
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
/* make sure table is hash partitioned */
|
/* make sure table is hash partitioned */
|
||||||
CheckHashPartitionedTable(distributedTableId);
|
CheckHashPartitionedTable(distributedTableId);
|
||||||
|
@ -138,6 +139,21 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
errmsg("replication_factor must be positive")));
|
errmsg("replication_factor must be positive")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* make sure that RF=1 if the table is streaming replicated */
|
||||||
|
if (cacheEntry->replicationModel == REPLICATION_MODEL_STREAMING &&
|
||||||
|
replicationFactor > 1)
|
||||||
|
{
|
||||||
|
char *relationName = get_rel_name(cacheEntry->relationId);
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
errmsg("using replication factor %d with the streaming "
|
||||||
|
"replication model is not supported",
|
||||||
|
replicationFactor),
|
||||||
|
errdetail("The table %s is marked as streaming replicated and "
|
||||||
|
"the shard replication factor of streaming replicated "
|
||||||
|
"tables must be 1.", relationName),
|
||||||
|
errhint("Use replication factor 1.")));
|
||||||
|
}
|
||||||
|
|
||||||
/* calculate the split of the hash space */
|
/* calculate the split of the hash space */
|
||||||
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
Oid relationId = ResolveRelationId(relationNameText);
|
Oid relationId = ResolveRelationId(relationNameText);
|
||||||
char relationKind = get_rel_relkind(relationId);
|
char relationKind = get_rel_relkind(relationId);
|
||||||
char *relationOwner = TableOwner(relationId);
|
char *relationOwner = TableOwner(relationId);
|
||||||
|
char replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
|
|
||||||
EnsureTablePermissions(relationId, ACL_INSERT);
|
EnsureTablePermissions(relationId, ACL_INSERT);
|
||||||
CheckDistributedTable(relationId);
|
CheckDistributedTable(relationId);
|
||||||
|
@ -127,7 +128,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
"on reference tables")));
|
"on reference tables")));
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureReplicationSettings(relationId);
|
replicationModel = TableReplicationModel(relationId);
|
||||||
|
|
||||||
|
EnsureReplicationSettings(relationId, replicationModel);
|
||||||
|
|
||||||
/* generate new and unique shardId from sequence */
|
/* generate new and unique shardId from sequence */
|
||||||
shardId = GetNextShardId();
|
shardId = GetNextShardId();
|
||||||
|
|
|
@ -100,7 +100,7 @@ extern char * TableOwner(Oid relationId);
|
||||||
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
||||||
extern void EnsureTableOwner(Oid relationId);
|
extern void EnsureTableOwner(Oid relationId);
|
||||||
extern void EnsureSuperUser(void);
|
extern void EnsureSuperUser(void);
|
||||||
extern void EnsureReplicationSettings(Oid relationId);
|
extern void EnsureReplicationSettings(Oid relationId, char replicationModel);
|
||||||
extern bool TableReferenced(Oid relationId);
|
extern bool TableReferenced(Oid relationId);
|
||||||
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
||||||
extern Datum StringToDatum(char *inputString, Oid dataType);
|
extern Datum StringToDatum(char *inputString, Oid dataType);
|
||||||
|
|
|
@ -160,39 +160,206 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE mx_table_test;
|
DROP TABLE mx_table_test;
|
||||||
RESET citus.replication_model;
|
-- Show that master_create_distributed_table ignores citus.replication_model GUC
|
||||||
-- Show that it is not possible to create an mx table with the old
|
CREATE TABLE s_table(a int);
|
||||||
-- master_create_distributed_table function
|
SELECT master_create_distributed_table('s_table', 'a', 'hash');
|
||||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
NOTICE: using statement-based replication
|
||||||
SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
|
||||||
repmodel
|
repmodel
|
||||||
----------
|
----------
|
||||||
c
|
c
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE mx_table_test;
|
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
|
||||||
-- Show that when replication factor > 1 the table is created as coordinator-replicated
|
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
|
||||||
|
SELECT master_create_worker_shards('s_table', 4, 2);
|
||||||
|
ERROR: using replication factor 2 with the streaming replication model is not supported
|
||||||
|
DETAIL: The table s_table is marked as streaming replicated and the shard replication factor of streaming replicated tables must be 1.
|
||||||
|
HINT: Use replication factor 1.
|
||||||
|
DROP TABLE s_table;
|
||||||
|
RESET citus.replication_model;
|
||||||
|
-- Show that create_distributed_table with append and range distributions ignore
|
||||||
|
-- citus.replication_model GUC
|
||||||
SET citus.shard_replication_factor TO 2;
|
SET citus.shard_replication_factor TO 2;
|
||||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
SET citus.replication_model TO streaming;
|
||||||
SELECT create_distributed_table('mx_table_test', 'col1');
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: Streaming replication is supported only for hash-distributed tables.
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
repmodel
|
repmodel
|
||||||
----------
|
----------
|
||||||
c
|
c
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE mx_table_test;
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: Streaming replication is supported only for hash-distributed tables.
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
-- Show that master_create_distributed_table created statement replicated tables no matter
|
||||||
|
-- what citus.replication_model set to
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
-- Check that the replication_model overwrite behavior is the same with RF=1
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: Streaming replication is supported only for hash-distributed tables.
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: Streaming replication is supported only for hash-distributed tables.
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
|
||||||
|
HINT: Use create_distributed_table to use the streaming replication model.
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
----------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
RESET citus.replication_model;
|
||||||
SET citus.shard_replication_factor TO default;
|
SET citus.shard_replication_factor TO default;
|
||||||
SET citus.shard_count to 4;
|
SET citus.shard_count to 4;
|
||||||
CREATE TABLE lineitem_hash_part (like lineitem);
|
CREATE TABLE lineitem_hash_part (like lineitem);
|
||||||
|
|
|
@ -40,9 +40,8 @@ SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'
|
||||||
DROP TABLE testtableddl;
|
DROP TABLE testtableddl;
|
||||||
-- verify that the table can dropped even if shards exist
|
-- verify that the table can dropped even if shards exist
|
||||||
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
|
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
|
||||||
-- create table as MX table to do create empty shard test here, too
|
-- create table and do create empty shard test here, too
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.replication_model TO 'streaming';
|
|
||||||
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
|
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
|
||||||
master_create_distributed_table
|
master_create_distributed_table
|
||||||
---------------------------------
|
---------------------------------
|
||||||
|
@ -55,15 +54,9 @@ SELECT 1 FROM master_create_empty_shard('testtableddl');
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- this'll error out
|
|
||||||
SET citus.shard_replication_factor TO 2;
|
|
||||||
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
|
||||||
ERROR: replication factors above one are incompatible with tables which use the streaming replication model
|
|
||||||
HINT: Try again after reducing "citus.shard_replication_factor" to one.
|
|
||||||
-- now actually drop table and shards
|
-- now actually drop table and shards
|
||||||
DROP TABLE testtableddl;
|
DROP TABLE testtableddl;
|
||||||
RESET citus.shard_replication_factor;
|
RESET citus.shard_replication_factor;
|
||||||
RESET citus.replication_model;
|
|
||||||
-- ensure no metadata of distributed tables are remaining
|
-- ensure no metadata of distributed tables are remaining
|
||||||
SELECT * FROM pg_dist_partition;
|
SELECT * FROM pg_dist_partition;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel
|
||||||
|
|
|
@ -126,22 +126,81 @@ SELECT create_distributed_table('mx_table_test', 'col1');
|
||||||
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
||||||
DROP TABLE mx_table_test;
|
DROP TABLE mx_table_test;
|
||||||
|
|
||||||
|
-- Show that master_create_distributed_table ignores citus.replication_model GUC
|
||||||
|
CREATE TABLE s_table(a int);
|
||||||
|
SELECT master_create_distributed_table('s_table', 'a', 'hash');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
|
||||||
|
|
||||||
|
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
|
||||||
|
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
|
||||||
|
SELECT master_create_worker_shards('s_table', 4, 2);
|
||||||
|
|
||||||
|
DROP TABLE s_table;
|
||||||
|
|
||||||
RESET citus.replication_model;
|
RESET citus.replication_model;
|
||||||
|
|
||||||
-- Show that it is not possible to create an mx table with the old
|
-- Show that create_distributed_table with append and range distributions ignore
|
||||||
-- master_create_distributed_table function
|
-- citus.replication_model GUC
|
||||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
|
||||||
SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash');
|
|
||||||
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
|
||||||
DROP TABLE mx_table_test;
|
|
||||||
|
|
||||||
-- Show that when replication factor > 1 the table is created as coordinator-replicated
|
|
||||||
SET citus.shard_replication_factor TO 2;
|
SET citus.shard_replication_factor TO 2;
|
||||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
SET citus.replication_model TO streaming;
|
||||||
SELECT create_distributed_table('mx_table_test', 'col1');
|
|
||||||
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
|
||||||
DROP TABLE mx_table_test;
|
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
-- Show that master_create_distributed_table created statement replicated tables no matter
|
||||||
|
-- what citus.replication_model set to
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
-- Check that the replication_model overwrite behavior is the same with RF=1
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
RESET citus.replication_model;
|
||||||
SET citus.shard_replication_factor TO default;
|
SET citus.shard_replication_factor TO default;
|
||||||
|
|
||||||
SET citus.shard_count to 4;
|
SET citus.shard_count to 4;
|
||||||
|
|
|
@ -37,21 +37,15 @@ DROP TABLE testtableddl;
|
||||||
-- verify that the table can dropped even if shards exist
|
-- verify that the table can dropped even if shards exist
|
||||||
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
|
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
|
||||||
|
|
||||||
-- create table as MX table to do create empty shard test here, too
|
-- create table and do create empty shard test here, too
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.replication_model TO 'streaming';
|
|
||||||
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
|
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
|
||||||
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
||||||
|
|
||||||
-- this'll error out
|
|
||||||
SET citus.shard_replication_factor TO 2;
|
|
||||||
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
|
||||||
|
|
||||||
-- now actually drop table and shards
|
-- now actually drop table and shards
|
||||||
DROP TABLE testtableddl;
|
DROP TABLE testtableddl;
|
||||||
|
|
||||||
RESET citus.shard_replication_factor;
|
RESET citus.shard_replication_factor;
|
||||||
RESET citus.replication_model;
|
|
||||||
|
|
||||||
-- ensure no metadata of distributed tables are remaining
|
-- ensure no metadata of distributed tables are remaining
|
||||||
SELECT * FROM pg_dist_partition;
|
SELECT * FROM pg_dist_partition;
|
||||||
|
|
Loading…
Reference in New Issue