diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 361ba7081..74d71c372 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -60,11 +60,14 @@ #include "utils/inval.h" +/* Replication model to use when creating distributed tables */ +int ReplicationModel = REPLICATION_MODEL_COORDINATOR; + + /* local function forward declarations */ static void CreateReferenceTable(Oid relationId); static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, uint32 colocationId, - char replicationModel); + char distributionMethod, uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); @@ -106,8 +109,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) EnsureSchemaNode(); ConvertToDistributedTable(distributedRelationId, distributionColumnName, - distributionMethod, INVALID_COLOCATION_ID, - REPLICATION_MODEL_COORDINATOR); + distributionMethod, INVALID_COLOCATION_ID); PG_RETURN_VOID(); } @@ -164,8 +166,7 @@ create_distributed_table(PG_FUNCTION_ARGS) if (distributionMethod != DISTRIBUTE_BY_HASH) { ConvertToDistributedTable(relationId, distributionColumnName, - distributionMethod, INVALID_COLOCATION_ID, - REPLICATION_MODEL_COORDINATOR); + distributionMethod, INVALID_COLOCATION_ID); PG_RETURN_VOID(); } @@ -228,7 +229,7 @@ CreateReferenceTable(Oid relationId) /* first, convert the relation into distributed relation */ ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_NONE, colocationId, REPLICATION_MODEL_2PC); + DISTRIBUTE_BY_NONE, colocationId); /* now, create the single shard replicated to all nodes */ CreateReferenceTableShard(relationId); @@ -250,14 +251,27 @@ CreateReferenceTable(Oid relationId) */ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, uint32 colocationId, - char replicationModel) + char distributionMethod, uint32 colocationId) { Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; char relationKind = 0; Var *distributionColumn = NULL; + char replicationModel = REPLICATION_MODEL_INVALID; + + /* check global replication settings before continuing */ + EnsureReplicationSettings(InvalidOid); + + /* distribute by none tables use 2PC replication; otherwise use GUC setting */ + if (distributionMethod == DISTRIBUTE_BY_NONE) + { + replicationModel = REPLICATION_MODEL_2PC; + } + else + { + replicationModel = ReplicationModel; + } /* * Lock target relation with an exclusive lock - there's no way to make @@ -891,21 +905,10 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, uint32 colocationId = INVALID_COLOCATION_ID; Oid sourceRelationId = InvalidOid; Oid distributionColumnType = InvalidOid; - char replicationModel = 0; /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); - /* all hash-distributed tables with repfactor=1 are treated as MX tables */ - if (replicationFactor == 1) - { - replicationModel = REPLICATION_MODEL_STREAMING; - } - else - { - replicationModel = REPLICATION_MODEL_COORDINATOR; - } - /* * Get an exclusive lock on the colocation system catalog. Therefore, we * can be sure that there will no modifications on the colocation table @@ -946,7 +949,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - colocationId, replicationModel); + colocationId); /* create shards */ if (sourceRelationId != InvalidOid) @@ -979,3 +982,34 @@ ColumnType(Oid relationId, char *columnName) return columnType; } + + +/* + * Check that the current replication factor setting is compatible with the + * replication model of relationId, if valid. If InvalidOid, check that the + * global replication model setting instead. Errors out if an invalid state + * is detected. + */ +void +EnsureReplicationSettings(Oid relationId) +{ + char replicationModel = (char) ReplicationModel; + char *msgSuffix = "the streaming replication model"; + char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; + + if (relationId != InvalidOid) + { + replicationModel = TableReplicationModel(relationId); + msgSuffix = "tables which use the streaming replication model"; + extraHint = ""; + } + + if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication factors above one are incompatible with %s", + msgSuffix), + errhint("Try again after reducing \"citus.shard_replication_" + "factor\" to one%s.", extraHint))); + } +} diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 880abac71..6d311d256 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -27,6 +27,7 @@ #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "distributed/listutils.h" +#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 789e2b816..de8e845dd 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -127,6 +127,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS) "on reference tables"))); } + EnsureReplicationSettings(relationId); + /* generate new and unique shardId from sequence */ shardId = GetNextShardId(); diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 5639b4190..ce0894e4e 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -1586,3 +1586,16 @@ PartitionMethod(Oid relationId) return partitionMethod; } + + +/* Returns the replication model for the given relation. */ +char +TableReplicationModel(Oid relationId) +{ + /* errors out if not a distributed table */ + DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId); + + char replicationModel = partitionEntry->replicationModel; + + return replicationModel; +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index b701c624e..c400e1e42 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -21,6 +21,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" #include "distributed/connection_management.h" +#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -32,6 +33,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/pg_dist_partition.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/task_tracker.h" @@ -63,6 +65,12 @@ static const struct config_enum_entry task_assignment_policy_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry replication_model_options[] = { + { "statement", REPLICATION_MODEL_COORDINATOR, false }, + { "streaming", REPLICATION_MODEL_STREAMING, false }, + { NULL, 0, false } +}; + static const struct config_enum_entry task_executor_type_options[] = { { "real-time", MULTI_EXECUTOR_REAL_TIME, false }, { "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false }, @@ -571,6 +579,20 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.replication_model", + gettext_noop("Sets the replication model to be used for distributed tables."), + gettext_noop("Depending upon the execution environment, statement- or streaming-" + "based replication modes may be employed. Though most Citus deploy-" + "ments will simply use statement replication, hosted and MX-style" + "deployments should set this parameter to 'streaming'."), + &ReplicationModel, + REPLICATION_MODEL_COORDINATOR, + replication_model_options, + PGC_SUSET, + GUC_SUPERUSER_ONLY, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.task_executor_type", gettext_noop("Sets the executor type to be used for distributed queries."), diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 74a9f10b5..8b8857927 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -59,6 +59,9 @@ typedef struct ShardPlacement } ShardPlacement; +/* Config variable managed via guc.c */ +extern int ReplicationModel; + /* Function declarations to read shard and shard placement data */ extern uint32 TableShardReplicationFactor(Oid relationId); extern List * LoadShardIntervalList(Oid relationId); @@ -97,6 +100,7 @@ extern char * TableOwner(Oid relationId); extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); +extern void EnsureReplicationSettings(Oid relationId); extern bool TableReferenced(Oid relationId); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index b9276b13a..a2466e57a 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -92,6 +92,7 @@ extern Var * RightColumn(OpExpr *joinClause); extern Var * PartitionColumn(Oid relationId, uint32 rangeTableId); extern Var * PartitionKey(Oid relationId); extern char PartitionMethod(Oid relationId); +extern char TableReplicationModel(Oid relationId); #endif /* MULTI_JOIN_ORDER_H */ diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 4570273d0..266b1d794 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -137,10 +137,16 @@ SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'ap (1 row) --- Show that when a hash distributed table with replication factor=1 is created, it --- automatically marked as streaming replicated -SET citus.shard_replication_factor TO 1; CREATE TABLE mx_table_test (col1 int, col2 text); +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create a one-off MX table... but if we forget to set the replication factor to one, +-- we should see an error reminding us to fix that +SET citus.replication_model TO 'streaming'; +SELECT create_distributed_table('mx_table_test', 'col1'); +ERROR: replication factors above one are incompatible with the streaming replication model +HINT: Try again after reducing "citus.shard_replication_factor" to one or setting "citus.replication_model" to "statement". +-- ok, so now actually create the one-off MX table +SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('mx_table_test', 'col1'); create_distributed_table -------------------------- @@ -153,7 +159,8 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl s (1 row) -DROP TABLE mx_table_test; +DROP TABLE mx_table_test; +RESET citus.replication_model; -- Show that it is not possible to create an mx table with the old -- master_create_distributed_table function CREATE TABLE mx_table_test (col1 int, col2 text); diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 64a908322..f034c3a73 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -267,7 +267,10 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': -- Make sure that start_metadata_sync_to_node considers foreign key constraints \c - - - :master_port +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create some MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE SCHEMA mx_testing_schema_2; CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, @@ -307,6 +310,7 @@ Foreign-key constraints: DROP TABLE mx_testing_schema_2.fk_test_2; DROP TABLE mx_testing_schema.fk_test_1; RESET citus.shard_replication_factor; +RESET citus.replication_model; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -399,6 +403,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; -- Check that the distributed table can be queried from the worker \c - - - :master_port SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ----------------------------- @@ -489,6 +494,7 @@ CREATE SCHEMA mx_test_schema_1; CREATE SCHEMA mx_test_schema_2; -- Create MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text); @@ -754,6 +760,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_colocation_test_1 (a int); SELECT create_distributed_table('mx_colocation_test_1', 'a'); create_distributed_table @@ -848,6 +855,7 @@ DROP TABLE mx_colocation_test_2; \c - - - :master_port SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_temp_drop_test (a int); SELECT create_distributed_table('mx_temp_drop_test', 'a'); create_distributed_table @@ -880,6 +888,7 @@ DROP TABLE mx_temp_drop_test; \c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node ---------------------------- @@ -1097,7 +1106,8 @@ SELECT master_remove_node('localhost', :worker_2_port); (1 row) -CREATE USER mx_user; + -- the master user needs superuser permissions to change the replication model +CREATE USER mx_user WITH SUPERUSER; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. \c - - - :worker_1_port @@ -1112,6 +1122,7 @@ HINT: Connect to worker nodes directly to manually create all necessary users a -- Create an mx table as a different user CREATE TABLE mx_table (a int, b BIGSERIAL); SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT create_distributed_table('mx_table', 'a'); create_distributed_table -------------------------- @@ -1352,6 +1363,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); RESET citus.shard_count; RESET citus.shard_replication_factor; +RESET citus.replication_model; RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 3c4e6147f..869a3f905 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -353,6 +353,7 @@ SELECT create_reference_table('replicate_reference_table_reference_one'); SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE replicate_reference_table_hash(column1 int); SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); create_distributed_table @@ -605,3 +606,5 @@ DROP SCHEMA replicate_reference_table_schema CASCADE; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index b4d2c3704..9e230ba0b 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -33,6 +33,9 @@ ROLLBACK; DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); +-- create table as MX table to do create empty shard test here, too +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); master_create_distributed_table --------------------------------- @@ -45,7 +48,15 @@ SELECT 1 FROM master_create_empty_shard('testtableddl'); 1 (1 row) +-- this'll error out +SET citus.shard_replication_factor TO 2; +SELECT 1 FROM master_create_empty_shard('testtableddl'); +ERROR: replication factors above one are incompatible with tables which use the streaming replication model +HINT: Try again after reducing "citus.shard_replication_factor" to one. +-- now actually drop table and shards DROP TABLE testtableddl; +RESET citus.shard_replication_factor; +RESET citus.replication_model; -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition; logicalrelid | partmethod | partkey | colocationid | repmodel diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index aea3b2b89..0d9bfbf03 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -12,6 +12,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; -- Prepare the environment SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SET citus.shard_count TO 5; -- Create test tables CREATE TABLE mx_table (col_1 int, col_2 text, col_3 BIGSERIAL); @@ -403,3 +404,5 @@ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition; \c - - - :master_port ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index fb6e523d5..1d5853d49 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -801,6 +801,7 @@ DROP TABLE upgrade_reference_table_transaction_commit; -- create an mx table SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); create_distributed_table @@ -911,6 +912,7 @@ DROP TABLE upgrade_reference_table_mx; -- test valid cases, do it with MX SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 2; +RESET citus.replication_model; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); create_distributed_table diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 0414653fc..5bb175a3f 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -112,14 +112,21 @@ CREATE TABLE supplier_single_shard ); SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'append'); --- Show that when a hash distributed table with replication factor=1 is created, it --- automatically marked as streaming replicated -SET citus.shard_replication_factor TO 1; - CREATE TABLE mx_table_test (col1 int, col2 text); + +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create a one-off MX table... but if we forget to set the replication factor to one, +-- we should see an error reminding us to fix that +SET citus.replication_model TO 'streaming'; +SELECT create_distributed_table('mx_table_test', 'col1'); + +-- ok, so now actually create the one-off MX table +SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('mx_table_test', 'col1'); SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; -DROP TABLE mx_table_test; +DROP TABLE mx_table_test; + +RESET citus.replication_model; -- Show that it is not possible to create an mx table with the old -- master_create_distributed_table function diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 394397d32..68ad4f615 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -89,7 +89,11 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': -- Make sure that start_metadata_sync_to_node considers foreign key constraints \c - - - :master_port + +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create some MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE SCHEMA mx_testing_schema_2; @@ -111,6 +115,7 @@ DROP TABLE mx_testing_schema_2.fk_test_2; DROP TABLE mx_testing_schema.fk_test_1; RESET citus.shard_replication_factor; +RESET citus.replication_model; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port @@ -136,6 +141,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; -- Check that the distributed table can be queried from the worker \c - - - :master_port SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); CREATE TABLE mx_query_test (a int, b text, c int); @@ -177,6 +183,7 @@ CREATE SCHEMA mx_test_schema_2; -- Create MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); @@ -302,6 +309,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_colocation_test_1 (a int); SELECT create_distributed_table('mx_colocation_test_1', 'a'); @@ -370,6 +378,7 @@ DROP TABLE mx_colocation_test_2; \c - - - :master_port SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_temp_drop_test (a int); SELECT create_distributed_table('mx_temp_drop_test', 'a'); @@ -387,6 +396,7 @@ DROP TABLE mx_temp_drop_test; \c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); @@ -465,7 +475,8 @@ DELETE FROM pg_dist_shard_placement; DELETE FROM pg_dist_partition; SELECT master_remove_node('localhost', :worker_2_port); -CREATE USER mx_user; + -- the master user needs superuser permissions to change the replication model +CREATE USER mx_user WITH SUPERUSER; \c - - - :worker_1_port CREATE USER mx_user; \c - - - :worker_2_port @@ -475,6 +486,7 @@ CREATE USER mx_user; -- Create an mx table as a different user CREATE TABLE mx_table (a int, b BIGSERIAL); SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT create_distributed_table('mx_table', 'a'); \c - postgres - :master_port @@ -591,6 +603,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); RESET citus.shard_count; RESET citus.shard_replication_factor; +RESET citus.replication_model; RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 39931af95..86c5c8ea0 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -228,6 +228,7 @@ SELECT create_reference_table('replicate_reference_table_reference_one'); SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE replicate_reference_table_hash(column1 int); SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); @@ -395,3 +396,6 @@ DROP SCHEMA replicate_reference_table_schema CASCADE; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; + +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 8cb2ddbf3..166d7cdbd 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -32,10 +32,23 @@ DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); + +-- create table as MX table to do create empty shard test here, too +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); SELECT 1 FROM master_create_empty_shard('testtableddl'); + +-- this'll error out +SET citus.shard_replication_factor TO 2; +SELECT 1 FROM master_create_empty_shard('testtableddl'); + +-- now actually drop table and shards DROP TABLE testtableddl; +RESET citus.shard_replication_factor; +RESET citus.replication_model; + -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition; SELECT * FROM pg_dist_shard; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index ceac8b0f4..589b0766b 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -16,6 +16,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; -- Prepare the environment SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SET citus.shard_count TO 5; -- Create test tables @@ -213,3 +214,6 @@ DELETE FROM pg_dist_node; SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition; \c - - - :master_port ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; + +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql index 494eb5eb0..f4f25f084 100644 --- a/src/test/regress/sql/multi_upgrade_reference_table.sql +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -522,6 +522,7 @@ DROP TABLE upgrade_reference_table_transaction_commit; -- create an mx table SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); @@ -596,6 +597,7 @@ DROP TABLE upgrade_reference_table_mx; -- test valid cases, do it with MX SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 2; +RESET citus.replication_model; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); UPDATE pg_dist_shard_placement SET shardstate = 3