mirror of https://github.com/citusdata/citus.git
Deletes unnecessary test outputs pt2 (#6214)
parent
85305b2773
commit
9cfadd7965
|
@ -1,788 +0,0 @@
|
|||
--
|
||||
-- MULTI_COLOCATED_SHARD_REBALANCE
|
||||
--
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13000000;
|
||||
SET citus.shard_count TO 6;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
-- create distributed tables
|
||||
CREATE TABLE table1_group1 ( id int PRIMARY KEY);
|
||||
SELECT create_distributed_table('table1_group1', 'id', 'hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE table2_group1 ( id int );
|
||||
SELECT create_distributed_table('table2_group1', 'id', 'hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET citus.shard_count TO 8;
|
||||
CREATE TABLE table5_groupX ( id int );
|
||||
SELECT create_distributed_table('table5_groupX', 'id', 'hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE table6_append ( id int );
|
||||
SELECT master_create_distributed_table('table6_append', 'id', 'append');
|
||||
master_create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_empty_shard('table6_append');
|
||||
master_create_empty_shard
|
||||
---------------------------------------------------------------------
|
||||
13000020
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_empty_shard('table6_append');
|
||||
master_create_empty_shard
|
||||
---------------------------------------------------------------------
|
||||
13000021
|
||||
(1 row)
|
||||
|
||||
-- Mark tables as non-mx tables, in order to be able to test master_copy_shard_placement
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
|
||||
('table1_group1'::regclass, 'table2_group1'::regclass, 'table5_groupX'::regclass);
|
||||
-- test copy
|
||||
-- test copying colocated shards
|
||||
-- status before shard copy
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000000 | table1_group1 | 57637
|
||||
13000001 | table1_group1 | 57638
|
||||
13000002 | table1_group1 | 57637
|
||||
13000003 | table1_group1 | 57638
|
||||
13000004 | table1_group1 | 57637
|
||||
13000005 | table1_group1 | 57638
|
||||
13000006 | table2_group1 | 57637
|
||||
13000007 | table2_group1 | 57638
|
||||
13000008 | table2_group1 | 57637
|
||||
13000009 | table2_group1 | 57638
|
||||
13000010 | table2_group1 | 57637
|
||||
13000011 | table2_group1 | 57638
|
||||
(12 rows)
|
||||
|
||||
-- try to copy colocated shards without a replica identity
|
||||
SELECT master_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, false);
|
||||
master_copy_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- copy colocated shards
|
||||
SELECT master_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, false, 'force_logical');
|
||||
ERROR: shard xxxxx already exist in target placement
|
||||
-- status after shard copy
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000000 | table1_group1 | 57637
|
||||
13000000 | table1_group1 | 57638
|
||||
13000001 | table1_group1 | 57638
|
||||
13000002 | table1_group1 | 57637
|
||||
13000003 | table1_group1 | 57638
|
||||
13000004 | table1_group1 | 57637
|
||||
13000005 | table1_group1 | 57638
|
||||
13000006 | table2_group1 | 57637
|
||||
13000006 | table2_group1 | 57638
|
||||
13000007 | table2_group1 | 57638
|
||||
13000008 | table2_group1 | 57637
|
||||
13000009 | table2_group1 | 57638
|
||||
13000010 | table2_group1 | 57637
|
||||
13000011 | table2_group1 | 57638
|
||||
(14 rows)
|
||||
|
||||
-- also connect worker to verify we successfully copied given shard (and other colocated shards)
|
||||
\c - - - :worker_2_port
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000000'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
id | integer | not null
|
||||
(1 row)
|
||||
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_group1_13000006'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
id | integer |
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
-- copy colocated shards again to see error message
|
||||
SELECT master_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, false, 'force_logical');
|
||||
ERROR: shard xxxxx already exist in target placement
|
||||
-- test copying NOT colocated shard
|
||||
-- status before shard copy
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table5_groupX'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000012 | table5_groupx | 57637
|
||||
13000013 | table5_groupx | 57638
|
||||
13000014 | table5_groupx | 57637
|
||||
13000015 | table5_groupx | 57638
|
||||
13000016 | table5_groupx | 57637
|
||||
13000017 | table5_groupx | 57638
|
||||
13000018 | table5_groupx | 57637
|
||||
13000019 | table5_groupx | 57638
|
||||
(8 rows)
|
||||
|
||||
-- copy NOT colocated shard
|
||||
SELECT master_copy_shard_placement(13000012, 'localhost', :worker_1_port, 'localhost', :worker_2_port, false, 'force_logical');
|
||||
master_copy_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- status after shard copy
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table5_groupX'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000012 | table5_groupx | 57637
|
||||
13000012 | table5_groupx | 57638
|
||||
13000013 | table5_groupx | 57638
|
||||
13000014 | table5_groupx | 57637
|
||||
13000015 | table5_groupx | 57638
|
||||
13000016 | table5_groupx | 57637
|
||||
13000017 | table5_groupx | 57638
|
||||
13000018 | table5_groupx | 57637
|
||||
13000019 | table5_groupx | 57638
|
||||
(9 rows)
|
||||
|
||||
-- test copying shard in append distributed table
|
||||
-- status before shard copy
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table6_append'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000020 | table6_append | 57638
|
||||
13000021 | table6_append | 57637
|
||||
(2 rows)
|
||||
|
||||
-- copy shard in append distributed table
|
||||
SELECT master_copy_shard_placement(13000020, 'localhost', :worker_2_port, 'localhost', :worker_1_port, false, 'force_logical');
|
||||
master_copy_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- status after shard copy
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table6_append'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000020 | table6_append | 57637
|
||||
13000020 | table6_append | 57638
|
||||
13000021 | table6_append | 57637
|
||||
(3 rows)
|
||||
|
||||
-- test move
|
||||
-- test moving colocated shards
|
||||
-- status before shard move
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000000 | table1_group1 | 57637
|
||||
13000000 | table1_group1 | 57638
|
||||
13000001 | table1_group1 | 57638
|
||||
13000002 | table1_group1 | 57637
|
||||
13000003 | table1_group1 | 57638
|
||||
13000004 | table1_group1 | 57637
|
||||
13000005 | table1_group1 | 57638
|
||||
13000006 | table2_group1 | 57637
|
||||
13000006 | table2_group1 | 57638
|
||||
13000007 | table2_group1 | 57638
|
||||
13000008 | table2_group1 | 57637
|
||||
13000009 | table2_group1 | 57638
|
||||
13000010 | table2_group1 | 57637
|
||||
13000011 | table2_group1 | 57638
|
||||
(14 rows)
|
||||
|
||||
-- move colocated shards
|
||||
SELECT master_move_shard_placement(13000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- status after shard move
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000000 | table1_group1 | 57637
|
||||
13000000 | table1_group1 | 57638
|
||||
13000001 | table1_group1 | 57637
|
||||
13000002 | table1_group1 | 57637
|
||||
13000003 | table1_group1 | 57638
|
||||
13000004 | table1_group1 | 57637
|
||||
13000005 | table1_group1 | 57638
|
||||
13000006 | table2_group1 | 57637
|
||||
13000006 | table2_group1 | 57638
|
||||
13000007 | table2_group1 | 57637
|
||||
13000008 | table2_group1 | 57637
|
||||
13000009 | table2_group1 | 57638
|
||||
13000010 | table2_group1 | 57637
|
||||
13000011 | table2_group1 | 57638
|
||||
(14 rows)
|
||||
|
||||
-- also connect worker to verify we successfully moved given shard (and other colocated shards)
|
||||
\c - - - :worker_1_port
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000001'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
id | integer | not null
|
||||
(1 row)
|
||||
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_group1_13000007'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
id | integer |
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
-- test moving NOT colocated shard
|
||||
-- status before shard move
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table5_groupX'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000012 | table5_groupx | 57637
|
||||
13000012 | table5_groupx | 57638
|
||||
13000013 | table5_groupx | 57638
|
||||
13000014 | table5_groupx | 57637
|
||||
13000015 | table5_groupx | 57638
|
||||
13000016 | table5_groupx | 57637
|
||||
13000017 | table5_groupx | 57638
|
||||
13000018 | table5_groupx | 57637
|
||||
13000019 | table5_groupx | 57638
|
||||
(9 rows)
|
||||
|
||||
-- move NOT colocated shard
|
||||
SELECT master_move_shard_placement(13000013, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- status after shard move
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table5_groupX'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000012 | table5_groupx | 57637
|
||||
13000012 | table5_groupx | 57638
|
||||
13000013 | table5_groupx | 57637
|
||||
13000014 | table5_groupx | 57637
|
||||
13000015 | table5_groupx | 57638
|
||||
13000016 | table5_groupx | 57637
|
||||
13000017 | table5_groupx | 57638
|
||||
13000018 | table5_groupx | 57637
|
||||
13000019 | table5_groupx | 57638
|
||||
(9 rows)
|
||||
|
||||
-- test moving shard in append distributed table
|
||||
-- status before shard move
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table6_append'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000020 | table6_append | 57637
|
||||
13000020 | table6_append | 57638
|
||||
13000021 | table6_append | 57637
|
||||
(3 rows)
|
||||
|
||||
-- move shard in append distributed table
|
||||
SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- status after shard move
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
p.logicalrelid = 'table6_append'::regclass
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000020 | table6_append | 57637
|
||||
13000020 | table6_append | 57638
|
||||
13000021 | table6_append | 57638
|
||||
(3 rows)
|
||||
|
||||
-- try to move shard from wrong node
|
||||
SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
ERROR: could not find placement matching "localhost:xxxxx"
|
||||
HINT: Confirm the placement still exists and try again.
|
||||
-- test shard move with foreign constraints
|
||||
DROP TABLE IF EXISTS table1_group1, table2_group1;
|
||||
SET citus.shard_count TO 6;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
-- create distributed tables
|
||||
CREATE TABLE table1_group1 ( id int PRIMARY KEY);
|
||||
SELECT create_distributed_table('table1_group1', 'id', 'hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE table2_group1 ( id int, table1_id int, FOREIGN KEY(table1_id) REFERENCES table1_group1(id));
|
||||
SELECT create_distributed_table('table2_group1', 'table1_id', 'hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Mark the tables as non-mx tables
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
|
||||
('table1_group1'::regclass, 'table2_group1'::regclass);
|
||||
-- status before shard rebalance
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000022 | table1_group1 | 57637
|
||||
13000023 | table1_group1 | 57638
|
||||
13000024 | table1_group1 | 57637
|
||||
13000025 | table1_group1 | 57638
|
||||
13000026 | table1_group1 | 57637
|
||||
13000027 | table1_group1 | 57638
|
||||
13000028 | table2_group1 | 57637
|
||||
13000029 | table2_group1 | 57638
|
||||
13000030 | table2_group1 | 57637
|
||||
13000031 | table2_group1 | 57638
|
||||
13000032 | table2_group1 | 57637
|
||||
13000033 | table2_group1 | 57638
|
||||
(12 rows)
|
||||
|
||||
SELECT master_move_shard_placement(13000022, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- status after shard rebalance
|
||||
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
|
||||
FROM
|
||||
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
|
||||
WHERE
|
||||
p.logicalrelid = s.logicalrelid AND
|
||||
s.shardid = sp.shardid AND
|
||||
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
|
||||
ORDER BY s.shardid, sp.nodeport;
|
||||
shardid | logicalrelid | nodeport
|
||||
---------------------------------------------------------------------
|
||||
13000022 | table1_group1 | 57638
|
||||
13000023 | table1_group1 | 57638
|
||||
13000024 | table1_group1 | 57637
|
||||
13000025 | table1_group1 | 57638
|
||||
13000026 | table1_group1 | 57637
|
||||
13000027 | table1_group1 | 57638
|
||||
13000028 | table2_group1 | 57638
|
||||
13000029 | table2_group1 | 57638
|
||||
13000030 | table2_group1 | 57637
|
||||
13000031 | table2_group1 | 57638
|
||||
13000032 | table2_group1 | 57637
|
||||
13000033 | table2_group1 | 57638
|
||||
(12 rows)
|
||||
|
||||
-- also connect worker to verify we successfully moved given shard (and other colocated shards)
|
||||
\c - - - :worker_2_port
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000022'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
id | integer | not null
|
||||
(1 row)
|
||||
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_group1_13000028'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
id | integer |
|
||||
table1_id | integer |
|
||||
(2 rows)
|
||||
|
||||
-- make sure that we've created the foreign keys
|
||||
SELECT "Constraint", "Definition" FROM table_fkeys;
|
||||
Constraint | Definition
|
||||
---------------------------------------------------------------------
|
||||
pg_dist_poolinfo_nodeid_fkey | FOREIGN KEY (nodeid) REFERENCES pg_dist_node(nodeid)
|
||||
table2_group1_table1_id_fkey_13000028 | FOREIGN KEY (table1_id) REFERENCES table1_group1_13000022(id)
|
||||
table2_group1_table1_id_fkey_13000029 | FOREIGN KEY (table1_id) REFERENCES table1_group1_13000023(id)
|
||||
table2_group1_table1_id_fkey_13000031 | FOREIGN KEY (table1_id) REFERENCES table1_group1_13000025(id)
|
||||
table2_group1_table1_id_fkey_13000033 | FOREIGN KEY (table1_id) REFERENCES table1_group1_13000027(id)
|
||||
test_constraint_1230019 | FOREIGN KEY (l_orderkey) REFERENCES tenant_isolation.orders_streaming_1230016(o_orderkey)
|
||||
test_constraint_1230020 | FOREIGN KEY (l_orderkey) REFERENCES tenant_isolation.orders_streaming_1230017(o_orderkey)
|
||||
test_constraint_1230021 | FOREIGN KEY (l_orderkey) REFERENCES tenant_isolation.orders_streaming_1230018(o_orderkey)
|
||||
test_constraint_1230025 | FOREIGN KEY (l_orderkey) REFERENCES tenant_isolation.orders_streaming_1230022(o_orderkey)
|
||||
test_constraint_1230026 | FOREIGN KEY (l_orderkey) REFERENCES tenant_isolation.orders_streaming_1230023(o_orderkey)
|
||||
test_constraint_1230027 | FOREIGN KEY (l_orderkey) REFERENCES tenant_isolation.orders_streaming_1230024(o_orderkey)
|
||||
(11 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- test shard copy with foreign constraints
|
||||
-- we expect it to error out because we do not support foreign constraints with replication factor > 1
|
||||
SELECT master_copy_shard_placement(13000022, 'localhost', :worker_2_port, 'localhost', :worker_1_port, false);
|
||||
ERROR: cannot replicate shards with foreign keys
|
||||
-- lets also test that master_move_shard_placement doesn't break serials
|
||||
CREATE TABLE serial_move_test (key int, other_val serial);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('serial_move_test', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- key 15 goes to shard xxxxx
|
||||
INSERT INTO serial_move_test (key) VALUES (15) RETURNING *;
|
||||
key | other_val
|
||||
---------------------------------------------------------------------
|
||||
15 | 1
|
||||
(1 row)
|
||||
|
||||
INSERT INTO serial_move_test (key) VALUES (15) RETURNING *;
|
||||
key | other_val
|
||||
---------------------------------------------------------------------
|
||||
15 | 2
|
||||
(1 row)
|
||||
|
||||
-- confirm the shard id
|
||||
SELECT * FROM run_command_on_placements('serial_move_test', 'SELECT DISTINCT key FROM %s WHERE key = 15') WHERE result = '15' AND shardid = 13000034;
|
||||
nodename | nodeport | shardid | success | result
|
||||
---------------------------------------------------------------------
|
||||
localhost | 57637 | 13000034 | t | 15
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(13000034, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- confirm the successfull move
|
||||
SELECT * FROM run_command_on_placements('serial_move_test', 'SELECT DISTINCT key FROM %s WHERE key = 15') WHERE result = '15' AND shardid = 13000034;
|
||||
nodename | nodeport | shardid | success | result
|
||||
---------------------------------------------------------------------
|
||||
localhost | 57638 | 13000034 | t | 15
|
||||
(1 row)
|
||||
|
||||
-- finally show that serials work fine afterwards
|
||||
INSERT INTO serial_move_test (key) VALUES (15) RETURNING *;
|
||||
key | other_val
|
||||
---------------------------------------------------------------------
|
||||
15 | 3
|
||||
(1 row)
|
||||
|
||||
INSERT INTO serial_move_test (key) VALUES (15) RETURNING *;
|
||||
key | other_val
|
||||
---------------------------------------------------------------------
|
||||
15 | 4
|
||||
(1 row)
|
||||
|
||||
-- lets do some failure testing
|
||||
CREATE TABLE logical_failure_test (key int);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 4;
|
||||
SELECT create_distributed_table('logical_failure_test', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- ensure that the shard is created for this user
|
||||
\c - - - :worker_2_port
|
||||
\dt logical_failure_test_13000038
|
||||
List of relations
|
||||
Schema | Name | Type | Owner
|
||||
---------------------------------------------------------------------
|
||||
public | logical_failure_test_13000038 | table | postgres
|
||||
(1 row)
|
||||
|
||||
DROP TABLE logical_failure_test_13000038;
|
||||
-- should fail since the command wouldn't be able to connect to the worker_1
|
||||
\c - - - :master_port
|
||||
SELECT master_move_shard_placement(13000038, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
|
||||
ERROR: could not copy table "logical_failure_test_13000038" from "localhost:xxxxx"
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
DROP TABLE logical_failure_test;
|
||||
-- lets test the logical replication modes
|
||||
CREATE TABLE test_with_pkey (key int PRIMARY KEY, value int NOT NULL);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 4;
|
||||
SELECT create_distributed_table('test_with_pkey', 'key', colocate_with => 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- should succeed since there is a replica identity defined
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- should succeed since we still have a replica identity
|
||||
ALTER TABLE test_with_pkey REPLICA IDENTITY FULL;
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'auto');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- this time should fail since we don't have replica identity any more
|
||||
ALTER TABLE test_with_pkey REPLICA IDENTITY NOTHING;
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'auto');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- should succeed since we still have a replica identity
|
||||
ALTER TABLE test_with_pkey REPLICA IDENTITY USING INDEX test_with_pkey_pkey;
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
||||
ERROR: could not find placement matching "localhost:xxxxx"
|
||||
HINT: Confirm the placement still exists and try again.
|
||||
-- one final test with shard_transfer_mode auto
|
||||
CREATE UNIQUE INDEX req_rep_idx ON test_with_pkey(key, value);
|
||||
ALTER TABLE test_with_pkey REPLICA IDENTITY USING INDEX req_rep_idx;
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'auto');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE test_with_pkey REPLICA IDENTITY NOTHING;
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- should succeed but not use logical replication
|
||||
ALTER TABLE test_with_pkey REPLICA IDENTITY NOTHING;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
|
||||
DEBUG: table "test_with_pkey_13000042" does not exist, skipping
|
||||
DETAIL: NOTICE from localhost:xxxxx
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEFAULT;
|
||||
-- we don't support multiple shard moves in a single transaction
|
||||
SELECT
|
||||
master_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical')
|
||||
FROM
|
||||
pg_dist_shard_placement where nodeport = :worker_1_port AND
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_with_pkey'::regclass);
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
(2 rows)
|
||||
|
||||
-- similar test with explicit transaction block
|
||||
BEGIN;
|
||||
|
||||
SELECT master_move_shard_placement(13000042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
|
||||
ERROR: could not find placement matching "localhost:xxxxx"
|
||||
HINT: Confirm the placement still exists and try again.
|
||||
SELECT master_move_shard_placement(13000044, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
|
||||
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||
COMMIT;
|
||||
-- we do support the same with block writes
|
||||
SELECT
|
||||
master_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes')
|
||||
FROM
|
||||
pg_dist_shard_placement where nodeport = :worker_1_port AND
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_with_pkey'::regclass);
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- we should be able to move shard placements after COMMIT/ABORT
|
||||
BEGIN;
|
||||
|
||||
SELECT master_move_shard_placement(13000043, 'localhost', :worker_2_port, 'localhost', :worker_1_port, shard_transfer_mode:='force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT master_move_shard_placement(13000045, 'localhost', :worker_2_port, 'localhost', :worker_1_port, shard_transfer_mode:='force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
|
||||
SELECT master_move_shard_placement(13000043, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
SELECT master_move_shard_placement(13000045, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- we should be able to move shard placements of partitioend tables
|
||||
CREATE SCHEMA move_partitions;
|
||||
CREATE TABLE move_partitions.events (
|
||||
id serial,
|
||||
t timestamptz default now(),
|
||||
payload text
|
||||
)
|
||||
PARTITION BY RANGE(t);
|
||||
ERROR: syntax error at or near "PARTITION"
|
||||
SET citus.shard_count TO 6;
|
||||
SELECT create_distributed_table('move_partitions.events', 'id', colocate_with := 'none');
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
CREATE TABLE move_partitions.events_1 PARTITION OF move_partitions.events
|
||||
FOR VALUES FROM ('2015-01-01') TO ('2016-01-01');
|
||||
ERROR: syntax error at or near "PARTITION"
|
||||
INSERT INTO move_partitions.events (t, payload)
|
||||
SELECT '2015-01-01'::date + (interval '1 day' * s), s FROM generate_series(1, 100) s;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
SELECT count(*) FROM move_partitions.events;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
-- try to move automatically
|
||||
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
|
||||
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||
WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port
|
||||
ORDER BY shardid LIMIT 1;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
-- force logical replication
|
||||
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical')
|
||||
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||
WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port
|
||||
ORDER BY shardid LIMIT 1;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
SELECT count(*) FROM move_partitions.events;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
-- add a primary key to the partition
|
||||
ALTER TABLE move_partitions.events_1 ADD CONSTRAINT e_1_pk PRIMARY KEY (id);
|
||||
ERROR: relation "move_partitions.events_1" does not exist
|
||||
-- should be able to move automatically now
|
||||
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
|
||||
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||
WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port
|
||||
ORDER BY shardid LIMIT 1;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
SELECT count(*) FROM move_partitions.events;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
-- should also be able to move with block writes
|
||||
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
|
||||
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||
WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port
|
||||
ORDER BY shardid LIMIT 1;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
SELECT count(*) FROM move_partitions.events;
|
||||
ERROR: relation "move_partitions.events" does not exist
|
||||
-- should have moved all shards to node 1 (2*6 = 12)
|
||||
SELECT count(*)
|
||||
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||
WHERE logicalrelid::text LIKE 'move_partitions.events%' AND nodeport = :worker_1_port;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
DROP TABLE move_partitions.events;
|
||||
ERROR: table "events" does not exist
|
||||
-- set back to the defaults and drop the table
|
||||
SET client_min_messages TO DEFAULT;
|
||||
DROP TABLE test_with_pkey;
|
|
@ -1,265 +0,0 @@
|
|||
--
|
||||
-- multi function in join queries aims to test the function calls that are
|
||||
-- used in joins.
|
||||
--
|
||||
-- These functions are supposed to be executed on the worker and to ensure
|
||||
-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries.
|
||||
--
|
||||
-- We do not yet support those functions that:
|
||||
-- - have lateral joins
|
||||
-- - have WITH ORDINALITY clause
|
||||
-- - are user-defined and immutable
|
||||
CREATE SCHEMA functions_in_joins;
|
||||
SET search_path TO 'functions_in_joins';
|
||||
SET citus.next_shard_id TO 2500000;
|
||||
SET citus.shard_replication_factor to 1;
|
||||
CREATE TABLE table1 (id int, data int);
|
||||
SELECT create_distributed_table('table1','id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO table1
|
||||
SELECT x, x*x
|
||||
from generate_series(1, 100) as f (x);
|
||||
-- Verbose messages for observing the subqueries that wrapped function calls
|
||||
SET client_min_messages TO DEBUG1;
|
||||
-- Check joins on a sequence
|
||||
CREATE SEQUENCE numbers;
|
||||
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) ORDER BY table1.id
|
||||
id | data | n
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | 1
|
||||
(1 row)
|
||||
|
||||
-- Check joins of a function that returns a single integer
|
||||
CREATE FUNCTION add(integer, integer) RETURNS integer
|
||||
AS 'SELECT $1 + $2;'
|
||||
LANGUAGE SQL;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id
|
||||
id | data | sum
|
||||
---------------------------------------------------------------------
|
||||
8 | 64 | 8
|
||||
(1 row)
|
||||
|
||||
-- Check join of plpgsql functions
|
||||
-- a function returning a single integer
|
||||
CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$
|
||||
BEGIN
|
||||
RETURN i + 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC;
|
||||
DEBUG: function does not have co-located tables
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT val FROM functions_in_joins.increment(2) val(val)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, val.val FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.val FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(val integer)) val ON ((table1.id OPERATOR(pg_catalog.=) val.val))) ORDER BY table1.id
|
||||
id | data | val
|
||||
---------------------------------------------------------------------
|
||||
3 | 9 | 3
|
||||
(1 row)
|
||||
|
||||
-- a function that returns a set of integers
|
||||
-- Block distributing function as we have tests below to test it locally
|
||||
SET citus.enable_metadata_sync TO OFF;
|
||||
CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER,
|
||||
IN k INTEGER DEFAULT 3,
|
||||
OUT result INTEGER)
|
||||
RETURNS SETOF INTEGER AS $$
|
||||
BEGIN
|
||||
RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
RESET citus.enable_metadata_sync;
|
||||
SELECT *
|
||||
FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result)
|
||||
ORDER BY id ASC;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id
|
||||
id | data | result
|
||||
---------------------------------------------------------------------
|
||||
3 | 9 | 3
|
||||
4 | 16 | 4
|
||||
(2 rows)
|
||||
|
||||
-- a function returning set of records
|
||||
CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$
|
||||
SELECT x, x+1 FROM generate_series(0,4) f(x)
|
||||
$cmd$
|
||||
LANGUAGE SQL;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC;
|
||||
DEBUG: function does not have co-located tables
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id
|
||||
id | data | x | y
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | 1 | 2
|
||||
2 | 4 | 2 | 3
|
||||
3 | 9 | 3 | 4
|
||||
4 | 16 | 4 | 5
|
||||
(4 rows)
|
||||
|
||||
-- a function returning table
|
||||
CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text)
|
||||
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
||||
LANGUAGE SQL;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id);
|
||||
DEBUG: function does not have co-located tables
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id)))
|
||||
f1 | f2
|
||||
---------------------------------------------------------------------
|
||||
32 | 32 is text
|
||||
(1 row)
|
||||
|
||||
-- a stable function
|
||||
CREATE OR REPLACE FUNCTION the_minimum_id()
|
||||
RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id);
|
||||
DEBUG: function does not have co-located tables
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id)))
|
||||
id | data | min_id
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | 1
|
||||
(1 row)
|
||||
|
||||
-- a built-in immutable function
|
||||
SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC;
|
||||
id | data | hundred
|
||||
---------------------------------------------------------------------
|
||||
100 | 10000 | 100
|
||||
(1 row)
|
||||
|
||||
-- function joins inside a CTE
|
||||
WITH next_row_to_process AS (
|
||||
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n)
|
||||
)
|
||||
SELECT *
|
||||
FROM table1, next_row_to_process
|
||||
WHERE table1.data <= next_row_to_process.data
|
||||
ORDER BY 1,2 ASC;
|
||||
DEBUG: generating subplan XXX_1 for CTE next_row_to_process: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN nextval('functions_in_joins.numbers'::regclass) n(n) ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_row_to_process.id, next_row_to_process.data, next_row_to_process.n FROM functions_in_joins.table1, (SELECT intermediate_result.id, intermediate_result.data, intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, data integer, n bigint)) next_row_to_process WHERE (table1.data OPERATOR(pg_catalog.<=) next_row_to_process.data) ORDER BY table1.id, table1.data
|
||||
id | data | id | data | n
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | 2 | 4 | 2
|
||||
2 | 4 | 2 | 4 | 2
|
||||
(2 rows)
|
||||
|
||||
-- Multiple functions in an RTE
|
||||
SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b),
|
||||
table1 WHERE id = a ORDER BY id ASC;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM ROWS FROM(functions_in_joins.next_k_integers(5), functions_in_joins.next_k_integers(10)) f(a, b)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.a, f.b, table1.id, table1.data FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) f(a, b), functions_in_joins.table1 WHERE (table1.id OPERATOR(pg_catalog.=) f.a) ORDER BY table1.id
|
||||
a | b | id | data
|
||||
---------------------------------------------------------------------
|
||||
5 | 10 | 5 | 25
|
||||
6 | 11 | 6 | 36
|
||||
7 | 12 | 7 | 49
|
||||
(3 rows)
|
||||
|
||||
-- Custom Type returning function used in a join
|
||||
RESET client_min_messages;
|
||||
CREATE TYPE min_and_max AS (
|
||||
minimum INT,
|
||||
maximum INT
|
||||
);
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CREATE OR REPLACE FUNCTION max_and_min () RETURNS
|
||||
min_and_max AS $$
|
||||
DECLARE
|
||||
result min_and_max%rowtype;
|
||||
begin
|
||||
select into result min(data) as minimum, max(data) as maximum from table1;
|
||||
return result;
|
||||
end;
|
||||
$$ language plpgsql;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data) ORDER BY 1,2,3,4;
|
||||
DEBUG: function does not have co-located tables
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data)))) ORDER BY table1.id, table1.data, m.minimum, m.maximum
|
||||
id | data | minimum | maximum
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | 1 | 10000
|
||||
100 | 10000 | 1 | 10000
|
||||
(2 rows)
|
||||
|
||||
-- The following tests will fail as we do not support all joins on
|
||||
-- all kinds of functions
|
||||
-- In other words, we cannot recursively plan the functions and hence
|
||||
-- the query fails on the workers
|
||||
SET client_min_messages TO ERROR;
|
||||
\set VERBOSITY terse
|
||||
-- function joins in CTE results can create lateral joins that are not supported
|
||||
-- we execute the query within a function to consolidate the error messages
|
||||
-- between different executors
|
||||
SET citus.enable_metadata_sync TO OFF;
|
||||
CREATE FUNCTION raise_failed_execution_func_join(query text) RETURNS void AS $$
|
||||
BEGIN
|
||||
EXECUTE query;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||
RAISE 'Task failed to execute';
|
||||
ELSIF SQLERRM LIKE '%does not exist%' THEN
|
||||
RAISE 'Task failed to execute';
|
||||
END IF;
|
||||
END;
|
||||
$$LANGUAGE plpgsql;
|
||||
RESET citus.enable_metadata_sync;
|
||||
SELECT raise_failed_execution_func_join($$
|
||||
WITH one_row AS (
|
||||
SELECT * FROM table1 WHERE id=52
|
||||
)
|
||||
SELECT table1.id, table1.data
|
||||
FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids
|
||||
WHERE table1.id = next_five_ids;
|
||||
$$);
|
||||
ERROR: Task failed to execute
|
||||
-- a user-defined immutable function
|
||||
SET citus.enable_metadata_sync TO OFF;
|
||||
CREATE OR REPLACE FUNCTION the_answer_to_life()
|
||||
RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL;
|
||||
RESET citus.enable_metadata_sync;
|
||||
SELECT raise_failed_execution_func_join($$
|
||||
SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer);
|
||||
$$);
|
||||
ERROR: Task failed to execute
|
||||
SELECT raise_failed_execution_func_join($$
|
||||
SELECT *
|
||||
FROM table1
|
||||
JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
|
||||
ON (id = next_integers.result);
|
||||
$$);
|
||||
ERROR: Task failed to execute
|
||||
-- WITH ORDINALITY clause
|
||||
SELECT raise_failed_execution_func_join($$
|
||||
SELECT *
|
||||
FROM table1
|
||||
JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
|
||||
ON (id = next_integers.result)
|
||||
ORDER BY id ASC;
|
||||
$$);
|
||||
ERROR: Task failed to execute
|
||||
RESET client_min_messages;
|
||||
DROP SCHEMA functions_in_joins CASCADE;
|
||||
NOTICE: drop cascades to 12 other objects
|
||||
SET search_path TO DEFAULT;
|
|
@ -1,60 +0,0 @@
|
|||
--
|
||||
-- MULTI_POOLINFO_USAGE
|
||||
--
|
||||
-- Test pooler info logic
|
||||
--
|
||||
-- Test of ability to override host/port for a node
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.next_shard_id TO 20000000;
|
||||
SELECT nodeid AS worker_1_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_1_port;
|
||||
worker_1_id
|
||||
---------------------------------------------------------------------
|
||||
16
|
||||
(1 row)
|
||||
|
||||
\gset
|
||||
SELECT nodeid AS worker_2_id FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :worker_2_port;
|
||||
worker_2_id
|
||||
---------------------------------------------------------------------
|
||||
18
|
||||
(1 row)
|
||||
|
||||
\gset
|
||||
CREATE TABLE lotsa_connections (id integer, name text);
|
||||
SELECT create_distributed_table('lotsa_connections', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO lotsa_connections VALUES (1, 'user'), (2, 'user'), (3, 'user'), (4, 'user');
|
||||
SELECT COUNT(*) FROM lotsa_connections;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
-- put outright bad values
|
||||
\set VERBOSITY terse
|
||||
INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'),
|
||||
(:worker_2_id, 'port=9999');
|
||||
\c
|
||||
SELECT COUNT(*) FROM lotsa_connections;
|
||||
ERROR: epoll_ctl() failed: No such file or directory
|
||||
-- "re-route" worker one to node two and vice-versa
|
||||
DELETE FROM pg_dist_poolinfo;
|
||||
INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'port=' || :worker_2_port),
|
||||
(:worker_2_id, 'port=' || :worker_1_port);
|
||||
\c
|
||||
-- this fails because the shards of one worker won't exist on the other and shards
|
||||
-- are still looked up using the node name, not the effective connection host
|
||||
INSERT INTO lotsa_connections VALUES (1, 'user'), (2, 'user'), (3, 'user'), (4, 'user');
|
||||
ERROR: relation "public.lotsa_connections_20000000" does not exist
|
||||
-- tweak poolinfo to use 127.0.0.1 instead of localhost; should work!
|
||||
DELETE FROM pg_dist_poolinfo;
|
||||
INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=127.0.0.1 port=' || :worker_1_port),
|
||||
(:worker_2_id, 'host=127.0.0.1 port=' || :worker_2_port);
|
||||
\c
|
||||
DELETE FROM lotsa_connections;
|
||||
DROP TABLE lotsa_connections;
|
||||
DELETE FROM pg_dist_poolinfo;
|
|
@ -958,10 +958,12 @@ EXPLAIN (COSTS FALSE)
|
|||
Tasks Shown: One of 4
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> HashAggregate
|
||||
-> GroupAggregate
|
||||
Group Key: l_orderkey
|
||||
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
|
||||
(13 rows)
|
||||
-> Sort
|
||||
Sort Key: l_orderkey
|
||||
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
|
||||
(15 rows)
|
||||
|
||||
-- check the plan if the hash aggreate is disabled.
|
||||
SET enable_hashagg TO off;
|
||||
|
@ -982,10 +984,12 @@ EXPLAIN (COSTS FALSE)
|
|||
Tasks Shown: One of 4
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> HashAggregate
|
||||
-> GroupAggregate
|
||||
Group Key: l_orderkey
|
||||
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
|
||||
(12 rows)
|
||||
-> Sort
|
||||
Sort Key: l_orderkey
|
||||
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
|
||||
(14 rows)
|
||||
|
||||
SET enable_hashagg TO on;
|
||||
-- distinct on non-partition column with aggregate
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,6 +1,11 @@
|
|||
--
|
||||
-- WINDOW_FUNCTIONS
|
||||
-- ===================================================================
|
||||
-- test top level window functions that are pushdownable
|
||||
-- ===================================================================
|
||||
-- This test file has an alternative output because of use of
|
||||
-- incremental sort in some explain outputs in PG13
|
||||
--
|
||||
-- a very simple window function with an aggregate and a window function
|
||||
-- distribution column is on the partition by clause
|
||||
SELECT
|
||||
|
@ -1382,20 +1387,15 @@ LIMIT 5;
|
|||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Limit
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (avg(users_table.value_1)) DESC
|
||||
Presorted Key: users_table.user_id
|
||||
-> WindowAgg
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
|
||||
Presorted Key: users_table.user_id
|
||||
-> GroupAggregate
|
||||
-> HashAggregate
|
||||
Group Key: users_table.user_id, users_table.value_2
|
||||
-> Incremental Sort
|
||||
Sort Key: users_table.user_id, users_table.value_2
|
||||
Presorted Key: users_table.user_id
|
||||
-> Index Scan using is_index1_1400256 on users_table_1400256 users_table
|
||||
(22 rows)
|
||||
-> Seq Scan on users_table_1400256 users_table
|
||||
(17 rows)
|
||||
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT
|
||||
|
@ -1418,20 +1418,15 @@ LIMIT 5;
|
|||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Limit
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (avg(users_table.value_1)) DESC
|
||||
Presorted Key: users_table.user_id
|
||||
-> WindowAgg
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
|
||||
Presorted Key: users_table.user_id
|
||||
-> GroupAggregate
|
||||
-> HashAggregate
|
||||
Group Key: users_table.user_id, users_table.value_2
|
||||
-> Incremental Sort
|
||||
Sort Key: users_table.user_id, users_table.value_2
|
||||
Presorted Key: users_table.user_id
|
||||
-> Index Scan using is_index1_1400256 on users_table_1400256 users_table
|
||||
(22 rows)
|
||||
-> Seq Scan on users_table_1400256 users_table
|
||||
(17 rows)
|
||||
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT
|
||||
|
@ -1443,7 +1438,7 @@ FROM
|
|||
GROUP BY user_id, value_2
|
||||
ORDER BY user_id, avg(value_1) DESC
|
||||
LIMIT 5;
|
||||
QUERY PLAN
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
|
@ -1454,20 +1449,15 @@ LIMIT 5;
|
|||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Limit
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (avg(users_table.value_1)) DESC
|
||||
Presorted Key: users_table.user_id
|
||||
-> WindowAgg
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, ((1 / (1 + sum(users_table.value_2))))
|
||||
Presorted Key: users_table.user_id
|
||||
-> GroupAggregate
|
||||
-> HashAggregate
|
||||
Group Key: users_table.user_id, users_table.value_2
|
||||
-> Incremental Sort
|
||||
Sort Key: users_table.user_id, users_table.value_2
|
||||
Presorted Key: users_table.user_id
|
||||
-> Index Scan using is_index1_1400256 on users_table_1400256 users_table
|
||||
(22 rows)
|
||||
-> Seq Scan on users_table_1400256 users_table
|
||||
(17 rows)
|
||||
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT
|
||||
|
@ -1479,7 +1469,7 @@ FROM
|
|||
GROUP BY user_id, value_2
|
||||
ORDER BY user_id, avg(value_1) DESC
|
||||
LIMIT 5;
|
||||
QUERY PLAN
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
|
@ -1490,20 +1480,15 @@ LIMIT 5;
|
|||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Limit
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (avg(users_table.value_1)) DESC
|
||||
Presorted Key: users_table.user_id
|
||||
-> WindowAgg
|
||||
-> Incremental Sort
|
||||
-> Sort
|
||||
Sort Key: users_table.user_id, (sum(users_table.value_2))
|
||||
Presorted Key: users_table.user_id
|
||||
-> GroupAggregate
|
||||
-> HashAggregate
|
||||
Group Key: users_table.user_id, users_table.value_2
|
||||
-> Incremental Sort
|
||||
Sort Key: users_table.user_id, users_table.value_2
|
||||
Presorted Key: users_table.user_id
|
||||
-> Index Scan using is_index1_1400256 on users_table_1400256 users_table
|
||||
(22 rows)
|
||||
-> Seq Scan on users_table_1400256 users_table
|
||||
(17 rows)
|
||||
|
||||
-- Grouping can be pushed down with aggregates even when window function can't
|
||||
EXPLAIN (COSTS FALSE)
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
--
|
||||
-- WINDOW_FUNCTIONS
|
||||
-- ===================================================================
|
||||
-- test top level window functions that are pushdownable
|
||||
-- ===================================================================
|
||||
-- This test file has an alternative output because of use of
|
||||
-- incremental sort in some explain outputs in PG13
|
||||
--
|
||||
-- a very simple window function with an aggregate and a window function
|
||||
-- distribution column is on the partition by clause
|
||||
SELECT
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,6 +1,11 @@
|
|||
--
|
||||
-- WINDOW_FUNCTIONS
|
||||
-- ===================================================================
|
||||
-- test top level window functions that are pushdownable
|
||||
-- ===================================================================
|
||||
-- This test file has an alternative output because of use of
|
||||
-- incremental sort in some explain outputs in PG13
|
||||
--
|
||||
|
||||
-- a very simple window function with an aggregate and a window function
|
||||
-- distribution column is on the partition by clause
|
||||
|
|
Loading…
Reference in New Issue