mirror of https://github.com/citusdata/citus.git
Add replication_model GUC
This adds a replication_model GUC which is used as the replication model for any new distributed table that is not a reference table. With this change, tables with replication factor 1 are no longer implicitly MX tables. The GUC is similarly respected during empty shard creation for e.g. existing append-partitioned tables. If the model is set to streaming while replication factor is greater than one, table and shard creation routines will error until this invalid combination is corrected. Changing this parameter requires superuser permissions.pull/1137/head
parent
fe5465aa4e
commit
56197dbdba
|
@ -60,11 +60,14 @@
|
|||
#include "utils/inval.h"
|
||||
|
||||
|
||||
/* Replication model to use when creating distributed tables */
|
||||
int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
|
||||
|
||||
|
||||
/* local function forward declarations */
|
||||
static void CreateReferenceTable(Oid relationId);
|
||||
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||
char distributionMethod, uint32 colocationId,
|
||||
char replicationModel);
|
||||
char distributionMethod, uint32 colocationId);
|
||||
static char LookupDistributionMethod(Oid distributionMethodOid);
|
||||
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
||||
int16 supportFunctionNumber);
|
||||
|
@ -106,8 +109,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
|||
EnsureSchemaNode();
|
||||
|
||||
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
|
||||
distributionMethod, INVALID_COLOCATION_ID,
|
||||
REPLICATION_MODEL_COORDINATOR);
|
||||
distributionMethod, INVALID_COLOCATION_ID);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -164,8 +166,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
|||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||
distributionMethod, INVALID_COLOCATION_ID,
|
||||
REPLICATION_MODEL_COORDINATOR);
|
||||
distributionMethod, INVALID_COLOCATION_ID);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
@ -228,7 +229,7 @@ CreateReferenceTable(Oid relationId)
|
|||
|
||||
/* first, convert the relation into distributed relation */
|
||||
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||
DISTRIBUTE_BY_NONE, colocationId, REPLICATION_MODEL_2PC);
|
||||
DISTRIBUTE_BY_NONE, colocationId);
|
||||
|
||||
/* now, create the single shard replicated to all nodes */
|
||||
CreateReferenceTableShard(relationId);
|
||||
|
@ -250,14 +251,27 @@ CreateReferenceTable(Oid relationId)
|
|||
*/
|
||||
static void
|
||||
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||
char distributionMethod, uint32 colocationId,
|
||||
char replicationModel)
|
||||
char distributionMethod, uint32 colocationId)
|
||||
{
|
||||
Relation relation = NULL;
|
||||
TupleDesc relationDesc = NULL;
|
||||
char *relationName = NULL;
|
||||
char relationKind = 0;
|
||||
Var *distributionColumn = NULL;
|
||||
char replicationModel = REPLICATION_MODEL_INVALID;
|
||||
|
||||
/* check global replication settings before continuing */
|
||||
EnsureReplicationSettings(InvalidOid);
|
||||
|
||||
/* distribute by none tables use 2PC replication; otherwise use GUC setting */
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
replicationModel = REPLICATION_MODEL_2PC;
|
||||
}
|
||||
else
|
||||
{
|
||||
replicationModel = ReplicationModel;
|
||||
}
|
||||
|
||||
/*
|
||||
* Lock target relation with an exclusive lock - there's no way to make
|
||||
|
@ -891,21 +905,10 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||
Oid sourceRelationId = InvalidOid;
|
||||
Oid distributionColumnType = InvalidOid;
|
||||
char replicationModel = 0;
|
||||
|
||||
/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
|
||||
distributedRelation = relation_open(relationId, AccessShareLock);
|
||||
|
||||
/* all hash-distributed tables with repfactor=1 are treated as MX tables */
|
||||
if (replicationFactor == 1)
|
||||
{
|
||||
replicationModel = REPLICATION_MODEL_STREAMING;
|
||||
}
|
||||
else
|
||||
{
|
||||
replicationModel = REPLICATION_MODEL_COORDINATOR;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
||||
* can be sure that there will no modifications on the colocation table
|
||||
|
@ -946,7 +949,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
|
||||
/* create distributed table metadata */
|
||||
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
|
||||
colocationId, replicationModel);
|
||||
colocationId);
|
||||
|
||||
/* create shards */
|
||||
if (sourceRelationId != InvalidOid)
|
||||
|
@ -979,3 +982,34 @@ ColumnType(Oid relationId, char *columnName)
|
|||
|
||||
return columnType;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check that the current replication factor setting is compatible with the
|
||||
* replication model of relationId, if valid. If InvalidOid, check that the
|
||||
* global replication model setting instead. Errors out if an invalid state
|
||||
* is detected.
|
||||
*/
|
||||
void
|
||||
EnsureReplicationSettings(Oid relationId)
|
||||
{
|
||||
char replicationModel = (char) ReplicationModel;
|
||||
char *msgSuffix = "the streaming replication model";
|
||||
char *extraHint = " or setting \"citus.replication_model\" to \"statement\"";
|
||||
|
||||
if (relationId != InvalidOid)
|
||||
{
|
||||
replicationModel = TableReplicationModel(relationId);
|
||||
msgSuffix = "tables which use the streaming replication model";
|
||||
extraHint = "";
|
||||
}
|
||||
|
||||
if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("replication factors above one are incompatible with %s",
|
||||
msgSuffix),
|
||||
errhint("Try again after reducing \"citus.shard_replication_"
|
||||
"factor\" to one%s.", extraHint)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
|
|
|
@ -127,6 +127,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
"on reference tables")));
|
||||
}
|
||||
|
||||
EnsureReplicationSettings(relationId);
|
||||
|
||||
/* generate new and unique shardId from sequence */
|
||||
shardId = GetNextShardId();
|
||||
|
||||
|
|
|
@ -1586,3 +1586,16 @@ PartitionMethod(Oid relationId)
|
|||
|
||||
return partitionMethod;
|
||||
}
|
||||
|
||||
|
||||
/* Returns the replication model for the given relation. */
|
||||
char
|
||||
TableReplicationModel(Oid relationId)
|
||||
{
|
||||
/* errors out if not a distributed table */
|
||||
DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId);
|
||||
|
||||
char replicationModel = partitionEntry->replicationModel;
|
||||
|
||||
return replicationModel;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
|
@ -32,6 +33,7 @@
|
|||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
|
@ -63,6 +65,12 @@ static const struct config_enum_entry task_assignment_policy_options[] = {
|
|||
{ NULL, 0, false }
|
||||
};
|
||||
|
||||
static const struct config_enum_entry replication_model_options[] = {
|
||||
{ "statement", REPLICATION_MODEL_COORDINATOR, false },
|
||||
{ "streaming", REPLICATION_MODEL_STREAMING, false },
|
||||
{ NULL, 0, false }
|
||||
};
|
||||
|
||||
static const struct config_enum_entry task_executor_type_options[] = {
|
||||
{ "real-time", MULTI_EXECUTOR_REAL_TIME, false },
|
||||
{ "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false },
|
||||
|
@ -571,6 +579,20 @@ RegisterCitusConfigVariables(void)
|
|||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.replication_model",
|
||||
gettext_noop("Sets the replication model to be used for distributed tables."),
|
||||
gettext_noop("Depending upon the execution environment, statement- or streaming-"
|
||||
"based replication modes may be employed. Though most Citus deploy-"
|
||||
"ments will simply use statement replication, hosted and MX-style"
|
||||
"deployments should set this parameter to 'streaming'."),
|
||||
&ReplicationModel,
|
||||
REPLICATION_MODEL_COORDINATOR,
|
||||
replication_model_options,
|
||||
PGC_SUSET,
|
||||
GUC_SUPERUSER_ONLY,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.task_executor_type",
|
||||
gettext_noop("Sets the executor type to be used for distributed queries."),
|
||||
|
|
|
@ -59,6 +59,9 @@ typedef struct ShardPlacement
|
|||
} ShardPlacement;
|
||||
|
||||
|
||||
/* Config variable managed via guc.c */
|
||||
extern int ReplicationModel;
|
||||
|
||||
/* Function declarations to read shard and shard placement data */
|
||||
extern uint32 TableShardReplicationFactor(Oid relationId);
|
||||
extern List * LoadShardIntervalList(Oid relationId);
|
||||
|
@ -97,6 +100,7 @@ extern char * TableOwner(Oid relationId);
|
|||
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
||||
extern void EnsureTableOwner(Oid relationId);
|
||||
extern void EnsureSuperUser(void);
|
||||
extern void EnsureReplicationSettings(Oid relationId);
|
||||
extern bool TableReferenced(Oid relationId);
|
||||
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
||||
extern Datum StringToDatum(char *inputString, Oid dataType);
|
||||
|
|
|
@ -92,6 +92,7 @@ extern Var * RightColumn(OpExpr *joinClause);
|
|||
extern Var * PartitionColumn(Oid relationId, uint32 rangeTableId);
|
||||
extern Var * PartitionKey(Oid relationId);
|
||||
extern char PartitionMethod(Oid relationId);
|
||||
extern char TableReplicationModel(Oid relationId);
|
||||
|
||||
|
||||
#endif /* MULTI_JOIN_ORDER_H */
|
||||
|
|
|
@ -137,10 +137,16 @@ SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'ap
|
|||
|
||||
(1 row)
|
||||
|
||||
-- Show that when a hash distributed table with replication factor=1 is created, it
|
||||
-- automatically marked as streaming replicated
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
||||
-- Since we're superuser, we can set the replication model to 'streaming' to
|
||||
-- create a one-off MX table... but if we forget to set the replication factor to one,
|
||||
-- we should see an error reminding us to fix that
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||
ERROR: replication factors above one are incompatible with the streaming replication model
|
||||
HINT: Try again after reducing "citus.shard_replication_factor" to one or setting "citus.replication_model" to "statement".
|
||||
-- ok, so now actually create the one-off MX table
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
@ -153,7 +159,8 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
|
|||
s
|
||||
(1 row)
|
||||
|
||||
DROP TABLE mx_table_test;
|
||||
DROP TABLE mx_table_test;
|
||||
RESET citus.replication_model;
|
||||
-- Show that it is not possible to create an mx table with the old
|
||||
-- master_create_distributed_table function
|
||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
||||
|
|
|
@ -267,7 +267,10 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table':
|
|||
|
||||
-- Make sure that start_metadata_sync_to_node considers foreign key constraints
|
||||
\c - - - :master_port
|
||||
-- Since we're superuser, we can set the replication model to 'streaming' to
|
||||
-- create some MX tables
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE SCHEMA mx_testing_schema_2;
|
||||
CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3));
|
||||
CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text,
|
||||
|
@ -307,6 +310,7 @@ Foreign-key constraints:
|
|||
DROP TABLE mx_testing_schema_2.fk_test_2;
|
||||
DROP TABLE mx_testing_schema.fk_test_1;
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
|
||||
\c - - - :master_port
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
|
@ -399,6 +403,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
|
|||
-- Check that the distributed table can be queried from the worker
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
@ -489,6 +494,7 @@ CREATE SCHEMA mx_test_schema_1;
|
|||
CREATE SCHEMA mx_test_schema_2;
|
||||
-- Create MX tables
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text);
|
||||
CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1);
|
||||
CREATE TABLE mx_test_schema_2.mx_table_2 (col1 int, col2 text);
|
||||
|
@ -754,6 +760,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000;
|
||||
SET citus.shard_count TO 7;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE mx_colocation_test_1 (a int);
|
||||
SELECT create_distributed_table('mx_colocation_test_1', 'a');
|
||||
create_distributed_table
|
||||
|
@ -848,6 +855,7 @@ DROP TABLE mx_colocation_test_2;
|
|||
\c - - - :master_port
|
||||
SET citus.shard_count TO 7;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE mx_temp_drop_test (a int);
|
||||
SELECT create_distributed_table('mx_temp_drop_test', 'a');
|
||||
create_distributed_table
|
||||
|
@ -880,6 +888,7 @@ DROP TABLE mx_temp_drop_test;
|
|||
\c - - - :master_port
|
||||
SET citus.shard_count TO 3;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
@ -1097,7 +1106,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE USER mx_user;
|
||||
-- the master user needs superuser permissions to change the replication model
|
||||
CREATE USER mx_user WITH SUPERUSER;
|
||||
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
|
||||
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
|
||||
\c - - - :worker_1_port
|
||||
|
@ -1112,6 +1122,7 @@ HINT: Connect to worker nodes directly to manually create all necessary users a
|
|||
-- Create an mx table as a different user
|
||||
CREATE TABLE mx_table (a int, b BIGSERIAL);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT create_distributed_table('mx_table', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
@ -1352,6 +1363,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
RESET citus.shard_count;
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
RESET citus.multi_shard_commit_protocol;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||
|
|
|
@ -353,6 +353,7 @@ SELECT create_reference_table('replicate_reference_table_reference_one');
|
|||
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE replicate_reference_table_hash(column1 int);
|
||||
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
|
||||
create_distributed_table
|
||||
|
@ -605,3 +606,5 @@ DROP SCHEMA replicate_reference_table_schema CASCADE;
|
|||
-- reload pg_dist_shard_placement table
|
||||
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
|
||||
DROP TABLE tmp_shard_placement;
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
|
|
|
@ -33,6 +33,9 @@ ROLLBACK;
|
|||
DROP TABLE testtableddl;
|
||||
-- verify that the table can dropped even if shards exist
|
||||
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
|
||||
-- create table as MX table to do create empty shard test here, too
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
@ -45,7 +48,15 @@ SELECT 1 FROM master_create_empty_shard('testtableddl');
|
|||
1
|
||||
(1 row)
|
||||
|
||||
-- this'll error out
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
||||
ERROR: replication factors above one are incompatible with tables which use the streaming replication model
|
||||
HINT: Try again after reducing "citus.shard_replication_factor" to one.
|
||||
-- now actually drop table and shards
|
||||
DROP TABLE testtableddl;
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
-- ensure no metadata of distributed tables are remaining
|
||||
SELECT * FROM pg_dist_partition;
|
||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
||||
|
|
|
@ -12,6 +12,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000;
|
||||
-- Prepare the environment
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SET citus.shard_count TO 5;
|
||||
-- Create test tables
|
||||
CREATE TABLE mx_table (col_1 int, col_2 text, col_3 BIGSERIAL);
|
||||
|
@ -403,3 +404,5 @@ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
|
|||
|
||||
\c - - - :master_port
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
|
|
|
@ -801,6 +801,7 @@ DROP TABLE upgrade_reference_table_transaction_commit;
|
|||
-- create an mx table
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE upgrade_reference_table_mx(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
|
||||
create_distributed_table
|
||||
|
@ -911,6 +912,7 @@ DROP TABLE upgrade_reference_table_mx;
|
|||
-- test valid cases, do it with MX
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
RESET citus.replication_model;
|
||||
CREATE TABLE upgrade_reference_table_mx(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
|
||||
create_distributed_table
|
||||
|
|
|
@ -112,14 +112,21 @@ CREATE TABLE supplier_single_shard
|
|||
);
|
||||
SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'append');
|
||||
|
||||
-- Show that when a hash distributed table with replication factor=1 is created, it
|
||||
-- automatically marked as streaming replicated
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE TABLE mx_table_test (col1 int, col2 text);
|
||||
|
||||
-- Since we're superuser, we can set the replication model to 'streaming' to
|
||||
-- create a one-off MX table... but if we forget to set the replication factor to one,
|
||||
-- we should see an error reminding us to fix that
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||
|
||||
-- ok, so now actually create the one-off MX table
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
||||
DROP TABLE mx_table_test;
|
||||
DROP TABLE mx_table_test;
|
||||
|
||||
RESET citus.replication_model;
|
||||
|
||||
-- Show that it is not possible to create an mx table with the old
|
||||
-- master_create_distributed_table function
|
||||
|
|
|
@ -89,7 +89,11 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table':
|
|||
|
||||
-- Make sure that start_metadata_sync_to_node considers foreign key constraints
|
||||
\c - - - :master_port
|
||||
|
||||
-- Since we're superuser, we can set the replication model to 'streaming' to
|
||||
-- create some MX tables
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
CREATE SCHEMA mx_testing_schema_2;
|
||||
|
||||
|
@ -111,6 +115,7 @@ DROP TABLE mx_testing_schema_2.fk_test_2;
|
|||
DROP TABLE mx_testing_schema.fk_test_1;
|
||||
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
|
||||
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
|
||||
\c - - - :master_port
|
||||
|
@ -136,6 +141,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
|
|||
-- Check that the distributed table can be queried from the worker
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
|
||||
CREATE TABLE mx_query_test (a int, b text, c int);
|
||||
|
@ -177,6 +183,7 @@ CREATE SCHEMA mx_test_schema_2;
|
|||
|
||||
-- Create MX tables
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE mx_test_schema_1.mx_table_1 (col1 int UNIQUE, col2 text);
|
||||
CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 (col1);
|
||||
|
||||
|
@ -302,6 +309,7 @@ SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gse
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000;
|
||||
SET citus.shard_count TO 7;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
CREATE TABLE mx_colocation_test_1 (a int);
|
||||
SELECT create_distributed_table('mx_colocation_test_1', 'a');
|
||||
|
@ -370,6 +378,7 @@ DROP TABLE mx_colocation_test_2;
|
|||
\c - - - :master_port
|
||||
SET citus.shard_count TO 7;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
CREATE TABLE mx_temp_drop_test (a int);
|
||||
SELECT create_distributed_table('mx_temp_drop_test', 'a');
|
||||
|
@ -387,6 +396,7 @@ DROP TABLE mx_temp_drop_test;
|
|||
\c - - - :master_port
|
||||
SET citus.shard_count TO 3;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
|
@ -465,7 +475,8 @@ DELETE FROM pg_dist_shard_placement;
|
|||
DELETE FROM pg_dist_partition;
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
|
||||
CREATE USER mx_user;
|
||||
-- the master user needs superuser permissions to change the replication model
|
||||
CREATE USER mx_user WITH SUPERUSER;
|
||||
\c - - - :worker_1_port
|
||||
CREATE USER mx_user;
|
||||
\c - - - :worker_2_port
|
||||
|
@ -475,6 +486,7 @@ CREATE USER mx_user;
|
|||
-- Create an mx table as a different user
|
||||
CREATE TABLE mx_table (a int, b BIGSERIAL);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT create_distributed_table('mx_table', 'a');
|
||||
|
||||
\c - postgres - :master_port
|
||||
|
@ -591,6 +603,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
RESET citus.shard_count;
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
RESET citus.multi_shard_commit_protocol;
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||
|
|
|
@ -228,6 +228,7 @@ SELECT create_reference_table('replicate_reference_table_reference_one');
|
|||
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE replicate_reference_table_hash(column1 int);
|
||||
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
|
||||
|
||||
|
@ -395,3 +396,6 @@ DROP SCHEMA replicate_reference_table_schema CASCADE;
|
|||
-- reload pg_dist_shard_placement table
|
||||
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
|
||||
DROP TABLE tmp_shard_placement;
|
||||
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
|
|
|
@ -32,10 +32,23 @@ DROP TABLE testtableddl;
|
|||
|
||||
-- verify that the table can dropped even if shards exist
|
||||
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
|
||||
|
||||
-- create table as MX table to do create empty shard test here, too
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
|
||||
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
||||
|
||||
-- this'll error out
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SELECT 1 FROM master_create_empty_shard('testtableddl');
|
||||
|
||||
-- now actually drop table and shards
|
||||
DROP TABLE testtableddl;
|
||||
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
|
||||
-- ensure no metadata of distributed tables are remaining
|
||||
SELECT * FROM pg_dist_partition;
|
||||
SELECT * FROM pg_dist_shard;
|
||||
|
|
|
@ -16,6 +16,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000;
|
|||
|
||||
-- Prepare the environment
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
SET citus.shard_count TO 5;
|
||||
|
||||
-- Create test tables
|
||||
|
@ -213,3 +214,6 @@ DELETE FROM pg_dist_node;
|
|||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
|
||||
\c - - - :master_port
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
|
||||
|
||||
RESET citus.shard_replication_factor;
|
||||
RESET citus.replication_model;
|
||||
|
|
|
@ -522,6 +522,7 @@ DROP TABLE upgrade_reference_table_transaction_commit;
|
|||
-- create an mx table
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE upgrade_reference_table_mx(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
|
||||
|
||||
|
@ -596,6 +597,7 @@ DROP TABLE upgrade_reference_table_mx;
|
|||
-- test valid cases, do it with MX
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
RESET citus.replication_model;
|
||||
CREATE TABLE upgrade_reference_table_mx(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||
|
|
Loading…
Reference in New Issue