citus/src/test/regress/expected/multi_replicate_reference_t...

729 lines
23 KiB
Plaintext

--
-- MULTI_REPLICATE_REFERENCE_TABLE
--
-- Tests that check the metadata returned by the master node.
SET citus.next_shard_id TO 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000;
-- remove a node for testing purposes
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- test adding new node with no reference tables
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- verify node is added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
-- test adding new node with a reference table which does not have any healthy placement
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
-- verify there is no node with nodeport = :worker_2_port before adding the node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
CREATE TABLE replicate_reference_table_unhealthy(column1 int);
SELECT create_reference_table('replicate_reference_table_unhealthy');
create_reference_table
------------------------
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ERROR: could not find any healthy placement for shard 1370000
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
-- verify nothing is replicated to the new node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
DROP TABLE replicate_reference_table_unhealthy;
-- test replicating a reference table when a new node added
CREATE TABLE replicate_reference_table_valid(column1 int);
SELECT create_reference_table('replicate_reference_table_valid');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370000 | 1 | 1 | 0
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638
?column?
----------
1
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370000 | 1 | 2 | 0
(1 row)
-- test add same node twice
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370000 | 1 | 2 | 0
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_valid'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370000 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_valid;
-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_rollback(column1 int);
SELECT create_reference_table('replicate_reference_table_rollback');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638
?column?
----------
1
(1 row)
ROLLBACK;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 1 | 0
(1 row)
DROP TABLE replicate_reference_table_rollback;
-- test replicating a reference table when a new node added in TRANSACTION + COMMIT
CREATE TABLE replicate_reference_table_commit(column1 int);
SELECT create_reference_table('replicate_reference_table_commit');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638
?column?
----------
1
(1 row)
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370003 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370001 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_commit;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_reference_one(column1 int);
SELECT create_reference_table('replicate_reference_table_reference_one');
create_reference_table
------------------------
(1 row)
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
create_distributed_table
--------------------------
(1 row)
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370002 | 1 | 1 | 0
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370002 | t
replicate_reference_table_hash | h | 1360004 | c
(2 rows)
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638
?column?
----------
1
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638
upgrade_to_reference_table
----------------------------
(1 row)
SELECT create_reference_table('replicate_reference_table_reference_two');
create_reference_table
------------------------
(1 row)
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370004 | 1 | 0 | localhost | 57638
1370005 | 1 | 0 | localhost | 57638
1370006 | 1 | 0 | localhost | 57638
(3 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370002 | 1 | 2 | 0
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370002 | t
replicate_reference_table_hash | n | 1370002 | t
replicate_reference_table_reference_two | n | 1370002 | t
(3 rows)
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;
-- test inserting a value then adding a new node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE replicate_reference_table_insert(column1 int);
SELECT create_reference_table('replicate_reference_table_insert');
create_reference_table
------------------------
(1 row)
BEGIN;
INSERT INTO replicate_reference_table_insert VALUES(1);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_insert" to the node localhost:57638
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_insert;
-- test COPY then adding a new node in a transaction
CREATE TABLE replicate_reference_table_copy(column1 int);
SELECT create_reference_table('replicate_reference_table_copy');
create_reference_table
------------------------
(1 row)
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_copy" to the node localhost:57638
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_copy;
-- test executing DDL command then adding a new node in a transaction
CREATE TABLE replicate_reference_table_ddl(column1 int);
SELECT create_reference_table('replicate_reference_table_ddl');
create_reference_table
------------------------
(1 row)
BEGIN;
ALTER TABLE replicate_reference_table_ddl ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_ddl" to the node localhost:57638
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
DROP TABLE replicate_reference_table_ddl;
-- test DROP table after adding new node in a transaction
CREATE TABLE replicate_reference_table_drop(column1 int);
SELECT create_reference_table('replicate_reference_table_drop');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_drop'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370003 | 1 | 1 | 0
(1 row)
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638
?column?
----------
1
(1 row)
DROP TABLE replicate_reference_table_drop;
COMMIT;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
(0 rows)
-- test adding a node while there is a reference table at another schema
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1');
create_reference_table
------------------------
(1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370004 | 1 | 1 | 0
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "table1" to the node localhost:57638
?column?
----------
1
(1 row)
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370011 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370004 | 1 | 2 | 0
(1 row)
DROP TABLE replicate_reference_table_schema.table1;
DROP SCHEMA replicate_reference_table_schema CASCADE;
-- do some tests with inactive node
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE initially_not_replicated_reference_table (key int);
SELECT create_reference_table('initially_not_replicated_reference_table');
create_reference_table
------------------------
(1 row)
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- we should see only one shard placements
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT
shardid
FROM
pg_dist_shard
WHERE
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370012 | 1 | 0 | localhost | 57637
(1 row)
-- we should see the two shard placements after activation
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638
?column?
----------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT
shardid
FROM
pg_dist_shard
WHERE
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370012 | 1 | 0 | localhost | 57637
1370012 | 1 | 0 | localhost | 57638
(2 rows)
-- this should have no effect
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR to suppress OS-dependent host name resolution warnings
SET client_min_messages to ERROR;
SELECT master_add_node('invalid-node-name', 9999);
ERROR: failure on connection marked as essential: invalid-node-name:9999
SET client_min_messages to DEFAULT;
-- drop unnecassary tables
DROP TABLE initially_not_replicated_reference_table;
-- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
RESET citus.shard_replication_factor;
RESET citus.replication_model;