mirror of https://github.com/citusdata/citus.git
Enable replication factor > 1 in metadata syncing
parent
1d392427c9
commit
cb5034f91e
|
|
@ -367,9 +367,8 @@ ClusterHasKnownMetadataWorkers()
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
|
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
|
||||||
* propagated to metadata workers, i.e. the table is an MX table or reference table.
|
* propagated to metadata workers, i.e. the table is a hash distributed table or
|
||||||
* Tables with streaming replication model (which means RF=1) and hash distribution are
|
* reference/citus local table.
|
||||||
* considered as MX tables while tables with none distribution are reference tables.
|
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
ShouldSyncTableMetadata(Oid relationId)
|
ShouldSyncTableMetadata(Oid relationId)
|
||||||
|
|
@ -381,12 +380,8 @@ ShouldSyncTableMetadata(Oid relationId)
|
||||||
|
|
||||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
bool streamingReplicated =
|
if (IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) ||
|
||||||
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
|
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
|
|
||||||
bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry,
|
|
||||||
HASH_DISTRIBUTED));
|
|
||||||
if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
@ -2182,15 +2177,6 @@ EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int coloc
|
||||||
"known replication models.")));
|
"known replication models.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (distributionMethod == DISTRIBUTE_BY_HASH &&
|
|
||||||
replicationModel != REPLICATION_MODEL_STREAMING)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
||||||
errmsg("Hash distributed tables can only have '%c' "
|
|
||||||
"as the replication model.",
|
|
||||||
REPLICATION_MODEL_STREAMING)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (distributionMethod == DISTRIBUTE_BY_NONE &&
|
if (distributionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
!(replicationModel == REPLICATION_MODEL_STREAMING ||
|
!(replicationModel == REPLICATION_MODEL_STREAMING ||
|
||||||
replicationModel == REPLICATION_MODEL_2PC))
|
replicationModel == REPLICATION_MODEL_2PC))
|
||||||
|
|
|
||||||
|
|
@ -107,15 +107,9 @@ SELECT count(*) FROM history;
|
||||||
2
|
2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test we can not replicate MX tables
|
-- test we can replicate MX tables
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
-- metadata sync will fail as we have a statement replicated table
|
-- metadata sync will succeed even if we have rep > 1 tables
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
|
||||||
ERROR: relation "mcsp.history" does not exist
|
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
|
||||||
-- use streaming replication to enable metadata syncing
|
|
||||||
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid IN
|
|
||||||
('history'::regclass);
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
start_metadata_sync_to_node
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -357,18 +357,6 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
|
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
|
||||||
ERROR: Local or references tables can only have 's' or 't' as the replication model.
|
ERROR: Local or references tables can only have 's' or 't' as the replication model.
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- not-matching replication model for hash table
|
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
|
||||||
assign_distributed_transaction_id
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SET application_name to 'citus';
|
|
||||||
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 't');
|
|
||||||
ERROR: Hash distributed tables can only have 's' as the replication model.
|
|
||||||
ROLLBACK;
|
|
||||||
-- add entry for super user table
|
-- add entry for super user table
|
||||||
\c - postgres - :worker_1_port
|
\c - postgres - :worker_1_port
|
||||||
SET search_path TO metadata_sync_helpers;
|
SET search_path TO metadata_sync_helpers;
|
||||||
|
|
|
||||||
|
|
@ -428,6 +428,7 @@ SELECT create_distributed_table('table_range', 'id', 'range');
|
||||||
-- test foreign table creation
|
-- test foreign table creation
|
||||||
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
|
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
|
||||||
SELECT create_distributed_table('table3_groupD', 'id');
|
SELECT create_distributed_table('table3_groupD', 'id');
|
||||||
|
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
@ -1356,3 +1357,4 @@ DROP TABLE range_table;
|
||||||
DROP TABLE none;
|
DROP TABLE none;
|
||||||
DROP TABLE ref;
|
DROP TABLE ref;
|
||||||
DROP TABLE local_table;
|
DROP TABLE local_table;
|
||||||
|
DROP FOREIGN TABLE table3_groupD CASCADE;
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
-- create test functions
|
-- create test functions
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
|
CREATE SCHEMA metadata_test;
|
||||||
|
SET search_path TO metadata_test;
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
|
||||||
CREATE FUNCTION load_shard_id_array(regclass)
|
CREATE FUNCTION load_shard_id_array(regclass)
|
||||||
RETURNS bigint[]
|
RETURNS bigint[]
|
||||||
|
|
@ -286,6 +288,7 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
|
||||||
|
|
||||||
-- verify result of the get_shard_id_for_distribution_column
|
-- verify result of the get_shard_id_for_distribution_column
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
SELECT * FROM get_shardid_test_table1_540006;
|
SELECT * FROM get_shardid_test_table1_540006;
|
||||||
column1 | column2
|
column1 | column2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
@ -305,6 +308,7 @@ SELECT * FROM get_shardid_test_table1_540007;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
-- test non-existing value
|
-- test non-existing value
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
|
||||||
get_shard_id_for_distribution_column
|
get_shard_id_for_distribution_column
|
||||||
|
|
@ -336,6 +340,7 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f
|
||||||
|
|
||||||
-- verify result of the get_shard_id_for_distribution_column
|
-- verify result of the get_shard_id_for_distribution_column
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
SELECT * FROM get_shardid_test_table2_540013;
|
SELECT * FROM get_shardid_test_table2_540013;
|
||||||
column1 | column2
|
column1 | column2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
@ -349,6 +354,7 @@ SELECT * FROM get_shardid_test_table2_540011;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
-- test mismatching data type
|
-- test mismatching data type
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
|
||||||
ERROR: malformed array literal: "a"
|
ERROR: malformed array literal: "a"
|
||||||
|
|
@ -578,4 +584,5 @@ ORDER BY
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- clear unnecessary tables;
|
-- clear unnecessary tables;
|
||||||
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5, events_table_count;
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA metadata_test CASCADE;
|
||||||
|
|
|
||||||
|
|
@ -172,6 +172,7 @@ CREATE FOREIGN TABLE foreign_table (
|
||||||
full_name text not null default ''
|
full_name text not null default ''
|
||||||
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
|
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
|
||||||
SELECT create_distributed_table('foreign_table', 'id');
|
SELECT create_distributed_table('foreign_table', 'id');
|
||||||
|
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -103,6 +103,7 @@ CREATE FOREIGN TABLE remote_engagements (
|
||||||
SET citus.shard_count TO 1;
|
SET citus.shard_count TO 1;
|
||||||
SET citus.shard_replication_factor TO 2;
|
SET citus.shard_replication_factor TO 2;
|
||||||
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
|
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
|
||||||
|
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
@ -118,3 +119,5 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AN
|
||||||
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
||||||
ERROR: cannot repair shard
|
ERROR: cannot repair shard
|
||||||
DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported.
|
DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported.
|
||||||
|
-- clean-up
|
||||||
|
DROP FOREIGN TABLE remote_engagements CASCADE;
|
||||||
|
|
|
||||||
|
|
@ -89,15 +89,10 @@ WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nod
|
||||||
SELECT count(*) FROM data;
|
SELECT count(*) FROM data;
|
||||||
SELECT count(*) FROM history;
|
SELECT count(*) FROM history;
|
||||||
|
|
||||||
-- test we can not replicate MX tables
|
-- test we can replicate MX tables
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
-- metadata sync will fail as we have a statement replicated table
|
-- metadata sync will succeed even if we have rep > 1 tables
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
|
||||||
|
|
||||||
-- use streaming replication to enable metadata syncing
|
|
||||||
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid IN
|
|
||||||
('history'::regclass);
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
||||||
CREATE TABLE mx_table(a int);
|
CREATE TABLE mx_table(a int);
|
||||||
|
|
|
||||||
|
|
@ -225,14 +225,6 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
|
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- not-matching replication model for hash table
|
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
|
||||||
SET application_name to 'citus';
|
|
||||||
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 't');
|
|
||||||
ROLLBACK;
|
|
||||||
|
|
||||||
|
|
||||||
-- add entry for super user table
|
-- add entry for super user table
|
||||||
\c - postgres - :worker_1_port
|
\c - postgres - :worker_1_port
|
||||||
SET search_path TO metadata_sync_helpers;
|
SET search_path TO metadata_sync_helpers;
|
||||||
|
|
|
||||||
|
|
@ -553,5 +553,4 @@ DROP TABLE range_table;
|
||||||
DROP TABLE none;
|
DROP TABLE none;
|
||||||
DROP TABLE ref;
|
DROP TABLE ref;
|
||||||
DROP TABLE local_table;
|
DROP TABLE local_table;
|
||||||
|
DROP FOREIGN TABLE table3_groupD CASCADE;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
-- create test functions
|
-- create test functions
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
|
CREATE SCHEMA metadata_test;
|
||||||
|
SET search_path TO metadata_test;
|
||||||
|
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
|
||||||
|
|
||||||
|
|
@ -196,10 +197,13 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
|
||||||
|
|
||||||
-- verify result of the get_shard_id_for_distribution_column
|
-- verify result of the get_shard_id_for_distribution_column
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
|
|
||||||
SELECT * FROM get_shardid_test_table1_540006;
|
SELECT * FROM get_shardid_test_table1_540006;
|
||||||
SELECT * FROM get_shardid_test_table1_540009;
|
SELECT * FROM get_shardid_test_table1_540009;
|
||||||
SELECT * FROM get_shardid_test_table1_540007;
|
SELECT * FROM get_shardid_test_table1_540007;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
|
|
||||||
-- test non-existing value
|
-- test non-existing value
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
|
||||||
|
|
@ -217,9 +221,12 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f
|
||||||
|
|
||||||
-- verify result of the get_shard_id_for_distribution_column
|
-- verify result of the get_shard_id_for_distribution_column
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
|
|
||||||
SELECT * FROM get_shardid_test_table2_540013;
|
SELECT * FROM get_shardid_test_table2_540013;
|
||||||
SELECT * FROM get_shardid_test_table2_540011;
|
SELECT * FROM get_shardid_test_table2_540011;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET search_path TO metadata_test;
|
||||||
|
|
||||||
-- test mismatching data type
|
-- test mismatching data type
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
|
||||||
|
|
@ -355,4 +362,5 @@ ORDER BY
|
||||||
types;$$);
|
types;$$);
|
||||||
|
|
||||||
-- clear unnecessary tables;
|
-- clear unnecessary tables;
|
||||||
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5, events_table_count;
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA metadata_test CASCADE;
|
||||||
|
|
|
||||||
|
|
@ -103,3 +103,6 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AN
|
||||||
|
|
||||||
-- oops! we don't support repairing shards backed by foreign tables
|
-- oops! we don't support repairing shards backed by foreign tables
|
||||||
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- clean-up
|
||||||
|
DROP FOREIGN TABLE remote_engagements CASCADE;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue