Adds multi_schedule_hyperscale schedule

pull/3546/head
Halil Ozan Akgul 2020-03-13 11:12:45 +03:00
parent 5bf350faf9
commit a701fc774a
28 changed files with 2259 additions and 1986 deletions

View File

@ -115,6 +115,10 @@ check-multi: all
$(pg_regress_multi_check) --load-extension=citus \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
check-multi-hyperscale: all
$(pg_regress_multi_check) --conninfo="$(conninfo)" --load-extension=citus \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale $(EXTRA_TESTS)
check-multi-hyperscale-superuser: all
$(pg_regress_multi_check) --conninfo="$(conninfo)" --worker-1-public-hostname="$(worker_1_public_hostname)" --worker-2-public-hostname="$(worker_2_public_hostname)" --load-extension=citus \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale_superuser $(EXTRA_TESTS)

View File

@ -2,7 +2,8 @@
# Only run few basic tests to set up a testing environment
# ----------
test: multi_cluster_management
test: multi_test_helpers multi_create_fdw
test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw
test: multi_test_catalog_views
test: multi_create_table multi_behavioral_analytics_create_table
test: multi_load_data
test: multi_create_table_superuser multi_behavioral_analytics_create_table_superuser
test: multi_load_data multi_load_data_superuser

View File

@ -1,5 +1,5 @@
# The basic tests runs analyze which depends on shard numbers
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views
test: upgrade_basic_before
test: upgrade_type_before upgrade_ref2ref_before upgrade_distributed_function_before upgrade_rebalance_strategy_before

View File

@ -1,8 +1,6 @@
--
-- MULTI_CREATE_TABLE
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000;
-- Create new table definitions for use in testing in distributed planning and
-- execution functionality. Also create indexes to boost performance. Since we
-- need to cover both reference join and partitioned join, we have created
@ -180,228 +178,6 @@ SELECT create_distributed_table('supplier_single_shard', 's_suppkey', 'append');
(1 row)
CREATE TABLE mx_table_test (col1 int, col2 text);
-- Since we're superuser, we can set the replication model to 'streaming' to
-- create a one-off MX table... but if we forget to set the replication factor to one,
-- we should see an error reminding us to fix that
SET citus.replication_model TO 'streaming';
SELECT create_distributed_table('mx_table_test', 'col1');
ERROR: replication factors above one are incompatible with the streaming replication model
HINT: Try again after reducing "citus.shard_replication_factor" to one or setting "citus.replication_model" to "statement".
-- ok, so now actually create the one-off MX table
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('mx_table_test', 'col1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
repmodel
---------------------------------------------------------------------
s
(1 row)
DROP TABLE mx_table_test;
-- Show that master_create_distributed_table ignores citus.replication_model GUC
CREATE TABLE s_table(a int);
SELECT master_create_distributed_table('s_table', 'a', 'hash');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
SELECT master_create_worker_shards('s_table', 4, 2);
ERROR: using replication factor 2 with the streaming replication model is not supported
DETAIL: The table s_table is marked as streaming replicated and the shard replication factor of streaming replicated tables must be 1.
HINT: Use replication factor 1.
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that create_distributed_table with append and range distributions ignore
-- citus.replication_model GUC
SET citus.shard_replication_factor TO 2;
SET citus.replication_model TO streaming;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
-- Show that master_create_distributed_table created statement replicated tables no matter
-- what citus.replication_model set to
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
-- Check that the replication_model overwrite behavior is the same with RF=1
SET citus.shard_replication_factor TO 1;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- Test initial data loading
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
@ -527,583 +303,3 @@ WHERE col1 = 132;
DROP TABLE data_load_test1, data_load_test2;
END;
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT relname FROM pg_class WHERE relname LIKE 'data_load_test%';
relname
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- creating an index after loading data works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX data_load_test_idx ON data_load_test (col2);
DROP TABLE data_load_test;
END;
-- popping in and out of existence in the same transaction works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE data_load_test;
END;
-- but dropping after a write on the distributed table is currently disallowed
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO data_load_test VALUES (243, 'world');
DROP TABLE data_load_test;
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col3');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM data_load_test ORDER BY col2;
col2 | col3 | CoL4")
---------------------------------------------------------------------
hello | world |
world | hello |
(2 rows)
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
col2 | col3 | CoL4")
---------------------------------------------------------------------
hello | world |
(1 row)
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE UNLOGGED TABLE unlogged_table
(
key text,
value text
);
SELECT create_distributed_table('unlogged_table', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM master_get_table_ddl_events('unlogged_table');
master_get_table_ddl_events
---------------------------------------------------------------------
CREATE UNLOGGED TABLE public.unlogged_table (key text, value text)
ALTER TABLE public.unlogged_table OWNER TO postgres
(2 rows)
\c - - - :worker_1_port
SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%';
relpersistence
---------------------------------------------------------------------
u
u
u
u
(4 rows)
\c - - - :master_port
-- Test rollback of create table
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- Insert 3 rows to make sure that copy after shard creation touches the same
-- worker node twice.
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
INSERT INTO rollback_table VALUES(1, 'Name_1');
INSERT INTO rollback_table VALUES(2, 'Name_2');
INSERT INTO rollback_table VALUES(3, 'Name_3');
SELECT create_distributed_table('rollback_table','id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\copy rollback_table from stdin delimiter ','
CREATE INDEX rollback_index ON rollback_table(id);
COMMIT;
-- Check the table is created
SELECT count(*) FROM rollback_table;
count
---------------------------------------------------------------------
3
(1 row)
DROP TABLE rollback_table;
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\copy rollback_table from stdin delimiter ','
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE tt2(id int);
SELECT create_distributed_table('tt2','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tt1 VALUES(1);
INSERT INTO tt2 SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Table should exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360069'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt2_360073'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
\c - - - :master_port
DROP TABLE tt1;
DROP TABLE tt2;
-- It is known that creating a table with master_create_empty_shard is not
-- transactional, so table stay remaining on the worker node after the rollback
BEGIN;
CREATE TABLE append_tt1(id int);
SELECT create_distributed_table('append_tt1','id','append');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_create_empty_shard('append_tt1');
master_create_empty_shard
---------------------------------------------------------------------
360077
(1 row)
ROLLBACK;
-- Table exists on the worker node.
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.append_tt1_360077'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
\c - - - :master_port
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'public.tt1%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- Queries executing with router executor is allowed in the same transaction
-- with create_distributed_table
BEGIN;
CREATE TABLE tt1(id int);
INSERT INTO tt1 VALUES(1);
SELECT create_distributed_table('tt1','id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tt1 VALUES(2);
SELECT * FROM tt1 WHERE id = 1;
id
---------------------------------------------------------------------
1
(1 row)
COMMIT;
-- Placements should be created on the worker
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360078'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
\c - - - :master_port
DROP TABLE tt1;
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE tt1;
COMMIT;
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'tt1%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- Tests with create_distributed_table & DDL & DML commands
-- Test should pass since GetPlacementListConnection can provide connections
-- in this order of execution
CREATE TABLE sample_table(id int);
SELECT create_distributed_table('sample_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
BEGIN;
CREATE TABLE stage_table (LIKE sample_table);
\COPY stage_table FROM stdin; -- Note that this operation is a local copy
SELECT create_distributed_table('stage_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO sample_table SELECT * FROM stage_table;
DROP TABLE stage_table;
SELECT * FROM sample_table WHERE id = 3;
id
---------------------------------------------------------------------
3
(1 row)
COMMIT;
-- Show that rows of sample_table are updated
SELECT count(*) FROM sample_table;
count
---------------------------------------------------------------------
4
(1 row)
DROP table sample_table;
-- Test as create_distributed_table - copy - create_distributed_table - copy
-- This combination is used by tests written by some ORMs.
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\COPY tt1 from stdin;
CREATE TABLE tt2(like tt1);
SELECT create_distributed_table('tt2','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\COPY tt2 from stdin;
INSERT INTO tt1 SELECT * FROM tt2;
SELECT * FROM tt1 WHERE id = 3;
id
---------------------------------------------------------------------
3
(1 row)
SELECT * FROM tt2 WHERE id = 6;
id
---------------------------------------------------------------------
6
(1 row)
END;
SELECT count(*) FROM tt1;
count
---------------------------------------------------------------------
6
(1 row)
-- the goal of the following test is to make sure that
-- both create_reference_table and create_distributed_table
-- calls creates the schemas without leading to any deadlocks
-- first create reference table, then hash distributed table
BEGIN;
CREATE SCHEMA sc;
CREATE TABLE sc.ref(a int);
insert into sc.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc.ref');
NOTICE: Copying data from local table...
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE sc.hash(a int);
insert into sc.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc.hash', 'a');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
-- first create hash distributed table, then reference table
BEGIN;
CREATE SCHEMA sc2;
CREATE TABLE sc2.hash(a int);
insert into sc2.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc2.hash', 'a');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE sc2.ref(a int);
insert into sc2.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc2.ref');
NOTICE: Copying data from local table...
create_reference_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,f)
(localhost,57638,t,f)
(2 rows)
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
DROP TABLE tt1;
DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc.ref
drop cascades to table sc.hash
DROP SCHEMA sc2 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc2.hash
drop cascades to table sc2.ref
DROP SCHEMA sc3 CASCADE;
NOTICE: drop cascades to table sc3.alter_replica_table
DROP SCHEMA sc4 CASCADE;
NOTICE: drop cascades to table sc4.alter_replica_table
DROP SCHEMA sc5 CASCADE;
NOTICE: drop cascades to table sc5.alter_replica_table
DROP SCHEMA sc6 CASCADE;
NOTICE: drop cascades to table sc6.alter_replica_table

View File

@ -0,0 +1,804 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000;
-- Since we're superuser, we can set the replication model to 'streaming' to
-- create a one-off MX table... but if we forget to set the replication factor to one,
-- we should see an error reminding us to fix that
SET citus.replication_model TO 'streaming';
SELECT create_distributed_table('mx_table_test', 'col1');
ERROR: replication factors above one are incompatible with the streaming replication model
HINT: Try again after reducing "citus.shard_replication_factor" to one or setting "citus.replication_model" to "statement".
-- ok, so now actually create the one-off MX table
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('mx_table_test', 'col1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
repmodel
---------------------------------------------------------------------
s
(1 row)
DROP TABLE mx_table_test;
-- Show that master_create_distributed_table ignores citus.replication_model GUC
CREATE TABLE s_table(a int);
SELECT master_create_distributed_table('s_table', 'a', 'hash');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
SELECT master_create_worker_shards('s_table', 4, 2);
ERROR: using replication factor 2 with the streaming replication model is not supported
DETAIL: The table s_table is marked as streaming replicated and the shard replication factor of streaming replicated tables must be 1.
HINT: Use replication factor 1.
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that create_distributed_table with append and range distributions ignore
-- citus.replication_model GUC
SET citus.shard_replication_factor TO 2;
SET citus.replication_model TO streaming;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
-- Show that master_create_distributed_table created statement replicated tables no matter
-- what citus.replication_model set to
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
-- Check that the replication_model overwrite behavior is the same with RF=1
SET citus.shard_replication_factor TO 1;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: Streaming replication is supported only for hash-distributed tables.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
NOTICE: using statement-based replication
DETAIL: The current replication_model setting is 'streaming', which is not supported by master_create_distributed_table.
HINT: Use create_distributed_table to use the streaming replication model.
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
repmodel
---------------------------------------------------------------------
c
(1 row)
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT relname FROM pg_class WHERE relname LIKE 'data_load_test%';
relname
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- creating an index after loading data works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX data_load_test_idx ON data_load_test (col2);
DROP TABLE data_load_test;
END;
-- popping in and out of existence in the same transaction works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE data_load_test;
END;
-- but dropping after a write on the distributed table is currently disallowed
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO data_load_test VALUES (243, 'world');
DROP TABLE data_load_test;
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col3');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM data_load_test ORDER BY col2;
col2 | col3 | CoL4")
---------------------------------------------------------------------
hello | world |
world | hello |
(2 rows)
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
col2 | col3 | CoL4")
---------------------------------------------------------------------
hello | world |
(1 row)
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE UNLOGGED TABLE unlogged_table
(
key text,
value text
);
SELECT create_distributed_table('unlogged_table', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM master_get_table_ddl_events('unlogged_table');
master_get_table_ddl_events
---------------------------------------------------------------------
CREATE UNLOGGED TABLE public.unlogged_table (key text, value text)
ALTER TABLE public.unlogged_table OWNER TO postgres
(2 rows)
\c - - - :worker_1_port
SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%';
relpersistence
---------------------------------------------------------------------
u
u
u
u
(4 rows)
\c - - - :master_port
-- Test rollback of create table
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- Insert 3 rows to make sure that copy after shard creation touches the same
-- worker node twice.
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
INSERT INTO rollback_table VALUES(1, 'Name_1');
INSERT INTO rollback_table VALUES(2, 'Name_2');
INSERT INTO rollback_table VALUES(3, 'Name_3');
SELECT create_distributed_table('rollback_table','id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\copy rollback_table from stdin delimiter ','
CREATE INDEX rollback_index ON rollback_table(id);
COMMIT;
-- Check the table is created
SELECT count(*) FROM rollback_table;
count
---------------------------------------------------------------------
3
(1 row)
DROP TABLE rollback_table;
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\copy rollback_table from stdin delimiter ','
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE tt2(id int);
SELECT create_distributed_table('tt2','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tt1 VALUES(1);
INSERT INTO tt2 SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Table should exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360069'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt2_360073'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
\c - - - :master_port
DROP TABLE tt1;
DROP TABLE tt2;
-- It is known that creating a table with master_create_empty_shard is not
-- transactional, so table stay remaining on the worker node after the rollback
BEGIN;
CREATE TABLE append_tt1(id int);
SELECT create_distributed_table('append_tt1','id','append');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_create_empty_shard('append_tt1');
master_create_empty_shard
---------------------------------------------------------------------
360077
(1 row)
ROLLBACK;
-- Table exists on the worker node.
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.append_tt1_360077'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
\c - - - :master_port
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'public.tt1%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- Queries executing with router executor is allowed in the same transaction
-- with create_distributed_table
BEGIN;
CREATE TABLE tt1(id int);
INSERT INTO tt1 VALUES(1);
SELECT create_distributed_table('tt1','id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tt1 VALUES(2);
SELECT * FROM tt1 WHERE id = 1;
id
---------------------------------------------------------------------
1
(1 row)
COMMIT;
-- Placements should be created on the worker
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360078'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | integer |
(1 row)
\c - - - :master_port
DROP TABLE tt1;
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE tt1;
COMMIT;
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'tt1%');
Column | Type | Modifiers
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- Tests with create_distributed_table & DDL & DML commands
-- Test should pass since GetPlacementListConnection can provide connections
-- in this order of execution
CREATE TABLE sample_table(id int);
SELECT create_distributed_table('sample_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
BEGIN;
CREATE TABLE stage_table (LIKE sample_table);
\COPY stage_table FROM stdin; -- Note that this operation is a local copy
SELECT create_distributed_table('stage_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO sample_table SELECT * FROM stage_table;
DROP TABLE stage_table;
SELECT * FROM sample_table WHERE id = 3;
id
---------------------------------------------------------------------
3
(1 row)
COMMIT;
-- Show that rows of sample_table are updated
SELECT count(*) FROM sample_table;
count
---------------------------------------------------------------------
4
(1 row)
DROP table sample_table;
-- Test as create_distributed_table - copy - create_distributed_table - copy
-- This combination is used by tests written by some ORMs.
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\COPY tt1 from stdin;
CREATE TABLE tt2(like tt1);
SELECT create_distributed_table('tt2','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\COPY tt2 from stdin;
INSERT INTO tt1 SELECT * FROM tt2;
SELECT * FROM tt1 WHERE id = 3;
id
---------------------------------------------------------------------
3
(1 row)
SELECT * FROM tt2 WHERE id = 6;
id
---------------------------------------------------------------------
6
(1 row)
END;
SELECT count(*) FROM tt1;
count
---------------------------------------------------------------------
6
(1 row)
-- the goal of the following test is to make sure that
-- both create_reference_table and create_distributed_table
-- calls creates the schemas without leading to any deadlocks
-- first create reference table, then hash distributed table
BEGIN;
CREATE SCHEMA sc;
CREATE TABLE sc.ref(a int);
insert into sc.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc.ref');
NOTICE: Copying data from local table...
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE sc.hash(a int);
insert into sc.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc.hash', 'a');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
-- first create hash distributed table, then reference table
BEGIN;
CREATE SCHEMA sc2;
CREATE TABLE sc2.hash(a int);
insert into sc2.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc2.hash', 'a');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE sc2.ref(a int);
insert into sc2.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc2.ref');
NOTICE: Copying data from local table...
create_reference_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,f)
(localhost,57638,t,f)
(2 rows)
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
DROP TABLE tt1;
DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc.ref
drop cascades to table sc.hash
DROP SCHEMA sc2 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc2.hash
drop cascades to table sc2.ref
DROP SCHEMA sc3 CASCADE;
NOTICE: drop cascades to table sc3.alter_replica_table
DROP SCHEMA sc4 CASCADE;
NOTICE: drop cascades to table sc4.alter_replica_table
DROP SCHEMA sc5 CASCADE;
NOTICE: drop cascades to table sc5.alter_replica_table
DROP SCHEMA sc6 CASCADE;
NOTICE: drop cascades to table sc6.alter_replica_table

View File

@ -80,45 +80,6 @@ BEGIN
RETURN true;
END;
$func$;
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Verifies pg_dist_node and pg_dist_palcement in the given worker matches the ones in coordinator
CREATE OR REPLACE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
RETURNS BOOLEAN
LANGUAGE sql
AS $$
SELECT wait_until_metadata_sync();
WITH dist_node_summary AS (
SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
), dist_node_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_node_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
false)
)
SELECT dist_node_check.matches AND dist_placement_check.matches
FROM dist_node_check CROSS JOIN dist_placement_check
$$;
--
-- Procedure for creating shards for range partitioned distributed table.
--
@ -135,15 +96,3 @@ BEGIN
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- partition_task_list_results tests the internal PartitionTasklistResults function
CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
query text,
target_table regclass,
binaryFormat bool DEFAULT true)
RETURNS TABLE(resultId text,
nodeId int,
rowCount bigint,
targetShardId bigint,
targetShardIndex int)
LANGUAGE C STRICT VOLATILE
AS 'citus', $$partition_task_list_results$$;

View File

@ -0,0 +1,51 @@
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Verifies pg_dist_node and pg_dist_palcement in the given worker matches the ones in coordinator
CREATE OR REPLACE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
RETURNS BOOLEAN
LANGUAGE sql
AS $$
SELECT wait_until_metadata_sync();
WITH dist_node_summary AS (
SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
), dist_node_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_node_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
false)
)
SELECT dist_node_check.matches AND dist_placement_check.matches
FROM dist_node_check CROSS JOIN dist_placement_check
$$;
-- partition_task_list_results tests the internal PartitionTasklistResults function
CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
query text,
target_table regclass,
binaryFormat bool DEFAULT true)
RETURNS TABLE(resultId text,
nodeId int,
rowCount bigint,
targetShardId bigint,
targetShardIndex int)
LANGUAGE C STRICT VOLATILE
AS 'citus', $$partition_task_list_results$$;

View File

@ -3,5 +3,5 @@ test: failure_test_helpers
# this should only be run by pg_regress_multi, you don't need it
test: failure_setup
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views

View File

@ -3,7 +3,7 @@ test: failure_test_helpers
# this should only be run by pg_regress_multi, you don't need it
test: failure_setup
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: failure_parallel_connection
test: failure_replicated_partitions
test: multi_test_catalog_views

View File

@ -86,118 +86,6 @@ CREATE TYPE user_composite_type AS
user_id BIGINT
);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
LANGUAGE 'internal'
AS 'btrecordcmp'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_gt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_ge'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_eq'
IMMUTABLE;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR > (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = gt_user_composite_type_function
);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR >= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = ge_user_composite_type_function
);
$f$);
-- ... use that function to create a custom equality operator...
SELECT run_command_on_master_and_workers($f$
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = equal_user_composite_type_function,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes
);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR <= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = le_user_composite_type_function
);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR < (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = lt_user_composite_type_function
);
$f$);
-- ... and create a custom operator family for hash indexes...
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
$f$);
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
@ -210,184 +98,7 @@ SELECT run_command_on_master_and_workers($f$
RETURNS NULL ON NULL INPUT;
$f$);
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_clas3
DEFAULT FOR TYPE user_composite_type USING BTREE AS
OPERATOR 1 <= (user_composite_type, user_composite_type),
OPERATOR 2 < (user_composite_type, user_composite_type),
OPERATOR 3 = (user_composite_type, user_composite_type),
OPERATOR 4 >= (user_composite_type, user_composite_type),
OPERATOR 5 > (user_composite_type, user_composite_type),
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_class
DEFAULT FOR TYPE user_composite_type USING HASH AS
OPERATOR 1 = (user_composite_type, user_composite_type),
FUNCTION 1 test_composite_type_hash(user_composite_type);
$f$);
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
"(1,1001)",20001,click,1472807012
"(1,1001)",20002,submit,1472807015
"(1,1001)",20003,pay,1472807020
"(1,1002)",20010,click,1472807022
"(1,1002)",20011,click,1472807023
"(1,1002)",20012,submit,1472807025
"(1,1002)",20013,pay,1472807030
"(1,1003)",20014,click,1472807032
"(1,1003)",20015,click,1472807033
"(1,1003)",20016,click,1472807034
"(1,1003)",20017,submit,1472807035
\.
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
"(1,1001)",1472807115
"(1,1002)",1472807215
"(1,1003)",1472807315
\.
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
CREATE TABLE orders_subquery (
o_orderkey bigint not null,
o_custkey integer not null,
o_orderstatus char(1) not null,
o_totalprice decimal(15,2) not null,
o_orderdate date not null,
o_orderpriority char(15) not null,
o_clerk char(15) not null,
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
SET citus.enable_router_execution TO 'false';
-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
-- Load data into tables.
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
SET citus.next_shard_id TO 1400297;
CREATE TABLE events_reference_table (like events_table including all);
SELECT create_reference_table('events_reference_table');

View File

@ -0,0 +1,291 @@
SET citus.next_shard_id TO 1400285;
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
LANGUAGE 'internal'
AS 'btrecordcmp'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_gt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_ge'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_eq'
IMMUTABLE;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR > (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = gt_user_composite_type_function
);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR >= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = ge_user_composite_type_function
);
$f$);
-- ... use that function to create a custom equality operator...
SELECT run_command_on_master_and_workers($f$
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = equal_user_composite_type_function,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes
);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR <= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = le_user_composite_type_function
);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR < (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = lt_user_composite_type_function
);
$f$);
-- ... and create a custom operator family for hash indexes...
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
$f$);
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_clas3
DEFAULT FOR TYPE user_composite_type USING BTREE AS
OPERATOR 1 <= (user_composite_type, user_composite_type),
OPERATOR 2 < (user_composite_type, user_composite_type),
OPERATOR 3 = (user_composite_type, user_composite_type),
OPERATOR 4 >= (user_composite_type, user_composite_type),
OPERATOR 5 > (user_composite_type, user_composite_type),
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_class
DEFAULT FOR TYPE user_composite_type USING HASH AS
OPERATOR 1 = (user_composite_type, user_composite_type),
FUNCTION 1 test_composite_type_hash(user_composite_type);
$f$);
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
"(1,1001)",20001,click,1472807012
"(1,1001)",20002,submit,1472807015
"(1,1001)",20003,pay,1472807020
"(1,1002)",20010,click,1472807022
"(1,1002)",20011,click,1472807023
"(1,1002)",20012,submit,1472807025
"(1,1002)",20013,pay,1472807030
"(1,1003)",20014,click,1472807032
"(1,1003)",20015,click,1472807033
"(1,1003)",20016,click,1472807034
"(1,1003)",20017,submit,1472807035
\.
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
"(1,1001)",1472807115
"(1,1002)",1472807215
"(1,1003)",1472807315
\.
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
CREATE TABLE orders_subquery (
o_orderkey bigint not null,
o_custkey integer not null,
o_orderstatus char(1) not null,
o_totalprice decimal(15,2) not null,
o_orderdate date not null,
o_orderpriority char(15) not null,
o_clerk char(15) not null,
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
SET citus.enable_router_execution TO 'false';
-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
-- Load data into tables.
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -24,7 +24,3 @@ SET citus.next_shard_id TO 290000;
\copy part_append FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -0,0 +1,4 @@
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -1,3 +1,3 @@
test: multi_cluster_management
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views

View File

@ -14,7 +14,7 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_mx_node_metadata
test: multi_cluster_management
test: multi_mx_function_table_reference

View File

@ -25,7 +25,7 @@ test: multi_cluster_management
test: alter_role_propagation
test: propagate_extension_commands
test: escape_extension_name
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views
test: multi_table_ddl
test: multi_name_lengths
@ -40,8 +40,9 @@ test: multi_read_from_secondaries
# uploading data to it.
# ----------
test: multi_create_table
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select
test: multi_create_table_superuser
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_load_data_superuser multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_behavioral_analytics_create_table_superuser
test: multi_shard_update_delete recursive_dml_with_different_planners_executors
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
test: multi_insert_select_conflict

View File

@ -0,0 +1,262 @@
# ----------
# $Id$
#
# Regression tests that exercise distributed planning/execution functionality.
#
# All new regression tests are expected to be run by this schedule. Tests that
# do not set specific task executor type should also be added to
# multi_task_tracker_extra_schedule.
#
# Note that we use variant comparison files to test version dependent regression
# test results. For more information:
# http://www.postgresql.org/docs/current/static/regress-variant.html
# ----------
# ---
# Tests around schema changes, these are run first, so there's no preexisting objects.
#
# propagate_extension_commands lies just after multi_cluster_management as we do
# remove / add node operations, we do not want any preexisting objects before
# propagate_extension_commands
# ---
test: multi_test_helpers
test: multi_test_catalog_views
test: multi_name_resolution
# ----------
# The following distributed tests depend on creating a partitioned table and
# uploading data to it.
# ----------
test: multi_create_table
test: multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
test: recursive_dml_with_different_planners_executors
test: window_functions multi_insert_select_window
# following should not run in parallel because it relies on connection counts to workers
test: insert_select_connection_leak
# ---------
# at the end of the regression tests regaring recursively planned modifications
# ensure that we don't leak any intermediate results
# This test should not run in parallel with any other tests
# ---------
# ----------
# Tests for partitioning support
# ----------
# ----------
# Tests for recursive subquery planning
# ----------
test: subquery_basics subquery_local_tables subquery_executors set_operations set_operation_and_local_tables
test: subquery_partitioning subquery_complex_target_list subqueries_not_supported
test: non_colocated_join_order
test: subquery_prepared_statements pg12 cte_inline
# ----------
# Miscellaneous tests to check our query planning behavior
# ----------
test: multi_distributed_transaction_id
test: hyperscale_tutorial
test: multi_basic_queries multi_complex_expressions multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_sql_function
test: multi_function_in_join row_types
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_limit_clause_approximate multi_single_relation_subquery
test: multi_select_for_update
test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition chbenchmark_all_queries expression_reference_join
test: multi_agg_type_conversion multi_count_type_conversion
test: multi_cross_shard
test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: multi_binary_master_copy_format
# ----------
# Parallel TPC-H tests to check our distributed execution behavior
# ----------
test: multi_tpch_query1 multi_tpch_query3 multi_tpch_query6 multi_tpch_query10
test: multi_tpch_query12 multi_tpch_query14 multi_tpch_query19
test: multi_tpch_query7 multi_tpch_query7_nested
# ----------
# Parallel tests to check our join order planning logic. Note that we load data
# below; and therefore these tests should come after the execution tests.
# ----------
test: multi_join_order_tpch_small multi_join_order_additional
test: multi_join_order_tpch_repartition
# ----------
# Tests for repartition join planning and execution. Be careful when creating
# new shards before these tests, as they expect specific shard identifiers in
# the output.
# ----------
test: multi_repartition_join_ref
test: adaptive_executor_repartition
# ---------
# Tests that modify data should run sequentially
# ---------
test: with_prepare
# ---------
# Tests for recursive planning.
# ---------
test: with_nested with_where
test: cte_prepared_modify cte_nested_modification
test: with_executors with_partitioning with_dml
# ----------
# Tests to check our large record loading and shard deletion behavior
# ----------
test: multi_load_large_records
test: multi_master_delete_protocol
test: multi_shard_modify
# ----------
# Tests around DDL statements run on distributed tables
# ----------
# ----------
# multi_create_schema tests creation, loading, and querying of a table in a new
# schema (namespace).
# ----------
test: multi_create_schema
# ----------
# Tests to check if we inform the user about potential caveats of creating new
# databases, schemas, roles, and authentication information.
# ----------
# ----------
# Tests to check the sequential and parallel executions of DDL and modification
# commands
# Should not be executed in parallel with other tests
# ----------
# ---------
# loads data to create shards in a way that forces
# shard caching.
# ---------
# ---------
# multi_outer_join loads data to create shards to test outer join mappings
# ---------
test: multi_outer_join
# ---
# Tests covering mostly modification queries and required preliminary
# functionality related to metadata, shard creation, shard pruning and
# "hacky" copy script for hash partitioned tables.
# Note that the order of the following tests are important. multi_complex_count_distinct
# is independent from the rest of the group, it is added to increase parallelism.
# ---
test: multi_complex_count_distinct
test: multi_upsert multi_simple_queries
test: foreign_key_to_reference_table validate_constraint
# ---------
# creates hash and range-partitioned tables and performs COPY
# creates hash partitioned tables.
# ---------
test: fast_path_router_modify
test: null_parameters
# ----------
# loads more lineitem data using high shard identifiers
# ----------
# ----------
# tests various size commands on distributed tables
# ----------
# ----------
# multi_drop_extension makes sure we can safely drop and recreate the extension
# ----------
# ----------
# tests the propagation of mx-related metadata changes to metadata workers
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
# ----------
# ----------
# tests if the GRANT ... ON SCHEMA queries are propagated correctly
# makes sure we can work with tables in schemas other than public with no problem
# ----------
# ----------
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
# ----------
test: multi_function_evaluation
# ----------
# tests truncate functionality for distributed tables
# ----------
# ----------
# tests utility functions written for co-location feature & internal API
# tests master_copy_shard_placement with colocated tables.
# ----------
# ----------
# tests utility functions written for citus tools
# ----------
# ----------
# multi_foreign_key tests foreign key push down on distributed tables
# ----------
test: multi_foreign_key
# ----------
# tests for upgrade_reference_table UDF
# tests replicating reference tables to new nodes after we add new nodes
# tests metadata changes after master_remove_node
# ----------
# --------
# Replicating reference tables to coordinator. Add coordinator to pg_dist_node
# and rerun some of the tests.
# --------
test: foreign_key_to_reference_table
# ----------
# tests for dropping shards using connection API
# ----------
# ----------
# tests simple combinations of permission access and queries
# ----------
# ---------
# tests for an obscure crash citus used to exhibit when shardids
# changed the table they belonged to during a session
# --------
# ---------
# multi_task_string_size tests task string size checks
# ---------
test: multi_task_string_size
# ---------
# connection encryption tests
# ---------
# ---------
# object distribution tests
# ---------
test: distributed_types_xact_add_enum_value
# ---------
# deparsing logic tests
# ---------
# ---------
# test that no tests leaked intermediate results. This should always be last
# Causes random test failures so commented out for now
# ---------
# test:

View File

@ -19,7 +19,7 @@
# remove / add node operations, we do not want any preexisting objects before
# propagate_extension_commands
# ---
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views
test: multi_name_lengths
test: multi_name_resolution
@ -31,8 +31,9 @@ test: multi_metadata_access
# uploading data to it.
# ----------
test: multi_create_table
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
test: recursive_dml_with_different_planners_executors
test: multi_create_table_superuser
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_load_data_superuser multi_behavioral_analytics_create_table
test: recursive_dml_with_different_planners_executors multi_behavioral_analytics_create_table_superuser
test: insert_select_repartition dml_recursive multi_insert_select_window
test: multi_insert_select_conflict

View File

@ -16,7 +16,7 @@
test: multi_extension
test: multi_cluster_management
test: multi_table_ddl
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views
# ----------
@ -24,8 +24,9 @@ test: multi_test_catalog_views
# uploading data to it.
# ----------
test: multi_create_table
test: multi_create_table_superuser
test: multi_master_protocol
test: multi_load_data
test: multi_load_data multi_load_data_superuser
# ----------
# Miscellaneous tests to check our query planning behavior

View File

@ -2,7 +2,7 @@
# Only run few basic tests to set up a testing environment
# ----------
test: multi_cluster_management
test: multi_test_helpers
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views
# the following test has to be run sequentially

View File

@ -125,165 +125,6 @@ CREATE TYPE user_composite_type AS
tenant_id BIGINT,
user_id BIGINT
);
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
LANGUAGE 'internal'
AS 'btrecordcmp'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_gt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_ge'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_eq'
IMMUTABLE;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR > (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = gt_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR >= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = ge_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
-- ... use that function to create a custom equality operator...
SELECT run_command_on_master_and_workers($f$
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = equal_user_composite_type_function,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR <= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = le_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR < (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = lt_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
-- ... and create a custom operator family for hash indexes...
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
SELECT run_command_on_master_and_workers($f$
@ -299,177 +140,7 @@ $f$);
(1 row)
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_clas3
DEFAULT FOR TYPE user_composite_type USING BTREE AS
OPERATOR 1 <= (user_composite_type, user_composite_type),
OPERATOR 2 < (user_composite_type, user_composite_type),
OPERATOR 3 = (user_composite_type, user_composite_type),
OPERATOR 4 >= (user_composite_type, user_composite_type),
OPERATOR 5 > (user_composite_type, user_composite_type),
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_class
DEFAULT FOR TYPE user_composite_type USING HASH AS
OPERATOR 1 = (user_composite_type, user_composite_type),
FUNCTION 1 test_composite_type_hash(user_composite_type);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
CREATE TABLE orders_subquery (
o_orderkey bigint not null,
o_custkey integer not null,
o_orderstatus char(1) not null,
o_totalprice decimal(15,2) not null,
o_orderdate date not null,
o_orderpriority char(15) not null,
o_clerk char(15) not null,
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SET citus.enable_router_execution TO 'false';
-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
avg
-----
(1 row)
-- Load data into tables.
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
SET citus.next_shard_id TO 1400297;
CREATE TABLE events_reference_table (like events_table including all);
SELECT create_reference_table('events_reference_table');
create_reference_table

View File

@ -0,0 +1,331 @@
SET citus.next_shard_id TO 1400285;
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
LANGUAGE 'internal'
AS 'btrecordcmp'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_gt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_ge'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_eq'
IMMUTABLE;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
LANGUAGE 'internal'
AS 'record_lt'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR > (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = gt_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR >= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = ge_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
-- ... use that function to create a custom equality operator...
SELECT run_command_on_master_and_workers($f$
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = equal_user_composite_type_function,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR <= (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = le_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR < (
LEFTARG = user_composite_type,
RIGHTARG = user_composite_type,
PROCEDURE = lt_user_composite_type_function
);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
-- ... and create a custom operator family for hash indexes...
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_clas3
DEFAULT FOR TYPE user_composite_type USING BTREE AS
OPERATOR 1 <= (user_composite_type, user_composite_type),
OPERATOR 2 < (user_composite_type, user_composite_type),
OPERATOR 3 = (user_composite_type, user_composite_type),
OPERATOR 4 >= (user_composite_type, user_composite_type),
OPERATOR 5 > (user_composite_type, user_composite_type),
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
SELECT run_command_on_master_and_workers($f$
CREATE OPERATOR CLASS cats_2_op_fam_class
DEFAULT FOR TYPE user_composite_type USING HASH AS
OPERATOR 1 = (user_composite_type, user_composite_type),
FUNCTION 1 test_composite_type_hash(user_composite_type);
$f$);
run_command_on_master_and_workers
-----------------------------------
(1 row)
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
CREATE TABLE orders_subquery (
o_orderkey bigint not null,
o_custkey integer not null,
o_orderstatus char(1) not null,
o_totalprice decimal(15,2) not null,
o_orderdate date not null,
o_orderpriority char(15) not null,
o_clerk char(15) not null,
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SET citus.enable_router_execution TO 'false';
-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
avg
-----
(1 row)
-- Load data into tables.
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -19,7 +19,3 @@ SET citus.next_shard_id TO 290000;
\copy part_append FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -0,0 +1,4 @@
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -2,9 +2,6 @@
-- MULTI_CREATE_TABLE
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000;
-- Create new table definitions for use in testing in distributed planning and
-- execution functionality. Also create indexes to boost performance. Since we
-- need to cover both reference join and partitioned join, we have created
@ -141,94 +138,6 @@ SELECT create_distributed_table('supplier_single_shard', 's_suppkey', 'append');
CREATE TABLE mx_table_test (col1 int, col2 text);
-- Since we're superuser, we can set the replication model to 'streaming' to
-- create a one-off MX table... but if we forget to set the replication factor to one,
-- we should see an error reminding us to fix that
SET citus.replication_model TO 'streaming';
SELECT create_distributed_table('mx_table_test', 'col1');
-- ok, so now actually create the one-off MX table
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('mx_table_test', 'col1');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
DROP TABLE mx_table_test;
-- Show that master_create_distributed_table ignores citus.replication_model GUC
CREATE TABLE s_table(a int);
SELECT master_create_distributed_table('s_table', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
SELECT master_create_worker_shards('s_table', 4, 2);
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that create_distributed_table with append and range distributions ignore
-- citus.replication_model GUC
SET citus.shard_replication_factor TO 2;
SET citus.replication_model TO streaming;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
-- Show that master_create_distributed_table created statement replicated tables no matter
-- what citus.replication_model set to
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
-- Check that the replication_model overwrite behavior is the same with RF=1
SET citus.shard_replication_factor TO 1;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- Test initial data loading
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
@ -288,348 +197,3 @@ WHERE col1 = 132;
DROP TABLE data_load_test1, data_load_test2;
END;
-- There should be no table on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT relname FROM pg_class WHERE relname LIKE 'data_load_test%';
\c - - :master_host :master_port
-- creating an index after loading data works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
CREATE INDEX data_load_test_idx ON data_load_test (col2);
DROP TABLE data_load_test;
END;
-- popping in and out of existence in the same transaction works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
DROP TABLE data_load_test;
END;
-- but dropping after a write on the distributed table is currently disallowed
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
INSERT INTO data_load_test VALUES (243, 'world');
DROP TABLE data_load_test;
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col3');
SELECT * FROM data_load_test ORDER BY col2;
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');
CREATE UNLOGGED TABLE unlogged_table
(
key text,
value text
);
SELECT create_distributed_table('unlogged_table', 'key');
SELECT * FROM master_get_table_ddl_events('unlogged_table');
\c - - :public_worker_1_host :worker_1_port
SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%';
\c - - :master_host :master_port
-- Test rollback of create table
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
ROLLBACK;
-- Table should not exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - :master_host :master_port
-- Insert 3 rows to make sure that copy after shard creation touches the same
-- worker node twice.
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
INSERT INTO rollback_table VALUES(1, 'Name_1');
INSERT INTO rollback_table VALUES(2, 'Name_2');
INSERT INTO rollback_table VALUES(3, 'Name_3');
SELECT create_distributed_table('rollback_table','id');
ROLLBACK;
-- Table should not exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - :master_host :master_port
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
\copy rollback_table from stdin delimiter ','
1, 'name_1'
2, 'name_2'
3, 'name_3'
\.
CREATE INDEX rollback_index ON rollback_table(id);
COMMIT;
-- Check the table is created
SELECT count(*) FROM rollback_table;
DROP TABLE rollback_table;
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
\copy rollback_table from stdin delimiter ','
1, 'name_1'
2, 'name_2'
3, 'name_3'
\.
ROLLBACK;
-- Table should not exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - :master_host :master_port
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
CREATE TABLE tt2(id int);
SELECT create_distributed_table('tt2','id');
INSERT INTO tt1 VALUES(1);
INSERT INTO tt2 SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Table should exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360069'::regclass;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt2_360073'::regclass;
\c - - :master_host :master_port
DROP TABLE tt1;
DROP TABLE tt2;
-- It is known that creating a table with master_create_empty_shard is not
-- transactional, so table stay remaining on the worker node after the rollback
BEGIN;
CREATE TABLE append_tt1(id int);
SELECT create_distributed_table('append_tt1','id','append');
SELECT master_create_empty_shard('append_tt1');
ROLLBACK;
-- Table exists on the worker node.
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.append_tt1_360077'::regclass;
\c - - :master_host :master_port
-- There should be no table on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'public.tt1%');
\c - - :master_host :master_port
-- Queries executing with router executor is allowed in the same transaction
-- with create_distributed_table
BEGIN;
CREATE TABLE tt1(id int);
INSERT INTO tt1 VALUES(1);
SELECT create_distributed_table('tt1','id');
INSERT INTO tt1 VALUES(2);
SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Placements should be created on the worker
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360078'::regclass;
\c - - :master_host :master_port
DROP TABLE tt1;
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
DROP TABLE tt1;
COMMIT;
-- There should be no table on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'tt1%');
\c - - :master_host :master_port
-- Tests with create_distributed_table & DDL & DML commands
-- Test should pass since GetPlacementListConnection can provide connections
-- in this order of execution
CREATE TABLE sample_table(id int);
SELECT create_distributed_table('sample_table','id');
BEGIN;
CREATE TABLE stage_table (LIKE sample_table);
\COPY stage_table FROM stdin; -- Note that this operation is a local copy
1
2
3
4
\.
SELECT create_distributed_table('stage_table', 'id');
INSERT INTO sample_table SELECT * FROM stage_table;
DROP TABLE stage_table;
SELECT * FROM sample_table WHERE id = 3;
COMMIT;
-- Show that rows of sample_table are updated
SELECT count(*) FROM sample_table;
DROP table sample_table;
-- Test as create_distributed_table - copy - create_distributed_table - copy
-- This combination is used by tests written by some ORMs.
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
\COPY tt1 from stdin;
1
2
3
\.
CREATE TABLE tt2(like tt1);
SELECT create_distributed_table('tt2','id');
\COPY tt2 from stdin;
4
5
6
\.
INSERT INTO tt1 SELECT * FROM tt2;
SELECT * FROM tt1 WHERE id = 3;
SELECT * FROM tt2 WHERE id = 6;
END;
SELECT count(*) FROM tt1;
-- the goal of the following test is to make sure that
-- both create_reference_table and create_distributed_table
-- calls creates the schemas without leading to any deadlocks
-- first create reference table, then hash distributed table
BEGIN;
CREATE SCHEMA sc;
CREATE TABLE sc.ref(a int);
insert into sc.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc.ref');
CREATE TABLE sc.hash(a int);
insert into sc.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc.hash', 'a');
COMMIT;
-- first create hash distributed table, then reference table
BEGIN;
CREATE SCHEMA sc2;
CREATE TABLE sc2.hash(a int);
insert into sc2.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc2.hash', 'a');
CREATE TABLE sc2.ref(a int);
insert into sc2.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc2.ref');
COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
DROP TABLE tt1;
DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE;
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SCHEMA sc4 CASCADE;
DROP SCHEMA sc5 CASCADE;
DROP SCHEMA sc6 CASCADE;

View File

@ -0,0 +1,436 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000;
-- Since we're superuser, we can set the replication model to 'streaming' to
-- create a one-off MX table... but if we forget to set the replication factor to one,
-- we should see an error reminding us to fix that
SET citus.replication_model TO 'streaming';
SELECT create_distributed_table('mx_table_test', 'col1');
-- ok, so now actually create the one-off MX table
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('mx_table_test', 'col1');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
DROP TABLE mx_table_test;
-- Show that master_create_distributed_table ignores citus.replication_model GUC
CREATE TABLE s_table(a int);
SELECT master_create_distributed_table('s_table', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
SELECT master_create_worker_shards('s_table', 4, 2);
DROP TABLE s_table;
RESET citus.replication_model;
-- Show that create_distributed_table with append and range distributions ignore
-- citus.replication_model GUC
SET citus.shard_replication_factor TO 2;
SET citus.replication_model TO streaming;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
-- Show that master_create_distributed_table created statement replicated tables no matter
-- what citus.replication_model set to
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
-- Check that the replication_model overwrite behavior is the same with RF=1
SET citus.shard_replication_factor TO 1;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
CREATE TABLE repmodel_test (a int);
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- There should be no table on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT relname FROM pg_class WHERE relname LIKE 'data_load_test%';
\c - - :master_host :master_port
-- creating an index after loading data works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
CREATE INDEX data_load_test_idx ON data_load_test (col2);
DROP TABLE data_load_test;
END;
-- popping in and out of existence in the same transaction works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
DROP TABLE data_load_test;
END;
-- but dropping after a write on the distributed table is currently disallowed
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
INSERT INTO data_load_test VALUES (243, 'world');
DROP TABLE data_load_test;
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col3');
SELECT * FROM data_load_test ORDER BY col2;
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');
CREATE UNLOGGED TABLE unlogged_table
(
key text,
value text
);
SELECT create_distributed_table('unlogged_table', 'key');
SELECT * FROM master_get_table_ddl_events('unlogged_table');
\c - - :public_worker_1_host :worker_1_port
SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%';
\c - - :master_host :master_port
-- Test rollback of create table
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
ROLLBACK;
-- Table should not exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - :master_host :master_port
-- Insert 3 rows to make sure that copy after shard creation touches the same
-- worker node twice.
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
INSERT INTO rollback_table VALUES(1, 'Name_1');
INSERT INTO rollback_table VALUES(2, 'Name_2');
INSERT INTO rollback_table VALUES(3, 'Name_3');
SELECT create_distributed_table('rollback_table','id');
ROLLBACK;
-- Table should not exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - :master_host :master_port
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
\copy rollback_table from stdin delimiter ','
1, 'name_1'
2, 'name_2'
3, 'name_3'
\.
CREATE INDEX rollback_index ON rollback_table(id);
COMMIT;
-- Check the table is created
SELECT count(*) FROM rollback_table;
DROP TABLE rollback_table;
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
\copy rollback_table from stdin delimiter ','
1, 'name_1'
2, 'name_2'
3, 'name_3'
\.
ROLLBACK;
-- Table should not exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - :master_host :master_port
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
CREATE TABLE tt2(id int);
SELECT create_distributed_table('tt2','id');
INSERT INTO tt1 VALUES(1);
INSERT INTO tt2 SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Table should exist on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360069'::regclass;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt2_360073'::regclass;
\c - - :master_host :master_port
DROP TABLE tt1;
DROP TABLE tt2;
-- It is known that creating a table with master_create_empty_shard is not
-- transactional, so table stay remaining on the worker node after the rollback
BEGIN;
CREATE TABLE append_tt1(id int);
SELECT create_distributed_table('append_tt1','id','append');
SELECT master_create_empty_shard('append_tt1');
ROLLBACK;
-- Table exists on the worker node.
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.append_tt1_360077'::regclass;
\c - - :master_host :master_port
-- There should be no table on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'public.tt1%');
\c - - :master_host :master_port
-- Queries executing with router executor is allowed in the same transaction
-- with create_distributed_table
BEGIN;
CREATE TABLE tt1(id int);
INSERT INTO tt1 VALUES(1);
SELECT create_distributed_table('tt1','id');
INSERT INTO tt1 VALUES(2);
SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Placements should be created on the worker
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360078'::regclass;
\c - - :master_host :master_port
DROP TABLE tt1;
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
DROP TABLE tt1;
COMMIT;
-- There should be no table on the worker node
\c - - :public_worker_1_host :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'tt1%');
\c - - :master_host :master_port
-- Tests with create_distributed_table & DDL & DML commands
-- Test should pass since GetPlacementListConnection can provide connections
-- in this order of execution
CREATE TABLE sample_table(id int);
SELECT create_distributed_table('sample_table','id');
BEGIN;
CREATE TABLE stage_table (LIKE sample_table);
\COPY stage_table FROM stdin; -- Note that this operation is a local copy
1
2
3
4
\.
SELECT create_distributed_table('stage_table', 'id');
INSERT INTO sample_table SELECT * FROM stage_table;
DROP TABLE stage_table;
SELECT * FROM sample_table WHERE id = 3;
COMMIT;
-- Show that rows of sample_table are updated
SELECT count(*) FROM sample_table;
DROP table sample_table;
-- Test as create_distributed_table - copy - create_distributed_table - copy
-- This combination is used by tests written by some ORMs.
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
\COPY tt1 from stdin;
1
2
3
\.
CREATE TABLE tt2(like tt1);
SELECT create_distributed_table('tt2','id');
\COPY tt2 from stdin;
4
5
6
\.
INSERT INTO tt1 SELECT * FROM tt2;
SELECT * FROM tt1 WHERE id = 3;
SELECT * FROM tt2 WHERE id = 6;
END;
SELECT count(*) FROM tt1;
-- the goal of the following test is to make sure that
-- both create_reference_table and create_distributed_table
-- calls creates the schemas without leading to any deadlocks
-- first create reference table, then hash distributed table
BEGIN;
CREATE SCHEMA sc;
CREATE TABLE sc.ref(a int);
insert into sc.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc.ref');
CREATE TABLE sc.hash(a int);
insert into sc.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc.hash', 'a');
COMMIT;
-- first create hash distributed table, then reference table
BEGIN;
CREATE SCHEMA sc2;
CREATE TABLE sc2.hash(a int);
insert into sc2.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc2.hash', 'a');
CREATE TABLE sc2.ref(a int);
insert into sc2.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc2.ref');
COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
DROP TABLE tt1;
DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE;
DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SCHEMA sc4 CASCADE;
DROP SCHEMA sc5 CASCADE;
DROP SCHEMA sc6 CASCADE;

View File

@ -88,43 +88,6 @@ BEGIN
END;
$func$;
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
-- Verifies pg_dist_node and pg_dist_palcement in the given worker matches the ones in coordinator
CREATE OR REPLACE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
RETURNS BOOLEAN
LANGUAGE sql
AS $$
SELECT wait_until_metadata_sync();
WITH dist_node_summary AS (
SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
), dist_node_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_node_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
false)
)
SELECT dist_node_check.matches AND dist_placement_check.matches
FROM dist_node_check CROSS JOIN dist_placement_check
$$;
--
-- Procedure for creating shards for range partitioned distributed table.
--
@ -141,17 +104,3 @@ BEGIN
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- partition_task_list_results tests the internal PartitionTasklistResults function
CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
query text,
target_table regclass,
binaryFormat bool DEFAULT true)
RETURNS TABLE(resultId text,
nodeId int,
rowCount bigint,
targetShardId bigint,
targetShardIndex int)
LANGUAGE C STRICT VOLATILE
AS 'citus', $$partition_task_list_results$$;

View File

@ -0,0 +1,50 @@
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
-- Verifies pg_dist_node and pg_dist_palcement in the given worker matches the ones in coordinator
CREATE OR REPLACE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
RETURNS BOOLEAN
LANGUAGE sql
AS $$
SELECT wait_until_metadata_sync();
WITH dist_node_summary AS (
SELECT 'SELECT jsonb_agg(ROW(nodeid, groupid, nodename, nodeport, isactive) ORDER BY nodeid) FROM pg_dist_node' as query
), dist_node_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_node_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
false)
)
SELECT dist_node_check.matches AND dist_placement_check.matches
FROM dist_node_check CROSS JOIN dist_placement_check
$$;
-- partition_task_list_results tests the internal PartitionTasklistResults function
CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
query text,
target_table regclass,
binaryFormat bool DEFAULT true)
RETURNS TABLE(resultId text,
nodeId int,
rowCount bigint,
targetShardId bigint,
targetShardIndex int)
LANGUAGE C STRICT VOLATILE
AS 'citus', $$partition_task_list_results$$;