From 56197dbdba57a67ad170b97acf5f11254b85f008 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Tue, 17 Jan 2017 11:03:17 -0700 Subject: [PATCH] Add replication_model GUC This adds a replication_model GUC which is used as the replication model for any new distributed table that is not a reference table. With this change, tables with replication factor 1 are no longer implicitly MX tables. The GUC is similarly respected during empty shard creation for e.g. existing append-partitioned tables. If the model is set to streaming while replication factor is greater than one, table and shard creation routines will error until this invalid combination is corrected. Changing this parameter requires superuser permissions. --- .../commands/create_distributed_table.c | 76 ++++++++++++++----- .../distributed/master/master_create_shards.c | 1 + .../master/master_stage_protocol.c | 2 + .../distributed/planner/multi_join_order.c | 13 ++++ src/backend/distributed/shared_library_init.c | 22 ++++++ .../distributed/master_metadata_utility.h | 4 + src/include/distributed/multi_join_order.h | 1 + .../regress/expected/multi_create_table.out | 15 +++- .../regress/expected/multi_metadata_sync.out | 14 +++- .../multi_replicate_reference_table.out | 3 + src/test/regress/expected/multi_table_ddl.out | 11 +++ .../multi_unsupported_worker_operations.out | 3 + .../multi_upgrade_reference_table.out | 2 + src/test/regress/sql/multi_create_table.sql | 17 +++-- src/test/regress/sql/multi_metadata_sync.sql | 15 +++- .../sql/multi_replicate_reference_table.sql | 4 + src/test/regress/sql/multi_table_ddl.sql | 13 ++++ .../multi_unsupported_worker_operations.sql | 4 + .../sql/multi_upgrade_reference_table.sql | 2 + 19 files changed, 190 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 361ba7081..74d71c372 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -60,11 +60,14 @@ #include "utils/inval.h" +/* Replication model to use when creating distributed tables */ +int ReplicationModel = REPLICATION_MODEL_COORDINATOR; + + /* local function forward declarations */ static void CreateReferenceTable(Oid relationId); static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, uint32 colocationId, - char replicationModel); + char distributionMethod, uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); @@ -106,8 +109,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) EnsureSchemaNode(); ConvertToDistributedTable(distributedRelationId, distributionColumnName, - distributionMethod, INVALID_COLOCATION_ID, - REPLICATION_MODEL_COORDINATOR); + distributionMethod, INVALID_COLOCATION_ID); PG_RETURN_VOID(); } @@ -164,8 +166,7 @@ create_distributed_table(PG_FUNCTION_ARGS) if (distributionMethod != DISTRIBUTE_BY_HASH) { ConvertToDistributedTable(relationId, distributionColumnName, - distributionMethod, INVALID_COLOCATION_ID, - REPLICATION_MODEL_COORDINATOR); + distributionMethod, INVALID_COLOCATION_ID); PG_RETURN_VOID(); } @@ -228,7 +229,7 @@ CreateReferenceTable(Oid relationId) /* first, convert the relation into distributed relation */ ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_NONE, colocationId, REPLICATION_MODEL_2PC); + DISTRIBUTE_BY_NONE, colocationId); /* now, create the single shard replicated to all nodes */ CreateReferenceTableShard(relationId); @@ -250,14 +251,27 @@ CreateReferenceTable(Oid relationId) */ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, uint32 colocationId, - char replicationModel) + char distributionMethod, uint32 colocationId) { Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; char relationKind = 0; Var *distributionColumn = NULL; + char replicationModel = REPLICATION_MODEL_INVALID; + + /* check global replication settings before continuing */ + EnsureReplicationSettings(InvalidOid); + + /* distribute by none tables use 2PC replication; otherwise use GUC setting */ + if (distributionMethod == DISTRIBUTE_BY_NONE) + { + replicationModel = REPLICATION_MODEL_2PC; + } + else + { + replicationModel = ReplicationModel; + } /* * Lock target relation with an exclusive lock - there's no way to make @@ -891,21 +905,10 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, uint32 colocationId = INVALID_COLOCATION_ID; Oid sourceRelationId = InvalidOid; Oid distributionColumnType = InvalidOid; - char replicationModel = 0; /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); - /* all hash-distributed tables with repfactor=1 are treated as MX tables */ - if (replicationFactor == 1) - { - replicationModel = REPLICATION_MODEL_STREAMING; - } - else - { - replicationModel = REPLICATION_MODEL_COORDINATOR; - } - /* * Get an exclusive lock on the colocation system catalog. Therefore, we * can be sure that there will no modifications on the colocation table @@ -946,7 +949,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - colocationId, replicationModel); + colocationId); /* create shards */ if (sourceRelationId != InvalidOid) @@ -979,3 +982,34 @@ ColumnType(Oid relationId, char *columnName) return columnType; } + + +/* + * Check that the current replication factor setting is compatible with the + * replication model of relationId, if valid. If InvalidOid, check that the + * global replication model setting instead. Errors out if an invalid state + * is detected. + */ +void +EnsureReplicationSettings(Oid relationId) +{ + char replicationModel = (char) ReplicationModel; + char *msgSuffix = "the streaming replication model"; + char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; + + if (relationId != InvalidOid) + { + replicationModel = TableReplicationModel(relationId); + msgSuffix = "tables which use the streaming replication model"; + extraHint = ""; + } + + if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication factors above one are incompatible with %s", + msgSuffix), + errhint("Try again after reducing \"citus.shard_replication_" + "factor\" to one%s.", extraHint))); + } +} diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 880abac71..6d311d256 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -27,6 +27,7 @@ #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "distributed/listutils.h" +#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 789e2b816..de8e845dd 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -127,6 +127,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS) "on reference tables"))); } + EnsureReplicationSettings(relationId); + /* generate new and unique shardId from sequence */ shardId = GetNextShardId(); diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 5639b4190..ce0894e4e 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -1586,3 +1586,16 @@ PartitionMethod(Oid relationId) return partitionMethod; } + + +/* Returns the replication model for the given relation. */ +char +TableReplicationModel(Oid relationId) +{ + /* errors out if not a distributed table */ + DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId); + + char replicationModel = partitionEntry->replicationModel; + + return replicationModel; +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index b701c624e..c400e1e42 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -21,6 +21,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" #include "distributed/connection_management.h" +#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -32,6 +33,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/pg_dist_partition.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/task_tracker.h" @@ -63,6 +65,12 @@ static const struct config_enum_entry task_assignment_policy_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry replication_model_options[] = { + { "statement", REPLICATION_MODEL_COORDINATOR, false }, + { "streaming", REPLICATION_MODEL_STREAMING, false }, + { NULL, 0, false } +}; + static const struct config_enum_entry task_executor_type_options[] = { { "real-time", MULTI_EXECUTOR_REAL_TIME, false }, { "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false }, @@ -571,6 +579,20 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.replication_model", + gettext_noop("Sets the replication model to be used for distributed tables."), + gettext_noop("Depending upon the execution environment, statement- or streaming-" + "based replication modes may be employed. Though most Citus deploy-" + "ments will simply use statement replication, hosted and MX-style" + "deployments should set this parameter to 'streaming'."), + &ReplicationModel, + REPLICATION_MODEL_COORDINATOR, + replication_model_options, + PGC_SUSET, + GUC_SUPERUSER_ONLY, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.task_executor_type", gettext_noop("Sets the executor type to be used for distributed queries."), diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 74a9f10b5..8b8857927 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -59,6 +59,9 @@ typedef struct ShardPlacement } ShardPlacement; +/* Config variable managed via guc.c */ +extern int ReplicationModel; + /* Function declarations to read shard and shard placement data */ extern uint32 TableShardReplicationFactor(Oid relationId); extern List * LoadShardIntervalList(Oid relationId); @@ -97,6 +100,7 @@ extern char * TableOwner(Oid relationId); extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); +extern void EnsureReplicationSettings(Oid relationId); extern bool TableReferenced(Oid relationId); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index b9276b13a..a2466e57a 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -92,6 +92,7 @@ extern Var * RightColumn(OpExpr *joinClause); extern Var * PartitionColumn(Oid relationId, uint32 rangeTableId); extern Var * PartitionKey(Oid relationId); extern char PartitionMethod(Oid relationId); +extern char TableReplicationModel(Oid relationId); #endif /* MULTI_JOIN_ORDER_H */ diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 4570273d0..266b1d794 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -137,10 +137,16 @@ SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'ap (1 row) --- Show that when a hash distributed table with replication factor=1 is created, it --- automatically marked as streaming replicated -SET citus.shard_replication_factor TO 1; CREATE TABLE mx_table_test (col1 int, col2 text); +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create a one-off MX table... but if we forget to set the replication factor to one, +-- we should see an error reminding us to fix that +SET citus.replication_model TO 'streaming'; +SELECT create_distributed_table('mx_table_test', 'col1'); +ERROR: replication factors above one are incompatible with the streaming replication model +HINT: Try again after reducing "citus.shard_replication_factor" to one or setting "citus.replication_model" to "statement". +-- ok, so now actually create the one-off MX table +SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('mx_table_test', 'col1'); create_distributed_table -------------------------- @@ -153,7 +159,8 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl s (1 row) -DROP TABLE mx_table_test; +DROP TABLE mx_table_test; +RESET citus.replication_model; -- Show that it is not possible to create an mx table with the old -- master_create_distributed_table function CREATE TABLE mx_table_test (col1 int, col2 text); diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 64a908322..f034c3a73 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -267,7 +267,10 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': -- Make sure that start_metadata_sync_to_node considers foreign key constraints \c - - - :master_port +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create some MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE SCHEMA mx_testing_schema_2; CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, @@ -307,6 +310,7 @@ Foreign-key constraints: DROP TABLE mx_testing_schema_2.fk_test_2; DROP TABLE mx_testing_schema.fk_test_1; RESET citus.shard_replication_factor; +RESET citus.replication_model; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -399,6 +403,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; -- Check that the distributed table can be queried from the worker \c - - - :master_port SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ----------------------------- @@ -489,6 +494,7 @@ CREATE SCHEMA mx_test_schema_1; CREATE SCHEMA mx_test_schema_2; -- Create MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text); @@ -754,6 +760,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_colocation_test_1 (a int); SELECT create_distributed_table('mx_colocation_test_1', 'a'); create_distributed_table @@ -848,6 +855,7 @@ DROP TABLE mx_colocation_test_2; \c - - - :master_port SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_temp_drop_test (a int); SELECT create_distributed_table('mx_temp_drop_test', 'a'); create_distributed_table @@ -880,6 +888,7 @@ DROP TABLE mx_temp_drop_test; \c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node ---------------------------- @@ -1097,7 +1106,8 @@ SELECT master_remove_node('localhost', :worker_2_port); (1 row) -CREATE USER mx_user; + -- the master user needs superuser permissions to change the replication model +CREATE USER mx_user WITH SUPERUSER; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. \c - - - :worker_1_port @@ -1112,6 +1122,7 @@ HINT: Connect to worker nodes directly to manually create all necessary users a -- Create an mx table as a different user CREATE TABLE mx_table (a int, b BIGSERIAL); SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT create_distributed_table('mx_table', 'a'); create_distributed_table -------------------------- @@ -1352,6 +1363,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); RESET citus.shard_count; RESET citus.shard_replication_factor; +RESET citus.replication_model; RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 3c4e6147f..869a3f905 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -353,6 +353,7 @@ SELECT create_reference_table('replicate_reference_table_reference_one'); SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE replicate_reference_table_hash(column1 int); SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); create_distributed_table @@ -605,3 +606,5 @@ DROP SCHEMA replicate_reference_table_schema CASCADE; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index b4d2c3704..9e230ba0b 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -33,6 +33,9 @@ ROLLBACK; DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); +-- create table as MX table to do create empty shard test here, too +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); master_create_distributed_table --------------------------------- @@ -45,7 +48,15 @@ SELECT 1 FROM master_create_empty_shard('testtableddl'); 1 (1 row) +-- this'll error out +SET citus.shard_replication_factor TO 2; +SELECT 1 FROM master_create_empty_shard('testtableddl'); +ERROR: replication factors above one are incompatible with tables which use the streaming replication model +HINT: Try again after reducing "citus.shard_replication_factor" to one. +-- now actually drop table and shards DROP TABLE testtableddl; +RESET citus.shard_replication_factor; +RESET citus.replication_model; -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition; logicalrelid | partmethod | partkey | colocationid | repmodel diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index aea3b2b89..0d9bfbf03 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -12,6 +12,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; -- Prepare the environment SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SET citus.shard_count TO 5; -- Create test tables CREATE TABLE mx_table (col_1 int, col_2 text, col_3 BIGSERIAL); @@ -403,3 +404,5 @@ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition; \c - - - :master_port ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index fb6e523d5..1d5853d49 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -801,6 +801,7 @@ DROP TABLE upgrade_reference_table_transaction_commit; -- create an mx table SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); create_distributed_table @@ -911,6 +912,7 @@ DROP TABLE upgrade_reference_table_mx; -- test valid cases, do it with MX SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 2; +RESET citus.replication_model; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); create_distributed_table diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 0414653fc..5bb175a3f 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -112,14 +112,21 @@ CREATE TABLE supplier_single_shard ); SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'append'); --- Show that when a hash distributed table with replication factor=1 is created, it --- automatically marked as streaming replicated -SET citus.shard_replication_factor TO 1; - CREATE TABLE mx_table_test (col1 int, col2 text); + +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create a one-off MX table... but if we forget to set the replication factor to one, +-- we should see an error reminding us to fix that +SET citus.replication_model TO 'streaming'; +SELECT create_distributed_table('mx_table_test', 'col1'); + +-- ok, so now actually create the one-off MX table +SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('mx_table_test', 'col1'); SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; -DROP TABLE mx_table_test; +DROP TABLE mx_table_test; + +RESET citus.replication_model; -- Show that it is not possible to create an mx table with the old -- master_create_distributed_table function diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 394397d32..68ad4f615 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -89,7 +89,11 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': -- Make sure that start_metadata_sync_to_node considers foreign key constraints \c - - - :master_port + +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create some MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE SCHEMA mx_testing_schema_2; @@ -111,6 +115,7 @@ DROP TABLE mx_testing_schema_2.fk_test_2; DROP TABLE mx_testing_schema.fk_test_1; RESET citus.shard_replication_factor; +RESET citus.replication_model; -- Check that repeated calls to start_metadata_sync_to_node has no side effects \c - - - :master_port @@ -136,6 +141,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; -- Check that the distributed table can be queried from the worker \c - - - :master_port SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); CREATE TABLE mx_query_test (a int, b text, c int); @@ -177,6 +183,7 @@ CREATE SCHEMA mx_test_schema_2; -- Create MX tables SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text); CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1); @@ -302,6 +309,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_colocation_test_1 (a int); SELECT create_distributed_table('mx_colocation_test_1', 'a'); @@ -370,6 +378,7 @@ DROP TABLE mx_colocation_test_2; \c - - - :master_port SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE mx_temp_drop_test (a int); SELECT create_distributed_table('mx_temp_drop_test', 'a'); @@ -387,6 +396,7 @@ DROP TABLE mx_temp_drop_test; \c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); @@ -465,7 +475,8 @@ DELETE FROM pg_dist_shard_placement; DELETE FROM pg_dist_partition; SELECT master_remove_node('localhost', :worker_2_port); -CREATE USER mx_user; + -- the master user needs superuser permissions to change the replication model +CREATE USER mx_user WITH SUPERUSER; \c - - - :worker_1_port CREATE USER mx_user; \c - - - :worker_2_port @@ -475,6 +486,7 @@ CREATE USER mx_user; -- Create an mx table as a different user CREATE TABLE mx_table (a int, b BIGSERIAL); SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT create_distributed_table('mx_table', 'a'); \c - postgres - :master_port @@ -591,6 +603,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); RESET citus.shard_count; RESET citus.shard_replication_factor; +RESET citus.replication_model; RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 39931af95..86c5c8ea0 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -228,6 +228,7 @@ SELECT create_reference_table('replicate_reference_table_reference_one'); SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE replicate_reference_table_hash(column1 int); SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); @@ -395,3 +396,6 @@ DROP SCHEMA replicate_reference_table_schema CASCADE; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; + +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 8cb2ddbf3..166d7cdbd 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -32,10 +32,23 @@ DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); + +-- create table as MX table to do create empty shard test here, too +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); SELECT 1 FROM master_create_empty_shard('testtableddl'); + +-- this'll error out +SET citus.shard_replication_factor TO 2; +SELECT 1 FROM master_create_empty_shard('testtableddl'); + +-- now actually drop table and shards DROP TABLE testtableddl; +RESET citus.shard_replication_factor; +RESET citus.replication_model; + -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition; SELECT * FROM pg_dist_shard; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index ceac8b0f4..589b0766b 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -16,6 +16,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; -- Prepare the environment SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; SET citus.shard_count TO 5; -- Create test tables @@ -213,3 +214,6 @@ DELETE FROM pg_dist_node; SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition; \c - - - :master_port ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; + +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql index 494eb5eb0..f4f25f084 100644 --- a/src/test/regress/sql/multi_upgrade_reference_table.sql +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -522,6 +522,7 @@ DROP TABLE upgrade_reference_table_transaction_commit; -- create an mx table SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); @@ -596,6 +597,7 @@ DROP TABLE upgrade_reference_table_mx; -- test valid cases, do it with MX SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 2; +RESET citus.replication_model; CREATE TABLE upgrade_reference_table_mx(column1 int); SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); UPDATE pg_dist_shard_placement SET shardstate = 3