mirror of https://github.com/citusdata/citus.git
134 lines
5.3 KiB
Plaintext
134 lines
5.3 KiB
Plaintext
-- Tests for citus_copy_shard_placement, which can be used for adding replicas in statement-based replication
|
|
CREATE SCHEMA mcsp;
|
|
SET search_path TO mcsp;
|
|
SET citus.next_shard_id TO 8139000;
|
|
SET citus.shard_replication_factor TO 1;
|
|
CREATE TABLE ref_table(a int, b text unique);
|
|
SELECT create_reference_table('ref_table');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE data (
|
|
key text primary key,
|
|
value text not null,
|
|
check (value <> '')
|
|
);
|
|
CREATE INDEX ON data (value);
|
|
SELECT create_distributed_table('data','key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE history (
|
|
key text not null,
|
|
t timestamptz not null,
|
|
value text not null
|
|
) PARTITION BY RANGE (t);
|
|
CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
|
|
CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
|
|
SELECT create_distributed_table('history','key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Mark tables as non-mx tables, in order to be able to test citus_copy_shard_placement
|
|
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
|
|
('data'::regclass, 'history'::regclass);
|
|
INSERT INTO data VALUES ('key-1', 'value-1');
|
|
INSERT INTO data VALUES ('key-2', 'value-2');
|
|
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
|
|
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
|
|
-- verify we error out if no healthy placement exists at source
|
|
SELECT citus_copy_shard_placement(
|
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
|
'localhost', :worker_1_port,
|
|
'localhost', :worker_2_port,
|
|
transfer_mode := 'block_writes');
|
|
ERROR: could not find placement matching "localhost:xxxxx"
|
|
HINT: Confirm the placement still exists and try again.
|
|
-- verify we error out if source and destination are the same
|
|
SELECT citus_copy_shard_placement(
|
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
|
'localhost', :worker_2_port,
|
|
'localhost', :worker_2_port,
|
|
transfer_mode := 'block_writes');
|
|
ERROR: shard xxxxx already exists in the target node
|
|
-- verify we error out if target already contains a healthy placement
|
|
SELECT citus_copy_shard_placement(
|
|
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
|
|
'localhost', :worker_1_port,
|
|
'localhost', :worker_2_port,
|
|
transfer_mode := 'block_writes');
|
|
ERROR: shard xxxxx already exists in the target node
|
|
-- verify we error out if table has foreign key constraints
|
|
INSERT INTO ref_table SELECT 1, value FROM data;
|
|
ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
|
|
SELECT citus_copy_shard_placement(
|
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
|
'localhost', :worker_2_port,
|
|
'localhost', :worker_1_port);
|
|
ERROR: cannot replicate shards with foreign keys
|
|
ALTER TABLE data DROP CONSTRAINT distfk;
|
|
-- replicate shard that contains key-1
|
|
SELECT citus_copy_shard_placement(
|
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
|
'localhost', :worker_2_port,
|
|
'localhost', :worker_1_port,
|
|
transfer_mode := 'block_writes');
|
|
citus_copy_shard_placement
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- forcefully mark the old replica as inactive
|
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
|
WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
|
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
|
WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
|
|
-- should still have all data available thanks to new replica
|
|
SELECT count(*) FROM data;
|
|
count
|
|
---------------------------------------------------------------------
|
|
2
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM history;
|
|
count
|
|
---------------------------------------------------------------------
|
|
2
|
|
(1 row)
|
|
|
|
-- verify we error out when copying a shard of an MX (streaming replicated) table
|
|
SET citus.shard_replication_factor TO 1;
|
|
-- metadata sync will succeed even if we have rep > 1 tables
|
|
INSERT INTO pg_catalog.pg_dist_object(classid, objid, objsubid) values('pg_class'::regclass::oid, 'public.articles'::regclass::oid, 0);
|
|
INSERT INTO pg_catalog.pg_dist_object(classid, objid, objsubid) values('pg_class'::regclass::oid, 'public.articles_single_shard'::regclass::oid, 0);
|
|
SET client_min_messages TO warning;
|
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
|
start_metadata_sync_to_node
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
RESET client_min_messages;
|
|
CREATE TABLE mx_table(a int);
|
|
SELECT create_distributed_table('mx_table', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT citus_copy_shard_placement(
|
|
get_shard_id_for_distribution_column('mx_table', '1'),
|
|
'localhost', :worker_1_port,
|
|
'localhost', :worker_2_port,
|
|
transfer_mode := 'block_writes');
|
|
ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied
|
|
SET client_min_messages TO ERROR;
|
|
DROP TABLE mcsp.history;
|
|
DROP SCHEMA mcsp CASCADE;
|