diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index da09bbbf5..43dd3a7ee 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -67,7 +67,8 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; /* local function forward declarations */ static void CreateReferenceTable(Oid relationId); static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, uint32 colocationId); + char distributionMethod, char replicationModel, + uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); @@ -108,8 +109,19 @@ master_create_distributed_table(PG_FUNCTION_ARGS) EnsureCoordinator(); + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) + { + ereport(NOTICE, (errmsg("using statement-based replication"), + errdetail("The current replication_model setting is " + "'streaming', which is not supported by " + "master_create_distributed_table."), + errhint("Use create_distributed_table to use the streaming " + "replication model."))); + } + ConvertToDistributedTable(distributedRelationId, distributionColumnName, - distributionMethod, INVALID_COLOCATION_ID); + distributionMethod, REPLICATION_MODEL_COORDINATOR, + INVALID_COLOCATION_ID); PG_RETURN_VOID(); } @@ -165,8 +177,16 @@ create_distributed_table(PG_FUNCTION_ARGS) /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) { + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) + { + ereport(NOTICE, (errmsg("using statement-based replication"), + errdetail("Streaming replication is supported only for " + "hash-distributed tables."))); + } + ConvertToDistributedTable(relationId, distributionColumnName, - distributionMethod, INVALID_COLOCATION_ID); + distributionMethod, REPLICATION_MODEL_COORDINATOR, + INVALID_COLOCATION_ID); PG_RETURN_VOID(); } @@ -229,7 +249,7 @@ CreateReferenceTable(Oid relationId) /* first, convert the relation into distributed relation */ ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_NONE, colocationId); + DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId); /* now, create the single shard replicated to all nodes */ CreateReferenceTableShard(relationId); @@ -251,27 +271,17 @@ CreateReferenceTable(Oid relationId) */ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, uint32 colocationId) + char distributionMethod, char replicationModel, + uint32 colocationId) { Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; char relationKind = 0; Var *distributionColumn = NULL; - char replicationModel = REPLICATION_MODEL_INVALID; /* check global replication settings before continuing */ - EnsureReplicationSettings(InvalidOid); - - /* distribute by none tables use 2PC replication; otherwise use GUC setting */ - if (distributionMethod == DISTRIBUTE_BY_NONE) - { - replicationModel = REPLICATION_MODEL_2PC; - } - else - { - replicationModel = ReplicationModel; - } + EnsureReplicationSettings(InvalidOid, replicationModel); /* * Lock target relation with an exclusive lock - there's no way to make @@ -949,7 +959,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - colocationId); + ReplicationModel, colocationId); /* create shards */ if (sourceRelationId != InvalidOid) @@ -991,15 +1001,13 @@ ColumnType(Oid relationId, char *columnName) * is detected. */ void -EnsureReplicationSettings(Oid relationId) +EnsureReplicationSettings(Oid relationId, char replicationModel) { - char replicationModel = (char) ReplicationModel; char *msgSuffix = "the streaming replication model"; char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; if (relationId != InvalidOid) { - replicationModel = TableReplicationModel(relationId); msgSuffix = "tables which use the streaming replication model"; extraHint = ""; } diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 7c5a21011..2a12cc347 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -97,6 +97,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, uint64 hashTokenIncrement = 0; List *existingShardList = NIL; int64 shardIndex = 0; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); /* make sure table is hash partitioned */ CheckHashPartitionedTable(distributedTableId); @@ -138,6 +139,21 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, errmsg("replication_factor must be positive"))); } + /* make sure that RF=1 if the table is streaming replicated */ + if (cacheEntry->replicationModel == REPLICATION_MODEL_STREAMING && + replicationFactor > 1) + { + char *relationName = get_rel_name(cacheEntry->relationId); + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("using replication factor %d with the streaming " + "replication model is not supported", + replicationFactor), + errdetail("The table %s is marked as streaming replicated and " + "the shard replication factor of streaming replicated " + "tables must be 1.", relationName), + errhint("Use replication factor 1."))); + } + /* calculate the split of the hash space */ hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index c04ba0858..5deb6832f 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -87,6 +87,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) Oid relationId = ResolveRelationId(relationNameText); char relationKind = get_rel_relkind(relationId); char *relationOwner = TableOwner(relationId); + char replicationModel = REPLICATION_MODEL_INVALID; EnsureTablePermissions(relationId, ACL_INSERT); CheckDistributedTable(relationId); @@ -127,7 +128,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS) "on reference tables"))); } - EnsureReplicationSettings(relationId); + replicationModel = TableReplicationModel(relationId); + + EnsureReplicationSettings(relationId, replicationModel); /* generate new and unique shardId from sequence */ shardId = GetNextShardId(); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 8b8857927..013df30ba 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -100,7 +100,7 @@ extern char * TableOwner(Oid relationId); extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); -extern void EnsureReplicationSettings(Oid relationId); +extern void EnsureReplicationSettings(Oid relationId, char replicationModel); extern bool TableReferenced(Oid relationId); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 266b1d794..b23c2d26b 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -160,39 +160,206 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl (1 row) DROP TABLE mx_table_test; -RESET citus.replication_model; --- Show that it is not possible to create an mx table with the old --- master_create_distributed_table function -CREATE TABLE mx_table_test (col1 int, col2 text); -SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash'); +-- Show that master_create_distributed_table ignores citus.replication_model GUC +CREATE TABLE s_table(a int); +SELECT master_create_distributed_table('s_table', 'a', 'hash'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. master_create_distributed_table --------------------------------- (1 row) -SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass; repmodel ---------- c (1 row) -DROP TABLE mx_table_test; --- Show that when replication factor > 1 the table is created as coordinator-replicated +-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming +UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass; +SELECT master_create_worker_shards('s_table', 4, 2); +ERROR: using replication factor 2 with the streaming replication model is not supported +DETAIL: The table s_table is marked as streaming replicated and the shard replication factor of streaming replicated tables must be 1. +HINT: Use replication factor 1. +DROP TABLE s_table; +RESET citus.replication_model; +-- Show that create_distributed_table with append and range distributions ignore +-- citus.replication_model GUC SET citus.shard_replication_factor TO 2; -CREATE TABLE mx_table_test (col1 int, col2 text); -SELECT create_distributed_table('mx_table_test', 'col1'); +SET citus.replication_model TO streaming; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); +NOTICE: using statement-based replication +DETAIL: Streaming replication is supported only for hash-distributed tables. create_distributed_table -------------------------- (1 row) -SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; repmodel ---------- c (1 row) -DROP TABLE mx_table_test; +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); +NOTICE: using statement-based replication +DETAIL: Streaming replication is supported only for hash-distributed tables. + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +-- Show that master_create_distributed_table created statement replicated tables no matter +-- what citus.replication_model set to +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +-- Check that the replication_model overwrite behavior is the same with RF=1 +SET citus.shard_replication_factor TO 1; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); +NOTICE: using statement-based replication +DETAIL: Streaming replication is supported only for hash-distributed tables. + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); +NOTICE: using statement-based replication +DETAIL: Streaming replication is supported only for hash-distributed tables. + create_distributed_table +-------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); +NOTICE: using statement-based replication +DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table. +HINT: Use create_distributed_table to use the streaming replication model. + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +---------- + c +(1 row) + +DROP TABLE repmodel_test; +RESET citus.replication_model; SET citus.shard_replication_factor TO default; SET citus.shard_count to 4; CREATE TABLE lineitem_hash_part (like lineitem); diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 508ac769b..a5e2f4330 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -40,9 +40,8 @@ SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append' DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); --- create table as MX table to do create empty shard test here, too +-- create table and do create empty shard test here, too SET citus.shard_replication_factor TO 1; -SET citus.replication_model TO 'streaming'; SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); master_create_distributed_table --------------------------------- @@ -55,15 +54,9 @@ SELECT 1 FROM master_create_empty_shard('testtableddl'); 1 (1 row) --- this'll error out -SET citus.shard_replication_factor TO 2; -SELECT 1 FROM master_create_empty_shard('testtableddl'); -ERROR: replication factors above one are incompatible with tables which use the streaming replication model -HINT: Try again after reducing "citus.shard_replication_factor" to one. -- now actually drop table and shards DROP TABLE testtableddl; RESET citus.shard_replication_factor; -RESET citus.replication_model; -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition; logicalrelid | partmethod | partkey | colocationid | repmodel diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 5bb175a3f..2aa46b9a4 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -126,22 +126,81 @@ SELECT create_distributed_table('mx_table_test', 'col1'); SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; DROP TABLE mx_table_test; +-- Show that master_create_distributed_table ignores citus.replication_model GUC +CREATE TABLE s_table(a int); +SELECT master_create_distributed_table('s_table', 'a', 'hash'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass; + +-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming +UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass; +SELECT master_create_worker_shards('s_table', 4, 2); + +DROP TABLE s_table; + RESET citus.replication_model; --- Show that it is not possible to create an mx table with the old --- master_create_distributed_table function -CREATE TABLE mx_table_test (col1 int, col2 text); -SELECT master_create_distributed_table('mx_table_test', 'col1', 'hash'); -SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; -DROP TABLE mx_table_test; - --- Show that when replication factor > 1 the table is created as coordinator-replicated +-- Show that create_distributed_table with append and range distributions ignore +-- citus.replication_model GUC SET citus.shard_replication_factor TO 2; -CREATE TABLE mx_table_test (col1 int, col2 text); -SELECT create_distributed_table('mx_table_test', 'col1'); -SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; -DROP TABLE mx_table_test; +SET citus.replication_model TO streaming; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +-- Show that master_create_distributed_table created statement replicated tables no matter +-- what citus.replication_model set to + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +-- Check that the replication_model overwrite behavior is the same with RF=1 +SET citus.shard_replication_factor TO 1; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +RESET citus.replication_model; SET citus.shard_replication_factor TO default; SET citus.shard_count to 4; diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index e1f737e83..4a3aa712f 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -37,21 +37,15 @@ DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); --- create table as MX table to do create empty shard test here, too +-- create table and do create empty shard test here, too SET citus.shard_replication_factor TO 1; -SET citus.replication_model TO 'streaming'; SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); SELECT 1 FROM master_create_empty_shard('testtableddl'); --- this'll error out -SET citus.shard_replication_factor TO 2; -SELECT 1 FROM master_create_empty_shard('testtableddl'); - -- now actually drop table and shards DROP TABLE testtableddl; RESET citus.shard_replication_factor; -RESET citus.replication_model; -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition;