mirror of https://github.com/citusdata/citus.git
Adds multi_schedule_hyperscale
parent
12c6204115
commit
4ced3018fb
|
@ -124,6 +124,10 @@ check-multi-non-adaptive: all
|
||||||
--server-option=citus.task_executor_type=real-time \
|
--server-option=citus.task_executor_type=real-time \
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
|
check-multi-hyperscale: all
|
||||||
|
$(pg_regress_multi_check) --constr="$(constr)" --hoststr="$(hoststr)" --load-extension=citus \
|
||||||
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale $(EXTRA_TESTS)
|
||||||
|
|
||||||
check-multi-hyperscale-superuser: all
|
check-multi-hyperscale-superuser: all
|
||||||
$(pg_regress_multi_check) --constr="$(constr)" --hoststr="$(hoststr)" --load-extension=citus \
|
$(pg_regress_multi_check) --constr="$(constr)" --hoststr="$(hoststr)" --load-extension=citus \
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale_superuser $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale_superuser $(EXTRA_TESTS)
|
||||||
|
|
|
@ -0,0 +1,508 @@
|
||||||
|
--
|
||||||
|
-- MULTI_CREATE_TABLE
|
||||||
|
--
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000;
|
||||||
|
ERROR: must be owner of sequence pg_dist_shardid_seq
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000;
|
||||||
|
ERROR: must be owner of sequence pg_dist_colocationid_seq
|
||||||
|
-- Create new table definitions for use in testing in distributed planning and
|
||||||
|
-- execution functionality. Also create indexes to boost performance. Since we
|
||||||
|
-- need to cover both reference join and partitioned join, we have created
|
||||||
|
-- reference and append distributed version of orders, customer and part tables.
|
||||||
|
CREATE TABLE lineitem (
|
||||||
|
l_orderkey bigint not null,
|
||||||
|
l_partkey integer not null,
|
||||||
|
l_suppkey integer not null,
|
||||||
|
l_linenumber integer not null,
|
||||||
|
l_quantity decimal(15, 2) not null,
|
||||||
|
l_extendedprice decimal(15, 2) not null,
|
||||||
|
l_discount decimal(15, 2) not null,
|
||||||
|
l_tax decimal(15, 2) not null,
|
||||||
|
l_returnflag char(1) not null,
|
||||||
|
l_linestatus char(1) not null,
|
||||||
|
l_shipdate date not null,
|
||||||
|
l_commitdate date not null,
|
||||||
|
l_receiptdate date not null,
|
||||||
|
l_shipinstruct char(25) not null,
|
||||||
|
l_shipmode char(10) not null,
|
||||||
|
l_comment varchar(44) not null,
|
||||||
|
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||||
|
SELECT create_distributed_table('lineitem', 'l_orderkey', 'append');
|
||||||
|
WARNING: table "lineitem" has a UNIQUE or EXCLUDE constraint
|
||||||
|
DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
|
||||||
|
HINT: Consider using hash partitioning.
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE INDEX lineitem_time_index ON lineitem (l_shipdate);
|
||||||
|
CREATE TABLE orders (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT create_distributed_table('orders', 'o_orderkey', 'append');
|
||||||
|
WARNING: table "orders" has a UNIQUE or EXCLUDE constraint
|
||||||
|
DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
|
||||||
|
HINT: Consider using hash partitioning.
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE orders_reference (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT create_reference_table('orders_reference');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE customer (
|
||||||
|
c_custkey integer not null,
|
||||||
|
c_name varchar(25) not null,
|
||||||
|
c_address varchar(40) not null,
|
||||||
|
c_nationkey integer not null,
|
||||||
|
c_phone char(15) not null,
|
||||||
|
c_acctbal decimal(15,2) not null,
|
||||||
|
c_mktsegment char(10) not null,
|
||||||
|
c_comment varchar(117) not null);
|
||||||
|
SELECT create_reference_table('customer');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE customer_append (
|
||||||
|
c_custkey integer not null,
|
||||||
|
c_name varchar(25) not null,
|
||||||
|
c_address varchar(40) not null,
|
||||||
|
c_nationkey integer not null,
|
||||||
|
c_phone char(15) not null,
|
||||||
|
c_acctbal decimal(15,2) not null,
|
||||||
|
c_mktsegment char(10) not null,
|
||||||
|
c_comment varchar(117) not null);
|
||||||
|
SELECT create_distributed_table('customer_append', 'c_custkey', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE nation (
|
||||||
|
n_nationkey integer not null,
|
||||||
|
n_name char(25) not null,
|
||||||
|
n_regionkey integer not null,
|
||||||
|
n_comment varchar(152));
|
||||||
|
SELECT create_reference_table('nation');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE part (
|
||||||
|
p_partkey integer not null,
|
||||||
|
p_name varchar(55) not null,
|
||||||
|
p_mfgr char(25) not null,
|
||||||
|
p_brand char(10) not null,
|
||||||
|
p_type varchar(25) not null,
|
||||||
|
p_size integer not null,
|
||||||
|
p_container char(10) not null,
|
||||||
|
p_retailprice decimal(15,2) not null,
|
||||||
|
p_comment varchar(23) not null);
|
||||||
|
SELECT create_reference_table('part');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE part_append (
|
||||||
|
p_partkey integer not null,
|
||||||
|
p_name varchar(55) not null,
|
||||||
|
p_mfgr char(25) not null,
|
||||||
|
p_brand char(10) not null,
|
||||||
|
p_type varchar(25) not null,
|
||||||
|
p_size integer not null,
|
||||||
|
p_container char(10) not null,
|
||||||
|
p_retailprice decimal(15,2) not null,
|
||||||
|
p_comment varchar(23) not null);
|
||||||
|
SELECT create_distributed_table('part_append', 'p_partkey', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE supplier
|
||||||
|
(
|
||||||
|
s_suppkey integer not null,
|
||||||
|
s_name char(25) not null,
|
||||||
|
s_address varchar(40) not null,
|
||||||
|
s_nationkey integer,
|
||||||
|
s_phone char(15) not null,
|
||||||
|
s_acctbal decimal(15,2) not null,
|
||||||
|
s_comment varchar(101) not null
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('supplier');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create a single shard supplier table which is not
|
||||||
|
-- a reference table
|
||||||
|
CREATE TABLE supplier_single_shard
|
||||||
|
(
|
||||||
|
s_suppkey integer not null,
|
||||||
|
s_name char(25) not null,
|
||||||
|
s_address varchar(40) not null,
|
||||||
|
s_nationkey integer,
|
||||||
|
s_phone char(15) not null,
|
||||||
|
s_acctbal decimal(15,2) not null,
|
||||||
|
s_comment varchar(101) not null
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('supplier_single_shard', 's_suppkey', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE mx_table_test (col1 int, col2 text);
|
||||||
|
-- Since we're superuser, we can set the replication model to 'streaming' to
|
||||||
|
-- create a one-off MX table... but if we forget to set the replication factor to one,
|
||||||
|
-- we should see an error reminding us to fix that
|
||||||
|
SET <user>.replication_model TO 'streaming';
|
||||||
|
ERROR: permission denied to set parameter "<user>.replication_model"
|
||||||
|
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ok, so now actually create the one-off MX table
|
||||||
|
SET <user>.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||||
|
ERROR: table "mx_table_test" is already distributed
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE mx_table_test;
|
||||||
|
-- Show that master_create_distributed_table ignores <user>.replication_model GUC
|
||||||
|
CREATE TABLE s_table(a int);
|
||||||
|
SELECT master_create_distributed_table('s_table', 'a', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
|
||||||
|
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
|
||||||
|
ERROR: permission denied for table pg_dist_partition
|
||||||
|
SELECT master_create_worker_shards('s_table', 4, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE s_table;
|
||||||
|
RESET <user>.replication_model;
|
||||||
|
ERROR: permission denied to set parameter "<user>.replication_model"
|
||||||
|
-- Show that create_distributed_table with append and range distributions ignore
|
||||||
|
-- <user>.replication_model GUC
|
||||||
|
SET <user>.shard_replication_factor TO 2;
|
||||||
|
SET <user>.replication_model TO streaming;
|
||||||
|
ERROR: permission denied to set parameter "<user>.replication_model"
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
-- Show that master_create_distributed_table created statement replicated tables no matter
|
||||||
|
-- what <user>.replication_model set to
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
-- Check that the replication_model overwrite behavior is the same with RF=1
|
||||||
|
SET <user>.shard_replication_factor TO 1;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
repmodel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
c
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
RESET <user>.replication_model;
|
||||||
|
ERROR: permission denied to set parameter "<user>.replication_model"
|
||||||
|
-- Test initial data loading
|
||||||
|
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test VALUES (132, 'hello');
|
||||||
|
INSERT INTO data_load_test VALUES (243, 'world');
|
||||||
|
-- table must be empty when using append- or range-partitioning
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1', 'append');
|
||||||
|
ERROR: cannot distribute relation "data_load_test"
|
||||||
|
DETAIL: Relation "data_load_test" contains data.
|
||||||
|
HINT: Empty your table before distributing it.
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1', 'range');
|
||||||
|
ERROR: cannot distribute relation "data_load_test"
|
||||||
|
DETAIL: Relation "data_load_test" contains data.
|
||||||
|
HINT: Empty your table before distributing it.
|
||||||
|
-- table must be empty when using master_create_distributed_table (no shards created)
|
||||||
|
SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
|
||||||
|
ERROR: cannot distribute relation "data_load_test"
|
||||||
|
DETAIL: Relation "data_load_test" contains data.
|
||||||
|
HINT: Empty your table before distributing it.
|
||||||
|
-- create_distributed_table creates shards and copies data into the distributed table
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM data_load_test ORDER BY col1;
|
||||||
|
col1 | col2 | col3
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
132 | hello | 1
|
||||||
|
243 | world | 2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
DROP TABLE data_load_test;
|
||||||
|
-- test queries on distributed tables with no shards
|
||||||
|
CREATE TABLE no_shard_test (col1 int, col2 text);
|
||||||
|
SELECT create_distributed_table('no_shard_test', 'col1', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM no_shard_test WHERE col1 > 1;
|
||||||
|
col1 | col2
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE no_shard_test;
|
||||||
|
CREATE TABLE no_shard_test (col1 int, col2 text);
|
||||||
|
SELECT create_distributed_table('no_shard_test', 'col1', 'range');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM no_shard_test WHERE col1 > 1;
|
||||||
|
col1 | col2
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE no_shard_test;
|
||||||
|
CREATE TABLE no_shard_test (col1 int, col2 text);
|
||||||
|
SELECT master_create_distributed_table('no_shard_test', 'col1', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM no_shard_test WHERE col1 > 1;
|
||||||
|
col1 | col2
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE no_shard_test;
|
||||||
|
-- ensure writes in the same transaction as create_distributed_table are visible
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test VALUES (132, 'hello');
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO data_load_test VALUES (243, 'world');
|
||||||
|
END;
|
||||||
|
SELECT * FROM data_load_test ORDER BY col1;
|
||||||
|
col1 | col2 | col3
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
132 | hello | 1
|
||||||
|
243 | world | 2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
DROP TABLE data_load_test;
|
||||||
|
-- creating co-located distributed tables in the same transaction works
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test1 VALUES (132, 'hello');
|
||||||
|
SELECT create_distributed_table('data_load_test1', 'col1');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test2 VALUES (132, 'world');
|
||||||
|
SELECT create_distributed_table('data_load_test2', 'col1');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT a.col2 ||' '|| b.col2
|
||||||
|
FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
|
||||||
|
WHERE col1 = 132;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
hello world
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE data_load_test1, data_load_test2;
|
||||||
|
END;
|
|
@ -0,0 +1,98 @@
|
||||||
|
-- File to create functions and helpers needed for subsequent tests
|
||||||
|
-- create a helper function to create objects on each node
|
||||||
|
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE p_sql;
|
||||||
|
PERFORM run_command_on_workers(p_sql);
|
||||||
|
END;$$;
|
||||||
|
-- Create a function to make sure that queries returning the same result
|
||||||
|
CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE query;
|
||||||
|
EXCEPTION WHEN OTHERS THEN
|
||||||
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||||
|
RAISE 'Task failed to execute';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$LANGUAGE plpgsql;
|
||||||
|
-- Create a function to ignore worker plans in explain output
|
||||||
|
CREATE OR REPLACE FUNCTION coordinator_plan(explain_commmand text, out query_plan text)
|
||||||
|
RETURNS SETOF TEXT AS $$
|
||||||
|
BEGIN
|
||||||
|
FOR query_plan IN execute explain_commmand LOOP
|
||||||
|
RETURN next;
|
||||||
|
IF query_plan LIKE '%Task Count:%'
|
||||||
|
THEN
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
RETURN;
|
||||||
|
END; $$ language plpgsql;
|
||||||
|
-- Is a distributed plan?
|
||||||
|
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE
|
||||||
|
query_plan TEXT;
|
||||||
|
BEGIN
|
||||||
|
FOR query_plan IN execute explain_commmand LOOP
|
||||||
|
IF query_plan LIKE '%Task Count:%'
|
||||||
|
THEN
|
||||||
|
RETURN TRUE;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
RETURN FALSE;
|
||||||
|
END; $$ language plpgsql;
|
||||||
|
-- helper function to quickly run SQL on the whole cluster
|
||||||
|
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE p_sql;
|
||||||
|
PERFORM run_command_on_workers(p_sql);
|
||||||
|
END;$$;
|
||||||
|
-- 1. Marks the given procedure as colocated with the given table.
|
||||||
|
-- 2. Marks the argument index with which we route the procedure.
|
||||||
|
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
update citus.pg_dist_object
|
||||||
|
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||||
|
from pg_proc, pg_dist_partition
|
||||||
|
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||||
|
END;$$;
|
||||||
|
-- helper function to verify the function of a coordinator is the same on all workers
|
||||||
|
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
|
||||||
|
RETURNS bool
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
AS $func$
|
||||||
|
DECLARE
|
||||||
|
coordinatorSql text;
|
||||||
|
workerSql text;
|
||||||
|
BEGIN
|
||||||
|
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
|
||||||
|
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
|
||||||
|
IF workerSql != coordinatorSql THEN
|
||||||
|
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
|
||||||
|
RETURN false;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
RETURN true;
|
||||||
|
END;
|
||||||
|
$func$;
|
||||||
|
--
|
||||||
|
-- Procedure for creating shards for range partitioned distributed table.
|
||||||
|
--
|
||||||
|
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
new_shardid bigint;
|
||||||
|
idx int;
|
||||||
|
BEGIN
|
||||||
|
FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1))
|
||||||
|
LOOP
|
||||||
|
SELECT master_create_empty_shard(rel::text) INTO new_shardid;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid;
|
||||||
|
END LOOP;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
|
@ -0,0 +1,399 @@
|
||||||
|
--
|
||||||
|
-- multi behavioral analytics
|
||||||
|
-- this file is intended to create the table requires for the tests
|
||||||
|
--
|
||||||
|
SET citus.next_shard_id TO 1400000;
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
SET citus.shard_count = 32;
|
||||||
|
|
||||||
|
CREATE SCHEMA with_basics;
|
||||||
|
SET search_path TO 'with_basics';
|
||||||
|
|
||||||
|
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
|
||||||
|
\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
|
||||||
|
\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
|
||||||
|
|
||||||
|
SET citus.shard_count = 96;
|
||||||
|
CREATE SCHEMA subquery_and_ctes;
|
||||||
|
SET search_path TO subquery_and_ctes;
|
||||||
|
|
||||||
|
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
|
||||||
|
\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
|
||||||
|
\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
|
||||||
|
|
||||||
|
SET citus.shard_count TO DEFAULT;
|
||||||
|
SET search_path TO DEFAULT;
|
||||||
|
|
||||||
|
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE agg_results (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results', 'user_id');
|
||||||
|
|
||||||
|
-- we need this to improve the concurrency on the regression tests
|
||||||
|
CREATE TABLE agg_results_second (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_second', 'user_id');
|
||||||
|
|
||||||
|
-- same as agg_results_second
|
||||||
|
CREATE TABLE agg_results_third (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_third', 'user_id');
|
||||||
|
|
||||||
|
-- same as agg_results_second
|
||||||
|
CREATE TABLE agg_results_fourth (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_fourth', 'user_id');
|
||||||
|
|
||||||
|
-- same as agg_results_second
|
||||||
|
CREATE TABLE agg_results_window (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_window', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
|
||||||
|
SELECT create_reference_table('users_ref_test_table');
|
||||||
|
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
|
||||||
|
|
||||||
|
\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
|
||||||
|
\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
|
||||||
|
|
||||||
|
-- create indexes for
|
||||||
|
CREATE INDEX is_index1 ON users_table(user_id);
|
||||||
|
CREATE INDEX is_index2 ON events_table(user_id);
|
||||||
|
CREATE INDEX is_index3 ON users_table(value_1);
|
||||||
|
CREATE INDEX is_index4 ON events_table(event_type);
|
||||||
|
CREATE INDEX is_index5 ON users_table(value_2);
|
||||||
|
CREATE INDEX is_index6 ON events_table(value_2);
|
||||||
|
|
||||||
|
-- Create composite type to use in subquery pushdown
|
||||||
|
CREATE TYPE user_composite_type AS
|
||||||
|
(
|
||||||
|
tenant_id BIGINT,
|
||||||
|
user_id BIGINT
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'btrecordcmp'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_gt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_ge'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_eq'
|
||||||
|
IMMUTABLE;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR > (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = gt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR >= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = ge_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
CREATE OPERATOR = (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = equal_user_composite_type_function,
|
||||||
|
commutator = =,
|
||||||
|
RESTRICT = eqsel,
|
||||||
|
JOIN = eqjoinsel,
|
||||||
|
merges,
|
||||||
|
hashes
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR <= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = le_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR < (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = lt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
|
||||||
|
-- ... and create a custom operator family for hash indexes...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
|
||||||
|
-- ... create a test HASH function. Though it is a poor hash function,
|
||||||
|
-- it is acceptable for our tests
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION test_composite_type_hash(user_composite_type) RETURNS int
|
||||||
|
AS 'SELECT hashtext( ($1.tenant_id + $1.tenant_id)::text);'
|
||||||
|
LANGUAGE SQL
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
|
||||||
|
-- We need to define two different operator classes for the composite types
|
||||||
|
-- One uses BTREE the other uses HASH
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_clas3
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING BTREE AS
|
||||||
|
OPERATOR 1 <= (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 2 < (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 3 = (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 4 >= (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 5 > (user_composite_type, user_composite_type),
|
||||||
|
|
||||||
|
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_class
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING HASH AS
|
||||||
|
OPERATOR 1 = (user_composite_type, user_composite_type),
|
||||||
|
FUNCTION 1 test_composite_type_hash(user_composite_type);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
CREATE TABLE events (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
event_id bigint,
|
||||||
|
event_type character varying(255),
|
||||||
|
event_time bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('events', 'composite_id', 'range');
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
\COPY events FROM STDIN WITH CSV
|
||||||
|
"(1,1001)",20001,click,1472807012
|
||||||
|
"(1,1001)",20002,submit,1472807015
|
||||||
|
"(1,1001)",20003,pay,1472807020
|
||||||
|
"(1,1002)",20010,click,1472807022
|
||||||
|
"(1,1002)",20011,click,1472807023
|
||||||
|
"(1,1002)",20012,submit,1472807025
|
||||||
|
"(1,1002)",20013,pay,1472807030
|
||||||
|
"(1,1003)",20014,click,1472807032
|
||||||
|
"(1,1003)",20015,click,1472807033
|
||||||
|
"(1,1003)",20016,click,1472807034
|
||||||
|
"(1,1003)",20017,submit,1472807035
|
||||||
|
\.
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
lastseen bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('users', 'composite_id', 'range');
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
"(1,1001)",1472807115
|
||||||
|
"(1,1002)",1472807215
|
||||||
|
"(1,1003)",1472807315
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Create tables for subquery tests
|
||||||
|
CREATE TABLE lineitem_subquery (
|
||||||
|
l_orderkey bigint not null,
|
||||||
|
l_partkey integer not null,
|
||||||
|
l_suppkey integer not null,
|
||||||
|
l_linenumber integer not null,
|
||||||
|
l_quantity decimal(15, 2) not null,
|
||||||
|
l_extendedprice decimal(15, 2) not null,
|
||||||
|
l_discount decimal(15, 2) not null,
|
||||||
|
l_tax decimal(15, 2) not null,
|
||||||
|
l_returnflag char(1) not null,
|
||||||
|
l_linestatus char(1) not null,
|
||||||
|
l_shipdate date not null,
|
||||||
|
l_commitdate date not null,
|
||||||
|
l_receiptdate date not null,
|
||||||
|
l_shipinstruct char(25) not null,
|
||||||
|
l_shipmode char(10) not null,
|
||||||
|
l_comment varchar(44) not null,
|
||||||
|
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||||
|
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
|
||||||
|
|
||||||
|
CREATE TABLE orders_subquery (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
|
||||||
|
|
||||||
|
SET citus.enable_router_execution TO 'false';
|
||||||
|
|
||||||
|
-- Check that we don't crash if there are not any shards.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
|
||||||
|
-- Load data into tables.
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SET citus.shard_max_size TO "1MB";
|
||||||
|
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||||
|
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
|
||||||
|
CREATE TABLE events_reference_table (like events_table including all);
|
||||||
|
SELECT create_reference_table('events_reference_table');
|
||||||
|
CREATE INDEX events_ref_val2 on events_reference_table(value_2);
|
||||||
|
INSERT INTO events_reference_table SELECT * FROM events_table;
|
||||||
|
|
||||||
|
CREATE TABLE users_reference_table (like users_table including all);
|
||||||
|
SELECT create_reference_table('users_reference_table');
|
||||||
|
INSERT INTO users_reference_table SELECT * FROM users_table;
|
|
@ -0,0 +1,26 @@
|
||||||
|
--
|
||||||
|
-- MULTI_LOAD_DATA
|
||||||
|
--
|
||||||
|
-- Tests for loading data in a distributed cluster. Please note that the number
|
||||||
|
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
|
||||||
|
-- citus.shard_max_size. These values are set in pg_regress_multi.pl. Shard placement
|
||||||
|
-- policy is left to the default value (round-robin) to test the common install case.
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 290000;
|
||||||
|
|
||||||
|
\copy lineitem FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
\copy lineitem FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||||
|
|
||||||
|
\copy orders FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
|
||||||
|
\copy orders_reference FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders_reference FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
|
||||||
|
\copy customer FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|'
|
||||||
|
\copy customer_append FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|'
|
||||||
|
\copy nation FROM '@abs_srcdir@/data/nation.data' with delimiter '|'
|
||||||
|
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
|
||||||
|
\copy part_append FROM '@abs_srcdir@/data/part.data' with delimiter '|'
|
||||||
|
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
|
||||||
|
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
|
|
@ -0,0 +1,262 @@
|
||||||
|
# ----------
|
||||||
|
# $Id$
|
||||||
|
#
|
||||||
|
# Regression tests that exercise distributed planning/execution functionality.
|
||||||
|
#
|
||||||
|
# All new regression tests are expected to be run by this schedule. Tests that
|
||||||
|
# do not set specific task executor type should also be added to
|
||||||
|
# multi_task_tracker_extra_schedule.
|
||||||
|
#
|
||||||
|
# Note that we use variant comparison files to test version dependent regression
|
||||||
|
# test results. For more information:
|
||||||
|
# http://www.postgresql.org/docs/current/static/regress-variant.html
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ---
|
||||||
|
# Tests around schema changes, these are run first, so there's no preexisting objects.
|
||||||
|
#
|
||||||
|
# propagate_extension_commands lies just after multi_cluster_management as we do
|
||||||
|
# remove / add node operations, we do not want any preexisting objects before
|
||||||
|
# propagate_extension_commands
|
||||||
|
# ---
|
||||||
|
test: multi_test_helpers_shadow
|
||||||
|
test: multi_test_catalog_views
|
||||||
|
test: multi_name_resolution
|
||||||
|
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# The following distributed tests depend on creating a partitioned table and
|
||||||
|
# uploading data to it.
|
||||||
|
# ----------
|
||||||
|
test: multi_create_table_shadow
|
||||||
|
test: multi_master_protocol multi_load_data_shadow multi_behavioral_analytics_create_table_shadow
|
||||||
|
test: recursive_dml_with_different_planners_executors
|
||||||
|
test: window_functions multi_insert_select_window
|
||||||
|
|
||||||
|
# following should not run in parallel because it relies on connection counts to workers
|
||||||
|
test: insert_select_connection_leak
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# at the end of the regression tests regaring recursively planned modifications
|
||||||
|
# ensure that we don't leak any intermediate results
|
||||||
|
# This test should not run in parallel with any other tests
|
||||||
|
# ---------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests for partitioning support
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests for recursive subquery planning
|
||||||
|
# ----------
|
||||||
|
test: subquery_basics subquery_local_tables subquery_executors set_operations set_operation_and_local_tables
|
||||||
|
test: subquery_partitioning subquery_complex_target_list subqueries_not_supported
|
||||||
|
test: non_colocated_join_order
|
||||||
|
test: subquery_prepared_statements pg12 cte_inline
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Miscellaneous tests to check our query planning behavior
|
||||||
|
# ----------
|
||||||
|
test: multi_distributed_transaction_id
|
||||||
|
test: hyperscale_tutorial
|
||||||
|
test: multi_basic_queries multi_complex_expressions multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||||
|
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_sql_function
|
||||||
|
test: multi_function_in_join row_types
|
||||||
|
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
|
||||||
|
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||||
|
test: multi_limit_clause_approximate multi_single_relation_subquery
|
||||||
|
test: multi_select_for_update
|
||||||
|
test: multi_average_expression multi_working_columns multi_having_pushdown
|
||||||
|
test: multi_array_agg multi_limit_clause
|
||||||
|
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition chbenchmark_all_queries expression_reference_join
|
||||||
|
test: multi_agg_type_conversion multi_count_type_conversion
|
||||||
|
test: multi_cross_shard
|
||||||
|
test: multi_dropped_column_aliases foreign_key_restriction_enforcement
|
||||||
|
test: multi_binary_master_copy_format
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Parallel TPC-H tests to check our distributed execution behavior
|
||||||
|
# ----------
|
||||||
|
test: multi_tpch_query1 multi_tpch_query3 multi_tpch_query6 multi_tpch_query10
|
||||||
|
test: multi_tpch_query12 multi_tpch_query14 multi_tpch_query19
|
||||||
|
test: multi_tpch_query7 multi_tpch_query7_nested
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Parallel tests to check our join order planning logic. Note that we load data
|
||||||
|
# below; and therefore these tests should come after the execution tests.
|
||||||
|
# ----------
|
||||||
|
test: multi_join_order_tpch_small multi_join_order_additional
|
||||||
|
test: multi_join_order_tpch_repartition
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests for repartition join planning and execution. Be careful when creating
|
||||||
|
# new shards before these tests, as they expect specific shard identifiers in
|
||||||
|
# the output.
|
||||||
|
# ----------
|
||||||
|
test: multi_repartition_join_ref
|
||||||
|
test: adaptive_executor_repartition
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# Tests that modify data should run sequentially
|
||||||
|
# ---------
|
||||||
|
test: with_prepare
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# Tests for recursive planning.
|
||||||
|
# ---------
|
||||||
|
test: with_nested with_where
|
||||||
|
test: cte_prepared_modify cte_nested_modification
|
||||||
|
test: with_executors with_partitioning with_dml
|
||||||
|
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests to check our large record loading and shard deletion behavior
|
||||||
|
# ----------
|
||||||
|
test: multi_load_large_records
|
||||||
|
test: multi_master_delete_protocol
|
||||||
|
test: multi_shard_modify
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests around DDL statements run on distributed tables
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# multi_create_schema tests creation, loading, and querying of a table in a new
|
||||||
|
# schema (namespace).
|
||||||
|
# ----------
|
||||||
|
test: multi_create_schema
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests to check if we inform the user about potential caveats of creating new
|
||||||
|
# databases, schemas, roles, and authentication information.
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Tests to check the sequential and parallel executions of DDL and modification
|
||||||
|
# commands
|
||||||
|
# Should not be executed in parallel with other tests
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# loads data to create shards in a way that forces
|
||||||
|
# shard caching.
|
||||||
|
# ---------
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# multi_outer_join loads data to create shards to test outer join mappings
|
||||||
|
# ---------
|
||||||
|
test: multi_outer_join
|
||||||
|
|
||||||
|
# ---
|
||||||
|
# Tests covering mostly modification queries and required preliminary
|
||||||
|
# functionality related to metadata, shard creation, shard pruning and
|
||||||
|
# "hacky" copy script for hash partitioned tables.
|
||||||
|
# Note that the order of the following tests are important. multi_complex_count_distinct
|
||||||
|
# is independent from the rest of the group, it is added to increase parallelism.
|
||||||
|
# ---
|
||||||
|
test: multi_complex_count_distinct
|
||||||
|
test: multi_upsert multi_simple_queries
|
||||||
|
test: foreign_key_to_reference_table validate_constraint
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# creates hash and range-partitioned tables and performs COPY
|
||||||
|
# creates hash partitioned tables.
|
||||||
|
# ---------
|
||||||
|
test: fast_path_router_modify
|
||||||
|
test: null_parameters
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# loads more lineitem data using high shard identifiers
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests various size commands on distributed tables
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# multi_drop_extension makes sure we can safely drop and recreate the extension
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests the propagation of mx-related metadata changes to metadata workers
|
||||||
|
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests if the GRANT ... ON SCHEMA queries are propagated correctly
|
||||||
|
# makes sure we can work with tables in schemas other than public with no problem
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
|
||||||
|
# ----------
|
||||||
|
test: multi_function_evaluation
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests truncate functionality for distributed tables
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests utility functions written for co-location feature & internal API
|
||||||
|
# tests master_copy_shard_placement with colocated tables.
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests utility functions written for citus tools
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# multi_foreign_key tests foreign key push down on distributed tables
|
||||||
|
# ----------
|
||||||
|
test: multi_foreign_key
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests for upgrade_reference_table UDF
|
||||||
|
# tests replicating reference tables to new nodes after we add new nodes
|
||||||
|
# tests metadata changes after master_remove_node
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# --------
|
||||||
|
# Replicating reference tables to coordinator. Add coordinator to pg_dist_node
|
||||||
|
# and rerun some of the tests.
|
||||||
|
# --------
|
||||||
|
test: foreign_key_to_reference_table
|
||||||
|
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests for dropping shards using connection API
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# tests simple combinations of permission access and queries
|
||||||
|
# ----------
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# tests for an obscure crash citus used to exhibit when shardids
|
||||||
|
# changed the table they belonged to during a session
|
||||||
|
# --------
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# multi_task_string_size tests task string size checks
|
||||||
|
# ---------
|
||||||
|
test: multi_task_string_size
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# connection encryption tests
|
||||||
|
# ---------
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# object distribution tests
|
||||||
|
# ---------
|
||||||
|
test: distributed_types_xact_add_enum_value
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# deparsing logic tests
|
||||||
|
# ---------
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
# Causes random test failures so commented out for now
|
||||||
|
# ---------
|
||||||
|
# test:
|
|
@ -0,0 +1,608 @@
|
||||||
|
--
|
||||||
|
-- multi behavioral analytics
|
||||||
|
-- this file is intended to create the table requires for the tests
|
||||||
|
--
|
||||||
|
SET citus.next_shard_id TO 1400000;
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
SET citus.shard_count = 32;
|
||||||
|
CREATE SCHEMA with_basics;
|
||||||
|
SET search_path TO 'with_basics';
|
||||||
|
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
|
||||||
|
\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
|
||||||
|
SET citus.shard_count = 96;
|
||||||
|
CREATE SCHEMA subquery_and_ctes;
|
||||||
|
SET search_path TO subquery_and_ctes;
|
||||||
|
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
|
||||||
|
\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
|
||||||
|
SET citus.shard_count TO DEFAULT;
|
||||||
|
SET search_path TO DEFAULT;
|
||||||
|
CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('users_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
|
||||||
|
SELECT create_distributed_table('events_table', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE agg_results (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we need this to improve the concurrency on the regression tests
|
||||||
|
CREATE TABLE agg_results_second (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_second', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- same as agg_results_second
|
||||||
|
CREATE TABLE agg_results_third (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_third', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- same as agg_results_second
|
||||||
|
CREATE TABLE agg_results_fourth (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_fourth', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- same as agg_results_second
|
||||||
|
CREATE TABLE agg_results_window (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
|
||||||
|
SELECT create_distributed_table('agg_results_window', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
|
||||||
|
SELECT create_reference_table('users_ref_test_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
|
||||||
|
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
|
||||||
|
\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
|
||||||
|
\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
|
||||||
|
-- create indexes for
|
||||||
|
CREATE INDEX is_index1 ON users_table(user_id);
|
||||||
|
CREATE INDEX is_index2 ON events_table(user_id);
|
||||||
|
CREATE INDEX is_index3 ON users_table(value_1);
|
||||||
|
CREATE INDEX is_index4 ON events_table(event_type);
|
||||||
|
CREATE INDEX is_index5 ON users_table(value_2);
|
||||||
|
CREATE INDEX is_index6 ON events_table(value_2);
|
||||||
|
-- Create composite type to use in subquery pushdown
|
||||||
|
CREATE TYPE user_composite_type AS
|
||||||
|
(
|
||||||
|
tenant_id BIGINT,
|
||||||
|
user_id BIGINT
|
||||||
|
);
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'btrecordcmp'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
ERROR: permission denied for language internal
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'btrecordcmp'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_gt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
ERROR: permission denied for language internal
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_gt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_ge'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
ERROR: permission denied for language internal
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_ge'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_eq'
|
||||||
|
IMMUTABLE;
|
||||||
|
$f$);
|
||||||
|
ERROR: permission denied for language internal
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_eq'
|
||||||
|
IMMUTABLE;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
ERROR: permission denied for language internal
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
ERROR: permission denied for language internal
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR > (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = gt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
ERROR: function gt_user_composite_type_function(user_composite_type, user_composite_type) does not exist
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR > (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = gt_user_composite_type_function
|
||||||
|
);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR >= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = ge_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
ERROR: function ge_user_composite_type_function(user_composite_type, user_composite_type) does not exist
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR >= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = ge_user_composite_type_function
|
||||||
|
);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
CREATE OPERATOR = (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = equal_user_composite_type_function,
|
||||||
|
commutator = =,
|
||||||
|
RESTRICT = eqsel,
|
||||||
|
JOIN = eqjoinsel,
|
||||||
|
merges,
|
||||||
|
hashes
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
ERROR: function equal_user_composite_type_function(user_composite_type, user_composite_type) does not exist
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
CREATE OPERATOR = (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = equal_user_composite_type_function,
|
||||||
|
commutator = =,
|
||||||
|
RESTRICT = eqsel,
|
||||||
|
JOIN = eqjoinsel,
|
||||||
|
merges,
|
||||||
|
hashes
|
||||||
|
);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR <= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = le_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
ERROR: function le_user_composite_type_function(user_composite_type, user_composite_type) does not exist
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR <= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = le_user_composite_type_function
|
||||||
|
);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR < (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = lt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
ERROR: function lt_user_composite_type_function(user_composite_type, user_composite_type) does not exist
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR < (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = lt_user_composite_type_function
|
||||||
|
);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
-- ... and create a custom operator family for hash indexes...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
|
||||||
|
$f$);
|
||||||
|
ERROR: must be superuser to create an operator family
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
-- ... create a test HASH function. Though it is a poor hash function,
|
||||||
|
-- it is acceptable for our tests
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION test_composite_type_hash(user_composite_type) RETURNS int
|
||||||
|
AS 'SELECT hashtext( ($1.tenant_id + $1.tenant_id)::text);'
|
||||||
|
LANGUAGE SQL
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- We need to define two different operator classes for the composite types
|
||||||
|
-- One uses BTREE the other uses HASH
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_clas3
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING BTREE AS
|
||||||
|
OPERATOR 1 <= (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 2 < (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 3 = (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 4 >= (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 5 > (user_composite_type, user_composite_type),
|
||||||
|
|
||||||
|
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
|
||||||
|
$f$);
|
||||||
|
ERROR: must be superuser to create an operator class
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_clas3
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING BTREE AS
|
||||||
|
OPERATOR 1 <= (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 2 < (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 3 = (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 4 >= (user_composite_type, user_composite_type),
|
||||||
|
OPERATOR 5 > (user_composite_type, user_composite_type),
|
||||||
|
|
||||||
|
FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_class
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING HASH AS
|
||||||
|
OPERATOR 1 = (user_composite_type, user_composite_type),
|
||||||
|
FUNCTION 1 test_composite_type_hash(user_composite_type);
|
||||||
|
$f$);
|
||||||
|
ERROR: must be superuser to create an operator class
|
||||||
|
CONTEXT: SQL statement "
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_class
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING HASH AS
|
||||||
|
OPERATOR 1 = (user_composite_type, user_composite_type),
|
||||||
|
FUNCTION 1 test_composite_type_hash(user_composite_type);
|
||||||
|
"
|
||||||
|
PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE
|
||||||
|
CREATE TABLE events (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
event_id bigint,
|
||||||
|
event_type character varying(255),
|
||||||
|
event_time bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('events', 'composite_id', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
\COPY events FROM STDIN WITH CSV
|
||||||
|
ERROR: could not start copy
|
||||||
|
DETAIL: Distributed relation "events" has shards with missing shardminvalue/shardmaxvalue.
|
||||||
|
"(1,1001)",20001,click,1472807012
|
||||||
|
"(1,1001)",20002,submit,1472807015
|
||||||
|
"(1,1001)",20003,pay,1472807020
|
||||||
|
"(1,1002)",20010,click,1472807022
|
||||||
|
"(1,1002)",20011,click,1472807023
|
||||||
|
"(1,1002)",20012,submit,1472807025
|
||||||
|
"(1,1002)",20013,pay,1472807030
|
||||||
|
"(1,1003)",20014,click,1472807032
|
||||||
|
"(1,1003)",20015,click,1472807033
|
||||||
|
"(1,1003)",20016,click,1472807034
|
||||||
|
"(1,1003)",20017,submit,1472807035
|
||||||
|
\.
|
||||||
|
invalid command \.
|
||||||
|
CREATE TABLE users (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
lastseen bigint
|
||||||
|
);
|
||||||
|
ERROR: syntax error at or near ""(1,1001)""
|
||||||
|
LINE 1: "(1,1001)",20001,click,1472807012
|
||||||
|
^
|
||||||
|
SELECT master_create_distributed_table('users', 'composite_id', 'range');
|
||||||
|
ERROR: relation "users" does not exist
|
||||||
|
LINE 1: SELECT master_create_distributed_table('users', 'composite_i...
|
||||||
|
^
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
ERROR: relation "users" does not exist
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
ERROR: relation "users" does not exist
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
ERROR: relation "users" does not exist
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
ERROR: relation "users" does not exist
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
ERROR: relation "users" does not exist
|
||||||
|
"(1,1001)",1472807115
|
||||||
|
"(1,1002)",1472807215
|
||||||
|
"(1,1003)",1472807315
|
||||||
|
\.
|
||||||
|
invalid command \.
|
||||||
|
-- Create tables for subquery tests
|
||||||
|
CREATE TABLE lineitem_subquery (
|
||||||
|
l_orderkey bigint not null,
|
||||||
|
l_partkey integer not null,
|
||||||
|
l_suppkey integer not null,
|
||||||
|
l_linenumber integer not null,
|
||||||
|
l_quantity decimal(15, 2) not null,
|
||||||
|
l_extendedprice decimal(15, 2) not null,
|
||||||
|
l_discount decimal(15, 2) not null,
|
||||||
|
l_tax decimal(15, 2) not null,
|
||||||
|
l_returnflag char(1) not null,
|
||||||
|
l_linestatus char(1) not null,
|
||||||
|
l_shipdate date not null,
|
||||||
|
l_commitdate date not null,
|
||||||
|
l_receiptdate date not null,
|
||||||
|
l_shipinstruct char(25) not null,
|
||||||
|
l_shipmode char(10) not null,
|
||||||
|
l_comment varchar(44) not null,
|
||||||
|
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||||
|
ERROR: syntax error at or near ""(1,1001)""
|
||||||
|
LINE 1: "(1,1001)",1472807115
|
||||||
|
^
|
||||||
|
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
|
||||||
|
ERROR: relation "lineitem_subquery" does not exist
|
||||||
|
LINE 1: SELECT master_create_distributed_table('lineitem_subquery', ...
|
||||||
|
^
|
||||||
|
CREATE TABLE orders_subquery (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_router_execution TO 'false';
|
||||||
|
-- Check that we don't crash if there are not any shards.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
ERROR: relation "lineitem_subquery" does not exist
|
||||||
|
LINE 8: lineitem_subquery,
|
||||||
|
^
|
||||||
|
-- Load data into tables.
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
ERROR: relation "lineitem_subquery" does not exist
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
ERROR: relation "lineitem_subquery" does not exist
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
ERROR: permission denied for table pg_dist_shard
|
||||||
|
SET citus.shard_max_size TO "1MB";
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
ERROR: relation "lineitem_subquery" does not exist
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||||
|
ERROR: relation "lineitem_subquery" does not exist
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
ERROR: could not start copy
|
||||||
|
DETAIL: Distributed relation "orders_subquery" has shards with missing shardminvalue/shardmaxvalue.
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
ERROR: could not start copy
|
||||||
|
DETAIL: Distributed relation "orders_subquery" has shards with missing shardminvalue/shardmaxvalue.
|
||||||
|
CREATE TABLE events_reference_table (like events_table including all);
|
||||||
|
SELECT create_reference_table('events_reference_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE INDEX events_ref_val2 on events_reference_table(value_2);
|
||||||
|
INSERT INTO events_reference_table SELECT * FROM events_table;
|
||||||
|
CREATE TABLE users_reference_table (like users_table including all);
|
||||||
|
SELECT create_reference_table('users_reference_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO users_reference_table SELECT * FROM users_table;
|
|
@ -0,0 +1,21 @@
|
||||||
|
--
|
||||||
|
-- MULTI_LOAD_DATA
|
||||||
|
--
|
||||||
|
-- Tests for loading data in a distributed cluster. Please note that the number
|
||||||
|
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
|
||||||
|
-- citus.shard_max_size. These values are set in pg_regress_multi.pl. Shard placement
|
||||||
|
-- policy is left to the default value (round-robin) to test the common install case.
|
||||||
|
SET citus.next_shard_id TO 290000;
|
||||||
|
\copy lineitem FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
\copy lineitem FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||||
|
\copy orders FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
\copy orders_reference FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders_reference FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
\copy customer FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|'
|
||||||
|
\copy customer_append FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|'
|
||||||
|
\copy nation FROM '@abs_srcdir@/data/nation.data' with delimiter '|'
|
||||||
|
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
|
||||||
|
\copy part_append FROM '@abs_srcdir@/data/part.data' with delimiter '|'
|
||||||
|
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
|
||||||
|
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
|
|
@ -0,0 +1,290 @@
|
||||||
|
--
|
||||||
|
-- MULTI_CREATE_TABLE
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000;
|
||||||
|
|
||||||
|
-- Create new table definitions for use in testing in distributed planning and
|
||||||
|
-- execution functionality. Also create indexes to boost performance. Since we
|
||||||
|
-- need to cover both reference join and partitioned join, we have created
|
||||||
|
-- reference and append distributed version of orders, customer and part tables.
|
||||||
|
|
||||||
|
CREATE TABLE lineitem (
|
||||||
|
l_orderkey bigint not null,
|
||||||
|
l_partkey integer not null,
|
||||||
|
l_suppkey integer not null,
|
||||||
|
l_linenumber integer not null,
|
||||||
|
l_quantity decimal(15, 2) not null,
|
||||||
|
l_extendedprice decimal(15, 2) not null,
|
||||||
|
l_discount decimal(15, 2) not null,
|
||||||
|
l_tax decimal(15, 2) not null,
|
||||||
|
l_returnflag char(1) not null,
|
||||||
|
l_linestatus char(1) not null,
|
||||||
|
l_shipdate date not null,
|
||||||
|
l_commitdate date not null,
|
||||||
|
l_receiptdate date not null,
|
||||||
|
l_shipinstruct char(25) not null,
|
||||||
|
l_shipmode char(10) not null,
|
||||||
|
l_comment varchar(44) not null,
|
||||||
|
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||||
|
SELECT create_distributed_table('lineitem', 'l_orderkey', 'append');
|
||||||
|
|
||||||
|
CREATE INDEX lineitem_time_index ON lineitem (l_shipdate);
|
||||||
|
|
||||||
|
CREATE TABLE orders (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT create_distributed_table('orders', 'o_orderkey', 'append');
|
||||||
|
|
||||||
|
CREATE TABLE orders_reference (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT create_reference_table('orders_reference');
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE customer (
|
||||||
|
c_custkey integer not null,
|
||||||
|
c_name varchar(25) not null,
|
||||||
|
c_address varchar(40) not null,
|
||||||
|
c_nationkey integer not null,
|
||||||
|
c_phone char(15) not null,
|
||||||
|
c_acctbal decimal(15,2) not null,
|
||||||
|
c_mktsegment char(10) not null,
|
||||||
|
c_comment varchar(117) not null);
|
||||||
|
SELECT create_reference_table('customer');
|
||||||
|
|
||||||
|
CREATE TABLE customer_append (
|
||||||
|
c_custkey integer not null,
|
||||||
|
c_name varchar(25) not null,
|
||||||
|
c_address varchar(40) not null,
|
||||||
|
c_nationkey integer not null,
|
||||||
|
c_phone char(15) not null,
|
||||||
|
c_acctbal decimal(15,2) not null,
|
||||||
|
c_mktsegment char(10) not null,
|
||||||
|
c_comment varchar(117) not null);
|
||||||
|
SELECT create_distributed_table('customer_append', 'c_custkey', 'append');
|
||||||
|
|
||||||
|
CREATE TABLE nation (
|
||||||
|
n_nationkey integer not null,
|
||||||
|
n_name char(25) not null,
|
||||||
|
n_regionkey integer not null,
|
||||||
|
n_comment varchar(152));
|
||||||
|
|
||||||
|
SELECT create_reference_table('nation');
|
||||||
|
|
||||||
|
CREATE TABLE part (
|
||||||
|
p_partkey integer not null,
|
||||||
|
p_name varchar(55) not null,
|
||||||
|
p_mfgr char(25) not null,
|
||||||
|
p_brand char(10) not null,
|
||||||
|
p_type varchar(25) not null,
|
||||||
|
p_size integer not null,
|
||||||
|
p_container char(10) not null,
|
||||||
|
p_retailprice decimal(15,2) not null,
|
||||||
|
p_comment varchar(23) not null);
|
||||||
|
SELECT create_reference_table('part');
|
||||||
|
|
||||||
|
CREATE TABLE part_append (
|
||||||
|
p_partkey integer not null,
|
||||||
|
p_name varchar(55) not null,
|
||||||
|
p_mfgr char(25) not null,
|
||||||
|
p_brand char(10) not null,
|
||||||
|
p_type varchar(25) not null,
|
||||||
|
p_size integer not null,
|
||||||
|
p_container char(10) not null,
|
||||||
|
p_retailprice decimal(15,2) not null,
|
||||||
|
p_comment varchar(23) not null);
|
||||||
|
SELECT create_distributed_table('part_append', 'p_partkey', 'append');
|
||||||
|
|
||||||
|
CREATE TABLE supplier
|
||||||
|
(
|
||||||
|
s_suppkey integer not null,
|
||||||
|
s_name char(25) not null,
|
||||||
|
s_address varchar(40) not null,
|
||||||
|
s_nationkey integer,
|
||||||
|
s_phone char(15) not null,
|
||||||
|
s_acctbal decimal(15,2) not null,
|
||||||
|
s_comment varchar(101) not null
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('supplier');
|
||||||
|
|
||||||
|
-- create a single shard supplier table which is not
|
||||||
|
-- a reference table
|
||||||
|
CREATE TABLE supplier_single_shard
|
||||||
|
(
|
||||||
|
s_suppkey integer not null,
|
||||||
|
s_name char(25) not null,
|
||||||
|
s_address varchar(40) not null,
|
||||||
|
s_nationkey integer,
|
||||||
|
s_phone char(15) not null,
|
||||||
|
s_acctbal decimal(15,2) not null,
|
||||||
|
s_comment varchar(101) not null
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('supplier_single_shard', 's_suppkey', 'append');
|
||||||
|
|
||||||
|
CREATE TABLE mx_table_test (col1 int, col2 text);
|
||||||
|
|
||||||
|
-- Since we're superuser, we can set the replication model to 'streaming' to
|
||||||
|
-- create a one-off MX table... but if we forget to set the replication factor to one,
|
||||||
|
-- we should see an error reminding us to fix that
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||||
|
|
||||||
|
-- ok, so now actually create the one-off MX table
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('mx_table_test', 'col1');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass;
|
||||||
|
DROP TABLE mx_table_test;
|
||||||
|
|
||||||
|
-- Show that master_create_distributed_table ignores citus.replication_model GUC
|
||||||
|
CREATE TABLE s_table(a int);
|
||||||
|
SELECT master_create_distributed_table('s_table', 'a', 'hash');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass;
|
||||||
|
|
||||||
|
-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming
|
||||||
|
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass;
|
||||||
|
SELECT master_create_worker_shards('s_table', 4, 2);
|
||||||
|
|
||||||
|
DROP TABLE s_table;
|
||||||
|
|
||||||
|
RESET citus.replication_model;
|
||||||
|
|
||||||
|
-- Show that create_distributed_table with append and range distributions ignore
|
||||||
|
-- citus.replication_model GUC
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
SET citus.replication_model TO streaming;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
-- Show that master_create_distributed_table created statement replicated tables no matter
|
||||||
|
-- what citus.replication_model set to
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
-- Check that the replication_model overwrite behavior is the same with RF=1
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'hash');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'append');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
CREATE TABLE repmodel_test (a int);
|
||||||
|
SELECT master_create_distributed_table('repmodel_test', 'a', 'range');
|
||||||
|
SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
|
||||||
|
DROP TABLE repmodel_test;
|
||||||
|
|
||||||
|
RESET citus.replication_model;
|
||||||
|
|
||||||
|
-- Test initial data loading
|
||||||
|
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test VALUES (132, 'hello');
|
||||||
|
INSERT INTO data_load_test VALUES (243, 'world');
|
||||||
|
|
||||||
|
-- table must be empty when using append- or range-partitioning
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1', 'append');
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1', 'range');
|
||||||
|
|
||||||
|
-- table must be empty when using master_create_distributed_table (no shards created)
|
||||||
|
SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
|
||||||
|
|
||||||
|
-- create_distributed_table creates shards and copies data into the distributed table
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1');
|
||||||
|
SELECT * FROM data_load_test ORDER BY col1;
|
||||||
|
DROP TABLE data_load_test;
|
||||||
|
|
||||||
|
-- test queries on distributed tables with no shards
|
||||||
|
CREATE TABLE no_shard_test (col1 int, col2 text);
|
||||||
|
SELECT create_distributed_table('no_shard_test', 'col1', 'append');
|
||||||
|
SELECT * FROM no_shard_test WHERE col1 > 1;
|
||||||
|
DROP TABLE no_shard_test;
|
||||||
|
|
||||||
|
CREATE TABLE no_shard_test (col1 int, col2 text);
|
||||||
|
SELECT create_distributed_table('no_shard_test', 'col1', 'range');
|
||||||
|
SELECT * FROM no_shard_test WHERE col1 > 1;
|
||||||
|
DROP TABLE no_shard_test;
|
||||||
|
|
||||||
|
CREATE TABLE no_shard_test (col1 int, col2 text);
|
||||||
|
SELECT master_create_distributed_table('no_shard_test', 'col1', 'hash');
|
||||||
|
SELECT * FROM no_shard_test WHERE col1 > 1;
|
||||||
|
DROP TABLE no_shard_test;
|
||||||
|
|
||||||
|
-- ensure writes in the same transaction as create_distributed_table are visible
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test VALUES (132, 'hello');
|
||||||
|
SELECT create_distributed_table('data_load_test', 'col1');
|
||||||
|
INSERT INTO data_load_test VALUES (243, 'world');
|
||||||
|
END;
|
||||||
|
SELECT * FROM data_load_test ORDER BY col1;
|
||||||
|
DROP TABLE data_load_test;
|
||||||
|
|
||||||
|
-- creating co-located distributed tables in the same transaction works
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test1 VALUES (132, 'hello');
|
||||||
|
SELECT create_distributed_table('data_load_test1', 'col1');
|
||||||
|
|
||||||
|
CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
|
||||||
|
INSERT INTO data_load_test2 VALUES (132, 'world');
|
||||||
|
SELECT create_distributed_table('data_load_test2', 'col1');
|
||||||
|
|
||||||
|
SELECT a.col2 ||' '|| b.col2
|
||||||
|
FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
|
||||||
|
WHERE col1 = 132;
|
||||||
|
|
||||||
|
DROP TABLE data_load_test1, data_load_test2;
|
||||||
|
END;
|
|
@ -0,0 +1,106 @@
|
||||||
|
-- File to create functions and helpers needed for subsequent tests
|
||||||
|
|
||||||
|
-- create a helper function to create objects on each node
|
||||||
|
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE p_sql;
|
||||||
|
PERFORM run_command_on_workers(p_sql);
|
||||||
|
END;$$;
|
||||||
|
|
||||||
|
-- Create a function to make sure that queries returning the same result
|
||||||
|
CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE query;
|
||||||
|
EXCEPTION WHEN OTHERS THEN
|
||||||
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||||
|
RAISE 'Task failed to execute';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- Create a function to ignore worker plans in explain output
|
||||||
|
CREATE OR REPLACE FUNCTION coordinator_plan(explain_commmand text, out query_plan text)
|
||||||
|
RETURNS SETOF TEXT AS $$
|
||||||
|
BEGIN
|
||||||
|
FOR query_plan IN execute explain_commmand LOOP
|
||||||
|
RETURN next;
|
||||||
|
IF query_plan LIKE '%Task Count:%'
|
||||||
|
THEN
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
RETURN;
|
||||||
|
END; $$ language plpgsql;
|
||||||
|
|
||||||
|
-- Is a distributed plan?
|
||||||
|
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE
|
||||||
|
query_plan TEXT;
|
||||||
|
BEGIN
|
||||||
|
FOR query_plan IN execute explain_commmand LOOP
|
||||||
|
IF query_plan LIKE '%Task Count:%'
|
||||||
|
THEN
|
||||||
|
RETURN TRUE;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
RETURN FALSE;
|
||||||
|
END; $$ language plpgsql;
|
||||||
|
|
||||||
|
-- helper function to quickly run SQL on the whole cluster
|
||||||
|
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE p_sql;
|
||||||
|
PERFORM run_command_on_workers(p_sql);
|
||||||
|
END;$$;
|
||||||
|
|
||||||
|
-- 1. Marks the given procedure as colocated with the given table.
|
||||||
|
-- 2. Marks the argument index with which we route the procedure.
|
||||||
|
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
update citus.pg_dist_object
|
||||||
|
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||||
|
from pg_proc, pg_dist_partition
|
||||||
|
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||||
|
END;$$;
|
||||||
|
|
||||||
|
-- helper function to verify the function of a coordinator is the same on all workers
|
||||||
|
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
|
||||||
|
RETURNS bool
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
AS $func$
|
||||||
|
DECLARE
|
||||||
|
coordinatorSql text;
|
||||||
|
workerSql text;
|
||||||
|
BEGIN
|
||||||
|
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
|
||||||
|
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
|
||||||
|
IF workerSql != coordinatorSql THEN
|
||||||
|
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
|
||||||
|
RETURN false;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
RETURN true;
|
||||||
|
END;
|
||||||
|
$func$;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Procedure for creating shards for range partitioned distributed table.
|
||||||
|
--
|
||||||
|
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
new_shardid bigint;
|
||||||
|
idx int;
|
||||||
|
BEGIN
|
||||||
|
FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1))
|
||||||
|
LOOP
|
||||||
|
SELECT master_create_empty_shard(rel::text) INTO new_shardid;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid;
|
||||||
|
END LOOP;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
Loading…
Reference in New Issue