citus/src/test/regress/expected/multi_distribution_metadata...

476 lines
14 KiB
Plaintext

-- ===================================================================
-- create test functions
-- ===================================================================
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
CREATE FUNCTION load_shard_id_array(regclass)
RETURNS bigint[]
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION load_shard_interval_array(bigint, anyelement)
RETURNS anyarray
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION load_shard_placement_array(bigint, bool)
RETURNS text[]
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION partition_column_id(regclass)
RETURNS smallint
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION partition_type(regclass)
RETURNS "char"
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION is_distributed_table(regclass)
RETURNS boolean
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION column_name_to_column_id(regclass, cstring)
RETURNS smallint
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION create_monolithic_shard_row(regclass)
RETURNS bigint
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION acquire_shared_shard_lock(bigint)
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
-- ===================================================================
-- test distribution metadata functionality
-- ===================================================================
-- create hash distributed table
CREATE TABLE events_hash (
id bigint,
name text
);
SELECT master_create_distributed_table('events_hash', 'name', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
-- create worker shards
SELECT master_create_worker_shards('events_hash', 4, 2);
master_create_worker_shards
-----------------------------
(1 row)
-- set shardstate of one replication from each shard to 0 (invalid value)
UPDATE pg_dist_placement SET shardstate = 0 WHERE shardid BETWEEN 540000 AND 540003
AND groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port);
-- should see above shard identifiers
SELECT load_shard_id_array('events_hash');
load_shard_id_array
-------------------------------
{540000,540001,540002,540003}
(1 row)
-- should see array with first shard range
SELECT load_shard_interval_array(540000, 0);
load_shard_interval_array
---------------------------
{-2147483648,-1073741825}
(1 row)
-- should even work for range-partitioned shards
-- create range distributed table
CREATE TABLE events_range (
id bigint,
name text
);
SELECT master_create_distributed_table('events_range', 'name', 'range');
master_create_distributed_table
---------------------------------
(1 row)
-- create empty shard
SELECT master_create_empty_shard('events_range');
master_create_empty_shard
---------------------------
540004
(1 row)
UPDATE pg_dist_shard SET
shardminvalue = 'Aardvark',
shardmaxvalue = 'Zebra'
WHERE shardid = 540004;
SELECT load_shard_interval_array(540004, ''::text);
load_shard_interval_array
---------------------------
{Aardvark,Zebra}
(1 row)
-- should see error for non-existent shard
SELECT load_shard_interval_array(540005, 0);
ERROR: could not find valid entry for shard 540005
-- should see two placements
SELECT load_shard_placement_array(540001, false);
load_shard_placement_array
-----------------------------------
{localhost:57638,localhost:57637}
(1 row)
-- only one of which is finalized
SELECT load_shard_placement_array(540001, true);
load_shard_placement_array
----------------------------
{localhost:57637}
(1 row)
-- should see error for non-existent shard
SELECT load_shard_placement_array(540001, false);
load_shard_placement_array
-----------------------------------
{localhost:57638,localhost:57637}
(1 row)
-- should see column id of 'name'
SELECT partition_column_id('events_hash');
partition_column_id
---------------------
2
(1 row)
-- should see hash partition type and fail for non-distributed tables
SELECT partition_type('events_hash');
partition_type
----------------
h
(1 row)
SELECT partition_type('pg_type');
ERROR: relation pg_type is not distributed
-- should see true for events_hash, false for others
SELECT is_distributed_table('events_hash');
is_distributed_table
----------------------
t
(1 row)
SELECT is_distributed_table('pg_type');
is_distributed_table
----------------------
f
(1 row)
SELECT is_distributed_table('pg_dist_shard');
is_distributed_table
----------------------
f
(1 row)
-- test underlying column name-id translation
SELECT column_name_to_column_id('events_hash', 'name');
column_name_to_column_id
--------------------------
2
(1 row)
SELECT column_name_to_column_id('events_hash', 'ctid');
ERROR: cannot reference system column "ctid" in relation "events_hash"
SELECT column_name_to_column_id('events_hash', 'non_existent');
ERROR: column "non_existent" of relation "events_hash" does not exist
-- drop shard rows (must drop placements first)
DELETE FROM pg_dist_placement
WHERE shardid BETWEEN 540000 AND 540004;
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'events_hash'::regclass;
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'events_range'::regclass;
-- verify that an eager load shows them missing
SELECT load_shard_id_array('events_hash');
load_shard_id_array
---------------------
{}
(1 row)
-- create second table to distribute
CREATE TABLE customers (
id bigint,
name text
);
-- now we'll distribute using function calls but verify metadata manually...
-- partition on id and manually inspect partition row
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey)
VALUES
('customers'::regclass, 'h', column_name_to_column('customers'::regclass, 'id'));
SELECT partmethod, column_to_column_name(logicalrelid, partkey) FROM pg_dist_partition
WHERE logicalrelid = 'customers'::regclass;
partmethod | column_to_column_name
------------+-----------------------
h | id
(1 row)
-- make one huge shard and manually inspect shard row
SELECT create_monolithic_shard_row('customers') AS new_shard_id
\gset
SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard
WHERE shardid = :new_shard_id;
shardstorage | shardminvalue | shardmaxvalue
--------------+---------------+---------------
t | -2147483648 | 2147483647
(1 row)
-- now we'll even test our lock methods...
-- use transaction to bound how long we hold the lock
BEGIN;
-- pick up a shard lock and look for it in pg_locks
SELECT acquire_shared_shard_lock(5);
acquire_shared_shard_lock
---------------------------
(1 row)
SELECT objid, mode FROM pg_locks WHERE locktype = 'advisory' AND objid = 5;
objid | mode
-------+-----------
5 | ShareLock
(1 row)
-- commit should drop the lock
COMMIT;
-- lock should be gone now
SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5;
count
-------
0
(1 row)
-- test get_shard_id_for_distribution_column
SET citus.shard_count TO 4;
CREATE TABLE get_shardid_test_table1(column1 int, column2 int);
SELECT create_distributed_table('get_shardid_test_table1', 'column1');
create_distributed_table
--------------------------
(1 row)
\COPY get_shardid_test_table1 FROM STDIN with delimiter '|';
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1);
get_shard_id_for_distribution_column
--------------------------------------
540006
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2);
get_shard_id_for_distribution_column
--------------------------------------
540009
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
get_shard_id_for_distribution_column
--------------------------------------
540007
(1 row)
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SELECT * FROM get_shardid_test_table1_540006;
column1 | column2
---------+---------
1 | 1
(1 row)
SELECT * FROM get_shardid_test_table1_540009;
column1 | column2
---------+---------
2 | 2
(1 row)
SELECT * FROM get_shardid_test_table1_540007;
column1 | column2
---------+---------
3 | 3
(1 row)
\c - - - :master_port
-- test non-existing value
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
get_shard_id_for_distribution_column
--------------------------------------
540007
(1 row)
-- test array type
SET citus.shard_count TO 4;
CREATE TABLE get_shardid_test_table2(column1 text[], column2 int);
SELECT create_distributed_table('get_shardid_test_table2', 'column1');
create_distributed_table
--------------------------
(1 row)
\COPY get_shardid_test_table2 FROM STDIN with delimiter '|';
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}');
get_shard_id_for_distribution_column
--------------------------------------
540013
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}');
get_shard_id_for_distribution_column
--------------------------------------
540011
(1 row)
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SELECT * FROM get_shardid_test_table2_540013;
column1 | column2
---------+---------
{a,b,c} | 1
(1 row)
SELECT * FROM get_shardid_test_table2_540011;
column1 | column2
---------+---------
{d,e,f} | 2
(1 row)
\c - - - :master_port
-- test mismatching data type
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
ERROR: malformed array literal: "a"
DETAIL: Array value must start with "{" or dimension information.
-- test NULL distribution column value for hash distributed table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
ERROR: distribution value cannot be NULL for tables other than reference tables.
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL);
ERROR: distribution value cannot be NULL for tables other than reference tables.
-- test non-distributed table
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
ERROR: relation is not distributed
-- test append distributed table
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
create_distributed_table
--------------------------
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
ERROR: finding shard id of given distribution value is only supported for hash partitioned tables, range partitioned tables and reference tables.
-- test reference table;
CREATE TABLE get_shardid_test_table4(column1 int, column2 int);
SELECT create_reference_table('get_shardid_test_table4');
create_reference_table
------------------------
(1 row)
-- test NULL distribution column value for reference table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4');
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL);
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
-- test different data types for reference table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a');
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}');
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
-- test range distributed table
CREATE TABLE get_shardid_test_table5(column1 int, column2 int);
SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range');
create_distributed_table
--------------------------
(1 row)
-- create worker shards
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540015
(1 row)
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540016
(1 row)
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540017
(1 row)
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540018
(1 row)
-- now the comparison is done via the partition column type, which is text
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015;
UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016;
UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017;
UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018;
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5);
get_shard_id_for_distribution_column
--------------------------------------
540015
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111);
get_shard_id_for_distribution_column
--------------------------------------
540016
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689);
get_shard_id_for_distribution_column
--------------------------------------
540017
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248);
get_shard_id_for_distribution_column
--------------------------------------
540018
(1 row)
-- test non-existing value for range distributed tables
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001);
get_shard_id_for_distribution_column
--------------------------------------
0
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999);
get_shard_id_for_distribution_column
--------------------------------------
0
(1 row)
-- clear unnecessary tables;
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5;