diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index d1a3881f2..95d813511 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -124,6 +124,10 @@ check-multi-non-adaptive: all --server-option=citus.task_executor_type=real-time \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS) +check-multi-hyperscale: all + $(pg_regress_multi_check) --constr="$(constr)" --hoststr="$(hoststr)" --load-extension=citus \ + -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale $(EXTRA_TESTS) + check-multi-hyperscale-superuser: all $(pg_regress_multi_check) --constr="$(constr)" --hoststr="$(hoststr)" --load-extension=citus \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule_hyperscale_superuser $(EXTRA_TESTS) diff --git a/src/test/regress/expected/multi_create_table_shadow.out b/src/test/regress/expected/multi_create_table_shadow.out new file mode 100644 index 000000000..9b58724e3 --- /dev/null +++ b/src/test/regress/expected/multi_create_table_shadow.out @@ -0,0 +1,508 @@ +-- +-- MULTI_CREATE_TABLE +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000; +ERROR: must be owner of sequence pg_dist_shardid_seq +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000; +ERROR: must be owner of sequence pg_dist_colocationid_seq +-- Create new table definitions for use in testing in distributed planning and +-- execution functionality. Also create indexes to boost performance. Since we +-- need to cover both reference join and partitioned join, we have created +-- reference and append distributed version of orders, customer and part tables. 
+CREATE TABLE lineitem ( + l_orderkey bigint not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT create_distributed_table('lineitem', 'l_orderkey', 'append'); +WARNING: table "lineitem" has a UNIQUE or EXCLUDE constraint +DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced. +HINT: Consider using hash partitioning. + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE INDEX lineitem_time_index ON lineitem (l_shipdate); +CREATE TABLE orders ( + o_orderkey bigint not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimal(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null, + PRIMARY KEY(o_orderkey) ); +SELECT create_distributed_table('orders', 'o_orderkey', 'append'); +WARNING: table "orders" has a UNIQUE or EXCLUDE constraint +DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced. +HINT: Consider using hash partitioning. 
+ create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE orders_reference ( + o_orderkey bigint not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimal(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null, + PRIMARY KEY(o_orderkey) ); +SELECT create_reference_table('orders_reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE customer ( + c_custkey integer not null, + c_name varchar(25) not null, + c_address varchar(40) not null, + c_nationkey integer not null, + c_phone char(15) not null, + c_acctbal decimal(15,2) not null, + c_mktsegment char(10) not null, + c_comment varchar(117) not null); +SELECT create_reference_table('customer'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE customer_append ( + c_custkey integer not null, + c_name varchar(25) not null, + c_address varchar(40) not null, + c_nationkey integer not null, + c_phone char(15) not null, + c_acctbal decimal(15,2) not null, + c_mktsegment char(10) not null, + c_comment varchar(117) not null); +SELECT create_distributed_table('customer_append', 'c_custkey', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE nation ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); +SELECT create_reference_table('nation'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE part ( + p_partkey integer not null, + p_name varchar(55) not null, + p_mfgr char(25) not null, + p_brand char(10) not null, + p_type 
varchar(25) not null, + p_size integer not null, + p_container char(10) not null, + p_retailprice decimal(15,2) not null, + p_comment varchar(23) not null); +SELECT create_reference_table('part'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE part_append ( + p_partkey integer not null, + p_name varchar(55) not null, + p_mfgr char(25) not null, + p_brand char(10) not null, + p_type varchar(25) not null, + p_size integer not null, + p_container char(10) not null, + p_retailprice decimal(15,2) not null, + p_comment varchar(23) not null); +SELECT create_distributed_table('part_append', 'p_partkey', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE supplier +( + s_suppkey integer not null, + s_name char(25) not null, + s_address varchar(40) not null, + s_nationkey integer, + s_phone char(15) not null, + s_acctbal decimal(15,2) not null, + s_comment varchar(101) not null +); +SELECT create_reference_table('supplier'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- create a single shard supplier table which is not +-- a reference table +CREATE TABLE supplier_single_shard +( + s_suppkey integer not null, + s_name char(25) not null, + s_address varchar(40) not null, + s_nationkey integer, + s_phone char(15) not null, + s_acctbal decimal(15,2) not null, + s_comment varchar(101) not null +); +SELECT create_distributed_table('supplier_single_shard', 's_suppkey', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE mx_table_test (col1 int, col2 text); +-- Since we're superuser, we can set the replication model to 'streaming' to +-- create a one-off MX table... 
but if we forget to set the replication factor to one, +-- we should see an error reminding us to fix that +SET .replication_model TO 'streaming'; +ERROR: permission denied to set parameter ".replication_model" +SELECT create_distributed_table('mx_table_test', 'col1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- ok, so now actually create the one-off MX table +SET .shard_replication_factor TO 1; +SELECT create_distributed_table('mx_table_test', 'col1'); +ERROR: table "mx_table_test" is already distributed +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE mx_table_test; +-- Show that master_create_distributed_table ignores .replication_model GUC +CREATE TABLE s_table(a int); +SELECT master_create_distributed_table('s_table', 'a', 'hash'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming +UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass; +ERROR: permission denied for table pg_dist_partition +SELECT master_create_worker_shards('s_table', 4, 2); + master_create_worker_shards +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE s_table; +RESET .replication_model; +ERROR: permission denied to set parameter ".replication_model" +-- Show that create_distributed_table with append and range distributions ignore +-- .replication_model GUC +SET .shard_replication_factor TO 2; +SET .replication_model TO streaming; +ERROR: permission denied to set 
parameter ".replication_model" +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +-- Show that master_create_distributed_table created statement replicated tables no matter +-- what .replication_model set to +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); + 
master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +-- Check that the replication_model overwrite behavior is the same with RF=1 +SET .shard_replication_factor TO 1; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE 
logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; + repmodel +--------------------------------------------------------------------- + c +(1 row) + +DROP TABLE repmodel_test; +RESET .replication_model; +ERROR: permission denied to set parameter ".replication_model" +-- Test initial data loading +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +INSERT INTO data_load_test VALUES (243, 'world'); +-- table must be empty when using append- or range-partitioning +SELECT create_distributed_table('data_load_test', 'col1', 'append'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +SELECT create_distributed_table('data_load_test', 'col1', 'range'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +-- table must be empty when using master_create_distributed_table (no shards created) +SELECT master_create_distributed_table('data_load_test', 'col1', 'hash'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +-- create_distributed_table creates shards and copies data into the distributed table +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... 
+ create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM data_load_test ORDER BY col1; + col1 | col2 | col3 +--------------------------------------------------------------------- + 132 | hello | 1 + 243 | world | 2 +(2 rows) + +DROP TABLE data_load_test; +-- test queries on distributed tables with no shards +CREATE TABLE no_shard_test (col1 int, col2 text); +SELECT create_distributed_table('no_shard_test', 'col1', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM no_shard_test WHERE col1 > 1; + col1 | col2 +--------------------------------------------------------------------- +(0 rows) + +DROP TABLE no_shard_test; +CREATE TABLE no_shard_test (col1 int, col2 text); +SELECT create_distributed_table('no_shard_test', 'col1', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM no_shard_test WHERE col1 > 1; + col1 | col2 +--------------------------------------------------------------------- +(0 rows) + +DROP TABLE no_shard_test; +CREATE TABLE no_shard_test (col1 int, col2 text); +SELECT master_create_distributed_table('no_shard_test', 'col1', 'hash'); + master_create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM no_shard_test WHERE col1 > 1; + col1 | col2 +--------------------------------------------------------------------- +(0 rows) + +DROP TABLE no_shard_test; +-- ensure writes in the same transaction as create_distributed_table are visible +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... 
+ create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO data_load_test VALUES (243, 'world'); +END; +SELECT * FROM data_load_test ORDER BY col1; + col1 | col2 | col3 +--------------------------------------------------------------------- + 132 | hello | 1 + 243 | world | 2 +(2 rows) + +DROP TABLE data_load_test; +-- creating co-located distributed tables in the same transaction works +BEGIN; +CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test1 VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test1', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test2 VALUES (132, 'world'); +SELECT create_distributed_table('data_load_test2', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT a.col2 ||' '|| b.col2 +FROM data_load_test1 a JOIN data_load_test2 b USING (col1) +WHERE col1 = 132; + ?column? 
+--------------------------------------------------------------------- + hello world +(1 row) + +DROP TABLE data_load_test1, data_load_test2; +END; diff --git a/src/test/regress/expected/multi_test_helpers_shadow.out b/src/test/regress/expected/multi_test_helpers_shadow.out new file mode 100644 index 000000000..3ae6c3b42 --- /dev/null +++ b/src/test/regress/expected/multi_test_helpers_shadow.out @@ -0,0 +1,98 @@ +-- File to create functions and helpers needed for subsequent tests +-- create a helper function to create objects on each node +CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + EXECUTE p_sql; + PERFORM run_command_on_workers(p_sql); +END;$$; +-- Create a function to make sure that queries returning the same result +CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; +-- Create a function to ignore worker plans in explain output +CREATE OR REPLACE FUNCTION coordinator_plan(explain_commmand text, out query_plan text) +RETURNS SETOF TEXT AS $$ +BEGIN + FOR query_plan IN execute explain_commmand LOOP + RETURN next; + IF query_plan LIKE '%Task Count:%' + THEN + RETURN; + END IF; + END LOOP; + RETURN; +END; $$ language plpgsql; +-- Is a distributed plan? 
+CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text) +RETURNS BOOLEAN AS $$ +DECLARE + query_plan TEXT; +BEGIN + FOR query_plan IN execute explain_commmand LOOP + IF query_plan LIKE '%Task Count:%' + THEN + RETURN TRUE; + END IF; + END LOOP; + RETURN FALSE; +END; $$ language plpgsql; +-- helper function to quickly run SQL on the whole cluster +CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + EXECUTE p_sql; + PERFORM run_command_on_workers(p_sql); +END;$$; +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; +-- helper function to verify the function of a coordinator is the same on all workers +CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text) + RETURNS bool + LANGUAGE plpgsql +AS $func$ +DECLARE + coordinatorSql text; + workerSql text; +BEGIN + SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql; + FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP + IF workerSql != coordinatorSql THEN + RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql; + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$func$; +-- +-- Procedure for creating shards for range partitioned distributed table. 
+-- +CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) +AS $$ +DECLARE + new_shardid bigint; + idx int; +BEGIN + FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) + LOOP + SELECT master_create_empty_shard(rel::text) INTO new_shardid; + UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; + END LOOP; +END; +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/input/multi_behavioral_analytics_create_table_shadow.source b/src/test/regress/input/multi_behavioral_analytics_create_table_shadow.source new file mode 100644 index 000000000..2be0a888d --- /dev/null +++ b/src/test/regress/input/multi_behavioral_analytics_create_table_shadow.source @@ -0,0 +1,399 @@ +-- +-- multi behavioral analytics +-- this file is intended to create the table requires for the tests +-- +SET citus.next_shard_id TO 1400000; +SET citus.shard_replication_factor = 1; +SET citus.shard_count = 32; + +CREATE SCHEMA with_basics; +SET search_path TO 'with_basics'; + +CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('users_table', 'user_id'); + +CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('events_table', 'user_id'); + +\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; +\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; + +SET citus.shard_count = 96; +CREATE SCHEMA subquery_and_ctes; +SET search_path TO subquery_and_ctes; + +CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('users_table', 'user_id'); + +CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); +SELECT 
create_distributed_table('events_table', 'user_id'); + +\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; +\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; + +SET citus.shard_count TO DEFAULT; +SET search_path TO DEFAULT; + +CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('users_table', 'user_id'); + +CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('events_table', 'user_id'); + +CREATE TABLE agg_results (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results', 'user_id'); + +-- we need this to improve the concurrency on the regression tests +CREATE TABLE agg_results_second (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_second', 'user_id'); + +-- same as agg_results_second +CREATE TABLE agg_results_third (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_third', 'user_id'); + +-- same as agg_results_second +CREATE TABLE agg_results_fourth (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_fourth', 'user_id'); + +-- same as agg_results_second +CREATE TABLE agg_results_window (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_window', 'user_id'); + +CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int); +SELECT create_reference_table('users_ref_test_table'); +INSERT INTO users_ref_test_table 
VALUES(1,'User_1',45); +INSERT INTO users_ref_test_table VALUES(2,'User_2',46); +INSERT INTO users_ref_test_table VALUES(3,'User_3',47); +INSERT INTO users_ref_test_table VALUES(4,'User_4',48); +INSERT INTO users_ref_test_table VALUES(5,'User_5',49); +INSERT INTO users_ref_test_table VALUES(6,'User_6',50); + +\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; +\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; + +-- create indexes for +CREATE INDEX is_index1 ON users_table(user_id); +CREATE INDEX is_index2 ON events_table(user_id); +CREATE INDEX is_index3 ON users_table(value_1); +CREATE INDEX is_index4 ON events_table(event_type); +CREATE INDEX is_index5 ON users_table(value_2); +CREATE INDEX is_index6 ON events_table(value_2); + +-- Create composite type to use in subquery pushdown +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int + LANGUAGE 'internal' + AS 'btrecordcmp' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_gt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_ge' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_eq' + IMMUTABLE; +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION lt_user_composite_type_function(user_composite_type, 
user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_lt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_lt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR > ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = gt_user_composite_type_function + ); +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR >= ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = ge_user_composite_type_function + ); +$f$); + +-- ... use that function to create a custom equality operator... +SELECT run_command_on_master_and_workers($f$ + + -- ... use that function to create a custom equality operator... + CREATE OPERATOR = ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = equal_user_composite_type_function, + commutator = =, + RESTRICT = eqsel, + JOIN = eqjoinsel, + merges, + hashes + ); +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR <= ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = le_user_composite_type_function + ); +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR < ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = lt_user_composite_type_function + ); +$f$); + + +-- ... and create a custom operator family for hash indexes... +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR FAMILY cats_2_op_fam USING hash; +$f$); + + +-- ... create a test HASH function. 
Though it is a poor hash function, +-- it is acceptable for our tests +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION test_composite_type_hash(user_composite_type) RETURNS int + AS 'SELECT hashtext( ($1.tenant_id + $1.tenant_id)::text);' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + + +-- We need to define two different operator classes for the composite types +-- One uses BTREE the other uses HASH +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR CLASS cats_2_op_fam_clas3 + DEFAULT FOR TYPE user_composite_type USING BTREE AS + OPERATOR 1 <= (user_composite_type, user_composite_type), + OPERATOR 2 < (user_composite_type, user_composite_type), + OPERATOR 3 = (user_composite_type, user_composite_type), + OPERATOR 4 >= (user_composite_type, user_composite_type), + OPERATOR 5 > (user_composite_type, user_composite_type), + + FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type); +$f$); + +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR CLASS cats_2_op_fam_class + DEFAULT FOR TYPE user_composite_type USING HASH AS + OPERATOR 1 = (user_composite_type, user_composite_type), + FUNCTION 1 test_composite_type_hash(user_composite_type); +$f$); + +CREATE TABLE events ( + composite_id user_composite_type, + event_id bigint, + event_type character varying(255), + event_time bigint +); +SELECT master_create_distributed_table('events', 'composite_id', 'range'); + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = 
'(2,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; + +\COPY events FROM STDIN WITH CSV +"(1,1001)",20001,click,1472807012 +"(1,1001)",20002,submit,1472807015 +"(1,1001)",20003,pay,1472807020 +"(1,1002)",20010,click,1472807022 +"(1,1002)",20011,click,1472807023 +"(1,1002)",20012,submit,1472807025 +"(1,1002)",20013,pay,1472807030 +"(1,1003)",20014,click,1472807032 +"(1,1003)",20015,click,1472807033 +"(1,1003)",20016,click,1472807034 +"(1,1003)",20017,submit,1472807035 +\. + +CREATE TABLE users ( + composite_id user_composite_type, + lastseen bigint +); +SELECT master_create_distributed_table('users', 'composite_id', 'range'); + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; + +\COPY users FROM STDIN WITH CSV +"(1,1001)",1472807115 +"(1,1002)",1472807215 +"(1,1003)",1472807315 +\. 
+ +-- Create tables for subquery tests +CREATE TABLE lineitem_subquery ( + l_orderkey bigint not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); + +CREATE TABLE orders_subquery ( + o_orderkey bigint not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimal(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null, + PRIMARY KEY(o_orderkey) ); +SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range'); + +SET citus.enable_router_execution TO 'false'; + +-- Check that we don't crash if there are not any shards. +SELECT + avg(unit_price) +FROM + (SELECT + l_orderkey, + avg(o_totalprice) AS unit_price + FROM + lineitem_subquery, + orders_subquery + WHERE + l_orderkey = o_orderkey + GROUP BY + l_orderkey) AS unit_prices; + +-- Load data into tables. 
+ +SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('orders_subquery') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('orders_subquery') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 +WHERE shardid = :new_shard_id; + +SET citus.shard_max_size TO "1MB"; + +\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' +\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' + +\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' + +CREATE TABLE events_reference_table (like events_table including all); +SELECT create_reference_table('events_reference_table'); +CREATE INDEX events_ref_val2 on events_reference_table(value_2); +INSERT INTO events_reference_table SELECT * FROM events_table; + +CREATE TABLE users_reference_table (like users_table including all); +SELECT create_reference_table('users_reference_table'); +INSERT INTO users_reference_table SELECT * FROM users_table; diff --git a/src/test/regress/input/multi_load_data_shadow.source b/src/test/regress/input/multi_load_data_shadow.source new file mode 100644 index 000000000..6021bcd34 --- /dev/null +++ b/src/test/regress/input/multi_load_data_shadow.source @@ -0,0 +1,26 @@ +-- +-- MULTI_LOAD_DATA +-- +-- Tests for loading data in a distributed cluster. 
Please note that the number +-- of shards uploaded depends on two config values: citus.shard_replication_factor and +-- citus.shard_max_size. These values are set in pg_regress_multi.pl. Shard placement +-- policy is left to the default value (round-robin) to test the common install case. + +SET citus.next_shard_id TO 290000; + +\copy lineitem FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' +\copy lineitem FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' + +\copy orders FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' + +\copy orders_reference FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders_reference FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' + +\copy customer FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|' +\copy customer_append FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|' +\copy nation FROM '@abs_srcdir@/data/nation.data' with delimiter '|' +\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|' +\copy part_append FROM '@abs_srcdir@/data/part.data' with delimiter '|' +\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' +\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' diff --git a/src/test/regress/multi_schedule_hyperscale b/src/test/regress/multi_schedule_hyperscale new file mode 100644 index 000000000..bbf6db5bf --- /dev/null +++ b/src/test/regress/multi_schedule_hyperscale @@ -0,0 +1,262 @@ +# ---------- +# $Id$ +# +# Regression tests that exercise distributed planning/execution functionality. +# +# All new regression tests are expected to be run by this schedule. Tests that +# do not set specific task executor type should also be added to +# multi_task_tracker_extra_schedule. +# +# Note that we use variant comparison files to test version dependent regression +# test results. 
For more information: +# http://www.postgresql.org/docs/current/static/regress-variant.html +# ---------- + +# --- +# Tests around schema changes, these are run first, so there are no preexisting objects. +# +# propagate_extension_commands lies just after multi_cluster_management as we do +# remove / add node operations, we do not want any preexisting objects before +# propagate_extension_commands +# --- +test: multi_test_helpers_shadow +test: multi_test_catalog_views +test: multi_name_resolution + + +# ---------- +# The following distributed tests depend on creating a partitioned table and +# uploading data to it. +# ---------- +test: multi_create_table_shadow +test: multi_master_protocol multi_load_data_shadow multi_behavioral_analytics_create_table_shadow +test: recursive_dml_with_different_planners_executors +test: window_functions multi_insert_select_window + +# following should not run in parallel because it relies on connection counts to workers +test: insert_select_connection_leak + +# --------- +# at the end of the regression tests regarding recursively planned modifications +# ensure that we don't leak any intermediate results +# This test should not run in parallel with any other tests +# --------- + +# ---------- +# Tests for partitioning support +# ---------- + + +# ---------- +# Tests for recursive subquery planning +# ---------- +test: subquery_basics subquery_local_tables subquery_executors set_operations set_operation_and_local_tables +test: subquery_partitioning subquery_complex_target_list subqueries_not_supported +test: non_colocated_join_order +test: subquery_prepared_statements pg12 cte_inline + +# ---------- +# Miscellaneous tests to check our query planning behavior +# ---------- +test: multi_distributed_transaction_id +test: hyperscale_tutorial +test: multi_basic_queries multi_complex_expressions multi_subquery_complex_queries multi_subquery_behavioral_analytics +test: multi_subquery_complex_reference_clause multi_subquery_window_functions 
multi_sql_function +test: multi_function_in_join row_types +test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands +test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc +test: multi_limit_clause_approximate multi_single_relation_subquery +test: multi_select_for_update +test: multi_average_expression multi_working_columns multi_having_pushdown +test: multi_array_agg multi_limit_clause +test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition chbenchmark_all_queries expression_reference_join +test: multi_agg_type_conversion multi_count_type_conversion +test: multi_cross_shard +test: multi_dropped_column_aliases foreign_key_restriction_enforcement +test: multi_binary_master_copy_format + +# ---------- +# Parallel TPC-H tests to check our distributed execution behavior +# ---------- +test: multi_tpch_query1 multi_tpch_query3 multi_tpch_query6 multi_tpch_query10 +test: multi_tpch_query12 multi_tpch_query14 multi_tpch_query19 +test: multi_tpch_query7 multi_tpch_query7_nested + +# ---------- +# Parallel tests to check our join order planning logic. Note that we load data +# below; and therefore these tests should come after the execution tests. +# ---------- +test: multi_join_order_tpch_small multi_join_order_additional +test: multi_join_order_tpch_repartition + +# ---------- +# Tests for repartition join planning and execution. Be careful when creating +# new shards before these tests, as they expect specific shard identifiers in +# the output. +# ---------- +test: multi_repartition_join_ref +test: adaptive_executor_repartition + +# --------- +# Tests that modify data should run sequentially +# --------- +test: with_prepare + +# --------- +# Tests for recursive planning. 
+# --------- +test: with_nested with_where +test: cte_prepared_modify cte_nested_modification +test: with_executors with_partitioning with_dml + + +# ---------- +# Tests to check our large record loading and shard deletion behavior +# ---------- +test: multi_load_large_records +test: multi_master_delete_protocol +test: multi_shard_modify + +# ---------- +# Tests around DDL statements run on distributed tables +# ---------- + +# ---------- +# multi_create_schema tests creation, loading, and querying of a table in a new +# schema (namespace). +# ---------- +test: multi_create_schema + +# ---------- +# Tests to check if we inform the user about potential caveats of creating new +# databases, schemas, roles, and authentication information. +# ---------- + +# ---------- +# Tests to check the sequential and parallel executions of DDL and modification +# commands +# Should not be executed in parallel with other tests +# ---------- + +# --------- +# loads data to create shards in a way that forces +# shard caching. +# --------- + +# --------- +# multi_outer_join loads data to create shards to test outer join mappings +# --------- +test: multi_outer_join + +# --- +# Tests covering mostly modification queries and required preliminary +# functionality related to metadata, shard creation, shard pruning and +# "hacky" copy script for hash partitioned tables. +# Note that the order of the following tests are important. multi_complex_count_distinct +# is independent from the rest of the group, it is added to increase parallelism. +# --- +test: multi_complex_count_distinct +test: multi_upsert multi_simple_queries +test: foreign_key_to_reference_table validate_constraint + +# --------- +# creates hash and range-partitioned tables and performs COPY +# creates hash partitioned tables. 
+# --------- +test: fast_path_router_modify +test: null_parameters + +# ---------- +# loads more lineitem data using high shard identifiers +# ---------- + +# ---------- +# tests various size commands on distributed tables +# ---------- + +# ---------- +# multi_drop_extension makes sure we can safely drop and recreate the extension +# ---------- + +# ---------- +# tests the propagation of mx-related metadata changes to metadata workers +# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers +# ---------- + +# ---------- +# tests if the GRANT ... ON SCHEMA queries are propagated correctly +# makes sure we can work with tables in schemas other than public with no problem +# ---------- + +# ---------- +# multi_function_evaluation tests edge-cases in master-side function pre-evaluation +# ---------- +test: multi_function_evaluation + +# ---------- +# tests truncate functionality for distributed tables +# ---------- + +# ---------- +# tests utility functions written for co-location feature & internal API +# tests master_copy_shard_placement with colocated tables. +# ---------- + +# ---------- +# tests utility functions written for citus tools +# ---------- + +# ---------- +# multi_foreign_key tests foreign key push down on distributed tables +# ---------- +test: multi_foreign_key + +# ---------- +# tests for upgrade_reference_table UDF +# tests replicating reference tables to new nodes after we add new nodes +# tests metadata changes after master_remove_node +# ---------- + +# -------- +# Replicating reference tables to coordinator. Add coordinator to pg_dist_node +# and rerun some of the tests. 
+# -------- +test: foreign_key_to_reference_table + + +# ---------- +# tests for dropping shards using connection API +# ---------- + +# ---------- +# tests simple combinations of permission access and queries +# ---------- + +# --------- +# tests for an obscure crash citus used to exhibit when shardids +# changed the table they belonged to during a session +# -------- + +# --------- +# multi_task_string_size tests task string size checks +# --------- +test: multi_task_string_size + +# --------- +# connection encryption tests +# --------- + +# --------- +# object distribution tests +# --------- +test: distributed_types_xact_add_enum_value + +# --------- +# deparsing logic tests +# --------- + +# --------- +# test that no tests leaked intermediate results. This should always be last +# Causes random test failures so commented out for now +# --------- +# test: diff --git a/src/test/regress/output/multi_behavioral_analytics_create_table_shadow.source b/src/test/regress/output/multi_behavioral_analytics_create_table_shadow.source new file mode 100644 index 000000000..5130d52fe --- /dev/null +++ b/src/test/regress/output/multi_behavioral_analytics_create_table_shadow.source @@ -0,0 +1,608 @@ +-- +-- multi behavioral analytics +-- this file is intended to create the table requires for the tests +-- +SET citus.next_shard_id TO 1400000; +SET citus.shard_replication_factor = 1; +SET citus.shard_count = 32; +CREATE SCHEMA with_basics; +SET search_path TO 'with_basics'; +CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('users_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY users_table FROM 
'@abs_srcdir@/data/users_table.data' WITH CSV; +\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; +SET citus.shard_count = 96; +CREATE SCHEMA subquery_and_ctes; +SET search_path TO subquery_and_ctes; +CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('users_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; +\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; +SET citus.shard_count TO DEFAULT; +SET search_path TO DEFAULT; +CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('users_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); +SELECT create_distributed_table('events_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE agg_results (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- we need this to improve the concurrency on the regression tests +CREATE TABLE agg_results_second (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_second', 'user_id'); + create_distributed_table 
+-------------------------- + +(1 row) + +-- same as agg_results_second +CREATE TABLE agg_results_third (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_third', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- same as agg_results_second +CREATE TABLE agg_results_fourth (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_fourth', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- same as agg_results_second +CREATE TABLE agg_results_window (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); +SELECT create_distributed_table('agg_results_window', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int); +SELECT create_reference_table('users_ref_test_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO users_ref_test_table VALUES(1,'User_1',45); +INSERT INTO users_ref_test_table VALUES(2,'User_2',46); +INSERT INTO users_ref_test_table VALUES(3,'User_3',47); +INSERT INTO users_ref_test_table VALUES(4,'User_4',48); +INSERT INTO users_ref_test_table VALUES(5,'User_5',49); +INSERT INTO users_ref_test_table VALUES(6,'User_6',50); +\COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; +\COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; +-- create indexes for +CREATE INDEX is_index1 ON users_table(user_id); +CREATE INDEX is_index2 ON events_table(user_id); +CREATE INDEX is_index3 ON users_table(value_1); +CREATE INDEX is_index4 ON events_table(event_type); +CREATE INDEX is_index5 ON users_table(value_2); +CREATE INDEX is_index6 ON events_table(value_2); +-- Create composite type to use in 
subquery pushdown +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int + LANGUAGE 'internal' + AS 'btrecordcmp' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); +ERROR: permission denied for language internal +CONTEXT: SQL statement " + + CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int + LANGUAGE 'internal' + AS 'btrecordcmp' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_gt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); +ERROR: permission denied for language internal +CONTEXT: SQL statement " + + CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_gt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_ge' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); +ERROR: permission denied for language internal +CONTEXT: SQL statement " + + CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_ge' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION equal_user_composite_type_function(user_composite_type, 
user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_eq' + IMMUTABLE; +$f$); +ERROR: permission denied for language internal +CONTEXT: SQL statement " + + CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_eq' + IMMUTABLE; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_lt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); +ERROR: permission denied for language internal +CONTEXT: SQL statement " + + CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_lt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_lt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); +ERROR: permission denied for language internal +CONTEXT: SQL statement " + + CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean + LANGUAGE 'internal' + AS 'record_lt' + IMMUTABLE + RETURNS NULL ON NULL INPUT; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR > ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = gt_user_composite_type_function + ); +$f$); +ERROR: function gt_user_composite_type_function(user_composite_type, user_composite_type) does not exist +CONTEXT: SQL statement " + + CREATE OPERATOR > ( + LEFTARG = user_composite_type, + RIGHTARG = 
user_composite_type, + PROCEDURE = gt_user_composite_type_function + ); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR >= ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = ge_user_composite_type_function + ); +$f$); +ERROR: function ge_user_composite_type_function(user_composite_type, user_composite_type) does not exist +CONTEXT: SQL statement " + + CREATE OPERATOR >= ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = ge_user_composite_type_function + ); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +-- ... use that function to create a custom equality operator... +SELECT run_command_on_master_and_workers($f$ + + -- ... use that function to create a custom equality operator... + CREATE OPERATOR = ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = equal_user_composite_type_function, + commutator = =, + RESTRICT = eqsel, + JOIN = eqjoinsel, + merges, + hashes + ); +$f$); +ERROR: function equal_user_composite_type_function(user_composite_type, user_composite_type) does not exist +CONTEXT: SQL statement " + + -- ... use that function to create a custom equality operator... 
+ CREATE OPERATOR = ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = equal_user_composite_type_function, + commutator = =, + RESTRICT = eqsel, + JOIN = eqjoinsel, + merges, + hashes + ); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR <= ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = le_user_composite_type_function + ); +$f$); +ERROR: function le_user_composite_type_function(user_composite_type, user_composite_type) does not exist +CONTEXT: SQL statement " + + CREATE OPERATOR <= ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = le_user_composite_type_function + ); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR < ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = lt_user_composite_type_function + ); +$f$); +ERROR: function lt_user_composite_type_function(user_composite_type, user_composite_type) does not exist +CONTEXT: SQL statement " + + CREATE OPERATOR < ( + LEFTARG = user_composite_type, + RIGHTARG = user_composite_type, + PROCEDURE = lt_user_composite_type_function + ); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +-- ... and create a custom operator family for hash indexes... +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR FAMILY cats_2_op_fam USING hash; +$f$); +ERROR: must be superuser to create an operator family +CONTEXT: SQL statement " + + CREATE OPERATOR FAMILY cats_2_op_fam USING hash; +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +-- ... create a test HASH function. 
Though it is a poor hash function, +-- it is acceptable for our tests +SELECT run_command_on_master_and_workers($f$ + + CREATE FUNCTION test_composite_type_hash(user_composite_type) RETURNS int + AS 'SELECT hashtext( ($1.tenant_id + $1.tenant_id)::text);' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +$f$); + run_command_on_master_and_workers +----------------------------------- + +(1 row) + +-- We need to define two different operator classes for the composite types +-- One uses BTREE the other uses HASH +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR CLASS cats_2_op_fam_clas3 + DEFAULT FOR TYPE user_composite_type USING BTREE AS + OPERATOR 1 <= (user_composite_type, user_composite_type), + OPERATOR 2 < (user_composite_type, user_composite_type), + OPERATOR 3 = (user_composite_type, user_composite_type), + OPERATOR 4 >= (user_composite_type, user_composite_type), + OPERATOR 5 > (user_composite_type, user_composite_type), + + FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type); +$f$); +ERROR: must be superuser to create an operator class +CONTEXT: SQL statement " + + CREATE OPERATOR CLASS cats_2_op_fam_clas3 + DEFAULT FOR TYPE user_composite_type USING BTREE AS + OPERATOR 1 <= (user_composite_type, user_composite_type), + OPERATOR 2 < (user_composite_type, user_composite_type), + OPERATOR 3 = (user_composite_type, user_composite_type), + OPERATOR 4 >= (user_composite_type, user_composite_type), + OPERATOR 5 > (user_composite_type, user_composite_type), + + FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +SELECT run_command_on_master_and_workers($f$ + + CREATE OPERATOR CLASS cats_2_op_fam_class + DEFAULT FOR TYPE user_composite_type USING HASH AS + OPERATOR 1 = (user_composite_type, user_composite_type), + FUNCTION 1 test_composite_type_hash(user_composite_type); +$f$); +ERROR: must be 
superuser to create an operator class +CONTEXT: SQL statement " + + CREATE OPERATOR CLASS cats_2_op_fam_class + DEFAULT FOR TYPE user_composite_type USING HASH AS + OPERATOR 1 = (user_composite_type, user_composite_type), + FUNCTION 1 test_composite_type_hash(user_composite_type); +" +PL/pgSQL function run_command_on_master_and_workers(text) line 3 at EXECUTE +CREATE TABLE events ( + composite_id user_composite_type, + event_id bigint, + event_type character varying(255), + event_time bigint +); +SELECT master_create_distributed_table('events', 'composite_id', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +\COPY events FROM STDIN WITH CSV +ERROR: could not start copy +DETAIL: Distributed relation "events" has shards with missing shardminvalue/shardmaxvalue. 
+"(1,1001)",20001,click,1472807012 +"(1,1001)",20002,submit,1472807015 +"(1,1001)",20003,pay,1472807020 +"(1,1002)",20010,click,1472807022 +"(1,1002)",20011,click,1472807023 +"(1,1002)",20012,submit,1472807025 +"(1,1002)",20013,pay,1472807030 +"(1,1003)",20014,click,1472807032 +"(1,1003)",20015,click,1472807033 +"(1,1003)",20016,click,1472807034 +"(1,1003)",20017,submit,1472807035 +\. +invalid command \. +CREATE TABLE users ( + composite_id user_composite_type, + lastseen bigint +); +ERROR: syntax error at or near ""(1,1001)"" +LINE 1: "(1,1001)",20001,click,1472807012 + ^ +SELECT master_create_distributed_table('users', 'composite_id', 'range'); +ERROR: relation "users" does not exist +LINE 1: SELECT master_create_distributed_table('users', 'composite_i... + ^ +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +ERROR: relation "users" does not exist +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +ERROR: relation "users" does not exist +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +ERROR: relation "users" does not exist +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +ERROR: relation "users" does not exist +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +\COPY users FROM STDIN WITH CSV +ERROR: relation "users" does not exist +"(1,1001)",1472807115 +"(1,1002)",1472807215 
+"(1,1003)",1472807315 +\. +invalid command \. +-- Create tables for subquery tests +CREATE TABLE lineitem_subquery ( + l_orderkey bigint not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +ERROR: syntax error at or near ""(1,1001)"" +LINE 1: "(1,1001)",1472807115 + ^ +SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); +ERROR: relation "lineitem_subquery" does not exist +LINE 1: SELECT master_create_distributed_table('lineitem_subquery', ... + ^ +CREATE TABLE orders_subquery ( + o_orderkey bigint not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimal(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null, + PRIMARY KEY(o_orderkey) ); +SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SET citus.enable_router_execution TO 'false'; +-- Check that we don't crash if there are not any shards. +SELECT + avg(unit_price) +FROM + (SELECT + l_orderkey, + avg(o_totalprice) AS unit_price + FROM + lineitem_subquery, + orders_subquery + WHERE + l_orderkey = o_orderkey + GROUP BY + l_orderkey) AS unit_prices; +ERROR: relation "lineitem_subquery" does not exist +LINE 8: lineitem_subquery, + ^ +-- Load data into tables. 
+SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id +\gset +ERROR: relation "lineitem_subquery" does not exist +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id +\gset +ERROR: relation "lineitem_subquery" does not exist +UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('orders_subquery') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SELECT master_create_empty_shard('orders_subquery') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 +WHERE shardid = :new_shard_id; +ERROR: permission denied for table pg_dist_shard +SET citus.shard_max_size TO "1MB"; +\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' +ERROR: relation "lineitem_subquery" does not exist +\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' +ERROR: relation "lineitem_subquery" does not exist +\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +ERROR: could not start copy +DETAIL: Distributed relation "orders_subquery" has shards with missing shardminvalue/shardmaxvalue. +\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +ERROR: could not start copy +DETAIL: Distributed relation "orders_subquery" has shards with missing shardminvalue/shardmaxvalue. 
+CREATE TABLE events_reference_table (like events_table including all); +SELECT create_reference_table('events_reference_table'); + create_reference_table +------------------------ + +(1 row) + +CREATE INDEX events_ref_val2 on events_reference_table(value_2); +INSERT INTO events_reference_table SELECT * FROM events_table; +CREATE TABLE users_reference_table (like users_table including all); +SELECT create_reference_table('users_reference_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO users_reference_table SELECT * FROM users_table; diff --git a/src/test/regress/output/multi_load_data_shadow.source b/src/test/regress/output/multi_load_data_shadow.source new file mode 100644 index 000000000..d8099d0f8 --- /dev/null +++ b/src/test/regress/output/multi_load_data_shadow.source @@ -0,0 +1,21 @@ +-- +-- MULTI_LOAD_DATA +-- +-- Tests for loading data in a distributed cluster. Please note that the number +-- of shards uploaded depends on two config values: citus.shard_replication_factor and +-- citus.shard_max_size. These values are set in pg_regress_multi.pl. Shard placement +-- policy is left to the default value (round-robin) to test the common install case. 
+SET citus.next_shard_id TO 290000; +\copy lineitem FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' +\copy lineitem FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' +\copy orders FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +\copy orders_reference FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders_reference FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +\copy customer FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|' +\copy customer_append FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|' +\copy nation FROM '@abs_srcdir@/data/nation.data' with delimiter '|' +\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|' +\copy part_append FROM '@abs_srcdir@/data/part.data' with delimiter '|' +\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' +\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' diff --git a/src/test/regress/sql/multi_create_table_shadow.sql b/src/test/regress/sql/multi_create_table_shadow.sql new file mode 100644 index 000000000..29e4aa3a7 --- /dev/null +++ b/src/test/regress/sql/multi_create_table_shadow.sql @@ -0,0 +1,290 @@ +-- +-- MULTI_CREATE_TABLE +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 360000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 100000; + +-- Create new table definitions for use in testing in distributed planning and +-- execution functionality. Also create indexes to boost performance. Since we +-- need to cover both reference join and partitioned join, we have created +-- reference and append distributed version of orders, customer and part tables. 
+ +CREATE TABLE lineitem ( + l_orderkey bigint not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT create_distributed_table('lineitem', 'l_orderkey', 'append'); + +CREATE INDEX lineitem_time_index ON lineitem (l_shipdate); + +CREATE TABLE orders ( + o_orderkey bigint not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimal(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null, + PRIMARY KEY(o_orderkey) ); +SELECT create_distributed_table('orders', 'o_orderkey', 'append'); + +CREATE TABLE orders_reference ( + o_orderkey bigint not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimal(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null, + PRIMARY KEY(o_orderkey) ); +SELECT create_reference_table('orders_reference'); + + +CREATE TABLE customer ( + c_custkey integer not null, + c_name varchar(25) not null, + c_address varchar(40) not null, + c_nationkey integer not null, + c_phone char(15) not null, + c_acctbal decimal(15,2) not null, + c_mktsegment char(10) not null, + c_comment varchar(117) not null); +SELECT create_reference_table('customer'); + +CREATE TABLE customer_append ( + c_custkey integer not null, + c_name varchar(25) not null, + 
c_address varchar(40) not null, + c_nationkey integer not null, + c_phone char(15) not null, + c_acctbal decimal(15,2) not null, + c_mktsegment char(10) not null, + c_comment varchar(117) not null); +SELECT create_distributed_table('customer_append', 'c_custkey', 'append'); + +CREATE TABLE nation ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); + +SELECT create_reference_table('nation'); + +CREATE TABLE part ( + p_partkey integer not null, + p_name varchar(55) not null, + p_mfgr char(25) not null, + p_brand char(10) not null, + p_type varchar(25) not null, + p_size integer not null, + p_container char(10) not null, + p_retailprice decimal(15,2) not null, + p_comment varchar(23) not null); +SELECT create_reference_table('part'); + +CREATE TABLE part_append ( + p_partkey integer not null, + p_name varchar(55) not null, + p_mfgr char(25) not null, + p_brand char(10) not null, + p_type varchar(25) not null, + p_size integer not null, + p_container char(10) not null, + p_retailprice decimal(15,2) not null, + p_comment varchar(23) not null); +SELECT create_distributed_table('part_append', 'p_partkey', 'append'); + +CREATE TABLE supplier +( + s_suppkey integer not null, + s_name char(25) not null, + s_address varchar(40) not null, + s_nationkey integer, + s_phone char(15) not null, + s_acctbal decimal(15,2) not null, + s_comment varchar(101) not null +); +SELECT create_reference_table('supplier'); + +-- create a single shard supplier table which is not +-- a reference table +CREATE TABLE supplier_single_shard +( + s_suppkey integer not null, + s_name char(25) not null, + s_address varchar(40) not null, + s_nationkey integer, + s_phone char(15) not null, + s_acctbal decimal(15,2) not null, + s_comment varchar(101) not null +); +SELECT create_distributed_table('supplier_single_shard', 's_suppkey', 'append'); + +CREATE TABLE mx_table_test (col1 int, col2 text); + +-- Since we're superuser, we can set the 
replication model to 'streaming' to +-- create a one-off MX table... but if we forget to set the replication factor to one, +-- we should see an error reminding us to fix that +SET citus.replication_model TO 'streaming'; +SELECT create_distributed_table('mx_table_test', 'col1'); + +-- ok, so now actually create the one-off MX table +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('mx_table_test', 'col1'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regclass; +DROP TABLE mx_table_test; + +-- Show that master_create_distributed_table ignores citus.replication_model GUC +CREATE TABLE s_table(a int); +SELECT master_create_distributed_table('s_table', 'a', 'hash'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='s_table'::regclass; + +-- Show that master_create_worker_shards complains when RF>1 and replication model is streaming +UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid='s_table'::regclass; +SELECT master_create_worker_shards('s_table', 4, 2); + +DROP TABLE s_table; + +RESET citus.replication_model; + +-- Show that create_distributed_table with append and range distributions ignore +-- citus.replication_model GUC +SET citus.shard_replication_factor TO 2; +SET citus.replication_model TO streaming; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +-- Show that master_create_distributed_table created statement replicated tables no matter +-- what citus.replication_model set to + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); +SELECT repmodel 
FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +-- Check that the replication_model overwrite behavior is the same with RF=1 +SET citus.shard_replication_factor TO 1; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'hash'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'append'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +CREATE TABLE repmodel_test (a int); +SELECT master_create_distributed_table('repmodel_test', 'a', 'range'); +SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass; +DROP TABLE repmodel_test; + +RESET citus.replication_model; + +-- Test initial data loading +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +INSERT INTO data_load_test VALUES (243, 
'world'); + +-- table must be empty when using append- or range-partitioning +SELECT create_distributed_table('data_load_test', 'col1', 'append'); +SELECT create_distributed_table('data_load_test', 'col1', 'range'); + +-- table must be empty when using master_create_distributed_table (no shards created) +SELECT master_create_distributed_table('data_load_test', 'col1', 'hash'); + +-- create_distributed_table creates shards and copies data into the distributed table +SELECT create_distributed_table('data_load_test', 'col1'); +SELECT * FROM data_load_test ORDER BY col1; +DROP TABLE data_load_test; + +-- test queries on distributed tables with no shards +CREATE TABLE no_shard_test (col1 int, col2 text); +SELECT create_distributed_table('no_shard_test', 'col1', 'append'); +SELECT * FROM no_shard_test WHERE col1 > 1; +DROP TABLE no_shard_test; + +CREATE TABLE no_shard_test (col1 int, col2 text); +SELECT create_distributed_table('no_shard_test', 'col1', 'range'); +SELECT * FROM no_shard_test WHERE col1 > 1; +DROP TABLE no_shard_test; + +CREATE TABLE no_shard_test (col1 int, col2 text); +SELECT master_create_distributed_table('no_shard_test', 'col1', 'hash'); +SELECT * FROM no_shard_test WHERE col1 > 1; +DROP TABLE no_shard_test; + +-- ensure writes in the same transaction as create_distributed_table are visible +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +INSERT INTO data_load_test VALUES (243, 'world'); +END; +SELECT * FROM data_load_test ORDER BY col1; +DROP TABLE data_load_test; + +-- creating co-located distributed tables in the same transaction works +BEGIN; +CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test1 VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test1', 'col1'); + +CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test2 
VALUES (132, 'world'); +SELECT create_distributed_table('data_load_test2', 'col1'); + +SELECT a.col2 ||' '|| b.col2 +FROM data_load_test1 a JOIN data_load_test2 b USING (col1) +WHERE col1 = 132; + +DROP TABLE data_load_test1, data_load_test2; +END; diff --git a/src/test/regress/sql/multi_test_helpers_shadow.sql b/src/test/regress/sql/multi_test_helpers_shadow.sql new file mode 100644 index 000000000..0a73c51fc --- /dev/null +++ b/src/test/regress/sql/multi_test_helpers_shadow.sql @@ -0,0 +1,106 @@ +-- File to create functions and helpers needed for subsequent tests + +-- create a helper function to create objects on each node +CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + EXECUTE p_sql; + PERFORM run_command_on_workers(p_sql); +END;$$; + +-- Create a function to make sure that queries returning the same result +CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; + +-- Create a function to ignore worker plans in explain output +CREATE OR REPLACE FUNCTION coordinator_plan(explain_commmand text, out query_plan text) +RETURNS SETOF TEXT AS $$ +BEGIN + FOR query_plan IN execute explain_commmand LOOP + RETURN next; + IF query_plan LIKE '%Task Count:%' + THEN + RETURN; + END IF; + END LOOP; + RETURN; +END; $$ language plpgsql; + +-- Is a distributed plan? 
+CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text) +RETURNS BOOLEAN AS $$ +DECLARE + query_plan TEXT; +BEGIN + FOR query_plan IN execute explain_commmand LOOP + IF query_plan LIKE '%Task Count:%' + THEN + RETURN TRUE; + END IF; + END LOOP; + RETURN FALSE; +END; $$ language plpgsql; + +-- helper function to quickly run SQL on the whole cluster +CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + EXECUTE p_sql; + PERFORM run_command_on_workers(p_sql); +END;$$; + +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; + +-- helper function to verify the function of a coordinator is the same on all workers +CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text) + RETURNS bool + LANGUAGE plpgsql +AS $func$ +DECLARE + coordinatorSql text; + workerSql text; +BEGIN + SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql; + FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP + IF workerSql != coordinatorSql THEN + RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql; + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$func$; + +-- +-- Procedure for creating shards for range partitioned distributed table. 
+-- +CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) +AS $$ +DECLARE + new_shardid bigint; + idx int; +BEGIN + FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) + LOOP + SELECT master_create_empty_shard(rel::text) INTO new_shardid; + UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; + END LOOP; +END; +$$ LANGUAGE plpgsql;