Merge pull request #536 from citusdata/fix/fix_302_update_test_to_create_hash_partitioned_table

Update regression tests where metadata was edited manually
Burak Yücesoy 2016-06-04 17:10:22 +03:00
commit 0f78eb6216
15 changed files with 510 additions and 372 deletions
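In short, the tests no longer INSERT rows into pg_dist_partition, pg_dist_shard, and pg_dist_shard_placement by hand; they call the user-facing UDFs instead, so shard IDs and placements come from the normal allocation path (hence the renumbered shardIds throughout the expected outputs). A minimal sketch of the before/after pattern, condensed from the first diff below:

    -- before: distribute the table by editing the metadata catalogs directly
    INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey)
    VALUES ('events_hash'::regclass, 'h', column_name_to_column('events_hash'::regclass, 'name'));

    -- after: let the UDFs create the metadata and the worker shards
    SELECT master_create_distributed_table('events_hash', 'name', 'hash');
    SELECT master_create_worker_shards('events_hash', 4, 2); -- 4 shards, replication factor 2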

View File

@@ -56,82 +56,89 @@ CREATE FUNCTION column_name_to_column(regclass, text)
 -- ===================================================================
 -- test distribution metadata functionality
 -- ===================================================================
--- create table to be distributed
+-- create hash distributed table
 CREATE TABLE events_hash (
     id bigint,
     name text
 );
--- for this table we'll "distribute" manually but verify using function calls
-INSERT INTO pg_dist_shard
-    (shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue)
-VALUES
-    (1, 'events_hash'::regclass, 't', '0', '10'),
-    (2, 'events_hash'::regclass, 't', '10', '20'),
-    (3, 'events_hash'::regclass, 't', '20', '30'),
-    (4, 'events_hash'::regclass, 't', '30', '40');
-INSERT INTO pg_dist_shard_placement
-    (nodename, nodeport, shardid, shardstate, shardlength)
-VALUES
-    ('cluster-worker-01', 5432, 1, 0, 0),
-    ('cluster-worker-01', 5432, 2, 0, 0),
-    ('cluster-worker-02', 5433, 3, 0, 0),
-    ('cluster-worker-02', 5433, 4, 0, 0),
-    ('cluster-worker-03', 5434, 1, 1, 0),
-    ('cluster-worker-03', 5434, 2, 1, 0),
-    ('cluster-worker-04', 5435, 3, 1, 0),
-    ('cluster-worker-04', 5435, 4, 1, 0);
-INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey)
-VALUES
-    ('events_hash'::regclass, 'h', column_name_to_column('events_hash'::regclass, 'name'));
+SELECT master_create_distributed_table('events_hash', 'name', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- create worker shards
+SELECT master_create_worker_shards('events_hash', 4, 2);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+-- set shardstate of one replication from each shard to 0 (invalid value)
+UPDATE pg_dist_shard_placement SET shardstate = 0 WHERE nodeport = 57638 AND shardid BETWEEN 103025 AND 103028;
 -- should see above shard identifiers
 SELECT load_shard_id_array('events_hash');
  load_shard_id_array
----------------------
- {1,2,3,4}
+-------------------------------
+ {103025,103026,103027,103028}
 (1 row)

 -- should see array with first shard range
-SELECT load_shard_interval_array(1, 0);
+SELECT load_shard_interval_array(103025, 0);
  load_shard_interval_array
 ---------------------------
- {0,10}
+ {-2147483648,-1073741825}
 (1 row)

 -- should even work for range-partitioned shards
-BEGIN;
-    UPDATE pg_dist_shard SET
-        shardminvalue = 'Aardvark',
-        shardmaxvalue = 'Zebra'
-    WHERE shardid = 1;
-    UPDATE pg_dist_partition SET partmethod = 'r'
-    WHERE logicalrelid = 'events_hash'::regclass;
-    SELECT load_shard_interval_array(1, ''::text);
+-- create range distributed table
+CREATE TABLE events_range (
+    id bigint,
+    name text
+);
+SELECT master_create_distributed_table('events_range', 'name', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- create empty shard
+SELECT master_create_empty_shard('events_range');
+ master_create_empty_shard
+---------------------------
+                    103029
+(1 row)
+
+UPDATE pg_dist_shard SET
+    shardminvalue = 'Aardvark',
+    shardmaxvalue = 'Zebra'
+WHERE shardid = 103029;
+SELECT load_shard_interval_array(103029, ''::text);
  load_shard_interval_array
 ---------------------------
  {Aardvark,Zebra}
 (1 row)

-ROLLBACK;
 -- should see error for non-existent shard
-SELECT load_shard_interval_array(5, 0);
-ERROR: could not find valid entry for shard 5
+SELECT load_shard_interval_array(103030, 0);
+ERROR: could not find valid entry for shard 103030
 -- should see two placements
-SELECT load_shard_placement_array(2, false);
+SELECT load_shard_placement_array(103026, false);
  load_shard_placement_array
-------------------------------------------------
- {cluster-worker-03:5434,cluster-worker-01:5432}
+-----------------------------------
+ {localhost:57637,localhost:57638}
 (1 row)

 -- only one of which is finalized
-SELECT load_shard_placement_array(2, true);
+SELECT load_shard_placement_array(103026, true);
  load_shard_placement_array
 ----------------------------
- {cluster-worker-03:5434}
+ {localhost:57637}
 (1 row)

 -- should see error for non-existent shard
-SELECT load_shard_placement_array(6, false);
-WARNING: could not find any shard placements for shardId 6
+SELECT load_shard_placement_array(103031, false);
+WARNING: could not find any shard placements for shardId 103031
  load_shard_placement_array
 ----------------------------
  {}
@@ -185,9 +192,11 @@ SELECT column_name_to_column_id('events_hash', 'non_existent');
 ERROR: column "non_existent" of relation "events_hash" does not exist
 -- drop shard rows (must drop placements first)
 DELETE FROM pg_dist_shard_placement
-    WHERE shardid BETWEEN 1 AND 4;
+    WHERE shardid BETWEEN 103025 AND 103029;
 DELETE FROM pg_dist_shard
 WHERE logicalrelid = 'events_hash'::regclass;
+DELETE FROM pg_dist_shard
+WHERE logicalrelid = 'events_range'::regclass;
 -- verify that an eager load shows them missing
 SELECT load_shard_id_array('events_hash');
  load_shard_id_array
@@ -266,9 +275,9 @@ WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
 -- deleting or updating a non-existent row should fail
 SELECT delete_shard_placement_row(:new_shard_id, 'wrong_localhost', 5432);
-ERROR: could not find valid entry for shard placement 103024 on node "wrong_localhost:5432"
+ERROR: could not find valid entry for shard placement 103030 on node "wrong_localhost:5432"
 SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3);
-ERROR: could not find valid entry for shard placement 103024 on node "localhost:5432"
+ERROR: could not find valid entry for shard placement 103030 on node "localhost:5432"
 -- now we'll even test our lock methods...
 -- use transaction to bound how long we hold the lock
 BEGIN;

View File

@@ -5,6 +5,90 @@
 SET citus.explain_distributed_queries TO off;
 SET citus.log_multi_join_order TO TRUE;
 SET client_min_messages TO DEBUG2;
+-- Create new table definitions for use in testing in distributed planning and
+-- execution functionality. Also create indexes to boost performance.
+CREATE TABLE lineitem_hash (
+    l_orderkey bigint not null,
+    l_partkey integer not null,
+    l_suppkey integer not null,
+    l_linenumber integer not null,
+    l_quantity decimal(15, 2) not null,
+    l_extendedprice decimal(15, 2) not null,
+    l_discount decimal(15, 2) not null,
+    l_tax decimal(15, 2) not null,
+    l_returnflag char(1) not null,
+    l_linestatus char(1) not null,
+    l_shipdate date not null,
+    l_commitdate date not null,
+    l_receiptdate date not null,
+    l_shipinstruct char(25) not null,
+    l_shipmode char(10) not null,
+    l_comment varchar(44) not null,
+    PRIMARY KEY(l_orderkey, l_linenumber) );
+DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "lineitem_hash_pkey" for table "lineitem_hash"
+DEBUG: building index "lineitem_hash_pkey" on table "lineitem_hash"
+SELECT master_create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('lineitem_hash', 2, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
+DEBUG: applied command on shard 102037 on node localhost:57637
+DEBUG: applied command on shard 102038 on node localhost:57638
+DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
+CREATE TABLE orders_hash (
+    o_orderkey bigint not null,
+    o_custkey integer not null,
+    o_orderstatus char(1) not null,
+    o_totalprice decimal(15,2) not null,
+    o_orderdate date not null,
+    o_orderpriority char(15) not null,
+    o_clerk char(15) not null,
+    o_shippriority integer not null,
+    o_comment varchar(79) not null,
+    PRIMARY KEY(o_orderkey) );
+DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "orders_hash_pkey" for table "orders_hash"
+DEBUG: building index "orders_hash_pkey" on table "orders_hash"
+SELECT master_create_distributed_table('orders_hash', 'o_orderkey', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('orders_hash', 2, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+CREATE TABLE customer_hash (
+    c_custkey integer not null,
+    c_name varchar(25) not null,
+    c_address varchar(40) not null,
+    c_nationkey integer not null,
+    c_phone char(15) not null,
+    c_acctbal decimal(15,2) not null,
+    c_mktsegment char(10) not null,
+    c_comment varchar(117) not null);
+SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('customer_hash', 1, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
 -- The following query checks that we can correctly handle self-joins
 EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2
     WHERE l1.l_orderkey = l2.l_orderkey AND l1.l_quantity > 5;
@@ -68,44 +152,32 @@ DETAIL: Cartesian products are currently unsupported
 BEGIN;
 -- Validate that we take into account the partition method when building the
 -- join-order plan.
-UPDATE pg_dist_partition SET partmethod = 'h' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'lineitem');
-EXPLAIN SELECT count(*) FROM orders, lineitem
+EXPLAIN SELECT count(*) FROM orders, lineitem_hash
     WHERE o_orderkey = l_orderkey;
-LOG: join order: [ "orders" ][ single partition join "lineitem" ]
+LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ]
                          QUERY PLAN
 ------------------------------------------------------------
  explain statements for distributed queries are not enabled
 (1 row)

 -- Verify we handle local joins between two hash-partitioned tables.
-UPDATE pg_dist_partition SET partmethod = 'h' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
-EXPLAIN SELECT count(*) FROM orders, lineitem
+EXPLAIN SELECT count(*) FROM orders_hash, lineitem_hash
     WHERE o_orderkey = l_orderkey;
-LOG: join order: [ "orders" ][ local partition join "lineitem" ]
+LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ]
                          QUERY PLAN
 ------------------------------------------------------------
  explain statements for distributed queries are not enabled
 (1 row)

-UPDATE pg_dist_partition SET partmethod = 'a' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'lineitem');
-UPDATE pg_dist_partition SET partmethod = 'a' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
 -- Validate that we can handle broadcast joins with hash-partitioned tables.
-UPDATE pg_dist_partition SET partmethod = 'h' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
-EXPLAIN SELECT count(*) FROM customer, nation
+EXPLAIN SELECT count(*) FROM customer_hash, nation
     WHERE c_nationkey = n_nationkey;
-LOG: join order: [ "customer" ][ broadcast join "nation" ]
+LOG: join order: [ "customer_hash" ][ broadcast join "nation" ]
                          QUERY PLAN
 ------------------------------------------------------------
  explain statements for distributed queries are not enabled
 (1 row)

 -- Update the large table shard count for all the following tests.
 SET citus.large_table_shard_count TO 1;
 -- Validate that we don't use a single-partition join method for a hash
@@ -120,32 +192,27 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition
 -- Validate that we don't chose a single-partition join method with a
 -- hash-partitioned base table
-UPDATE pg_dist_partition SET partmethod = 'h' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
-EXPLAIN SELECT count(*) FROM orders, customer
+EXPLAIN SELECT count(*) FROM orders, customer_hash
     WHERE c_custkey = o_custkey;
-LOG: join order: [ "orders" ][ dual partition join "customer" ]
+LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
                          QUERY PLAN
 ------------------------------------------------------------
  explain statements for distributed queries are not enabled
 (1 row)

-UPDATE pg_dist_partition SET partmethod = 'a' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
 -- Validate that we can re-partition a hash partitioned table to join with a
 -- range partitioned one.
-UPDATE pg_dist_partition SET partmethod = 'h' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
-EXPLAIN SELECT count(*) FROM orders, customer
+EXPLAIN SELECT count(*) FROM orders_hash, customer
     WHERE c_custkey = o_custkey;
-LOG: join order: [ "orders" ][ single partition join "customer" ]
+LOG: join order: [ "orders_hash" ][ single partition join "customer" ]
                          QUERY PLAN
 ------------------------------------------------------------
  explain statements for distributed queries are not enabled
 (1 row)

-UPDATE pg_dist_partition SET partmethod = 'a' WHERE
-    logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
 COMMIT;
 -- Reset client logging level to its previous value
 SET client_min_messages TO NOTICE;
+DROP TABLE lineitem_hash;
+DROP TABLE orders_hash;
+DROP TABLE customer_hash;

View File

@@ -74,13 +74,13 @@ DEBUG: join prunable for intervals [6001,7000] and [1,1000]
 DEBUG: generated sql query for job 1251 and task 3
 DETAIL: query string: "SELECT "pg_merge_job_1250.task_000019".intermediate_column_1250_0, "pg_merge_job_1250.task_000019".intermediate_column_1250_1, "pg_merge_job_1250.task_000019".intermediate_column_1250_2, "pg_merge_job_1250.task_000019".intermediate_column_1250_3, "pg_merge_job_1250.task_000019".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000019 "pg_merge_job_1250.task_000019" JOIN part_102019 part ON (("pg_merge_job_1250.task_000019".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
 DEBUG: generated sql query for job 1251 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102039 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
+DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102044 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
 DEBUG: pruning merge fetch taskId 1
 DETAIL: Creating dependency on merge taskId 19
 DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 26
-DEBUG: assigned task 3 to node localhost:57637
-DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
 DEBUG: join prunable for intervals [1,1000] and [1001,2000]
 DEBUG: join prunable for intervals [1,1000] and [6001,7000]
 DEBUG: join prunable for intervals [1001,2000] and [1,1000]
@@ -90,18 +90,18 @@ DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
 DEBUG: generated sql query for job 1252 and task 3
 DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT '30'::bigint"
 DEBUG: generated sql query for job 1252 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102038 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT '30'::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102043 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT '30'::bigint"
 DEBUG: generated sql query for job 1252 and task 9
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102037 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT '30'::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102042 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT '30'::bigint"
 DEBUG: pruning merge fetch taskId 1
 DETAIL: Creating dependency on merge taskId 7
 DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 10
 DEBUG: pruning merge fetch taskId 7
 DETAIL: Creating dependency on merge taskId 13
-DEBUG: assigned task 6 to node localhost:57637
-DEBUG: assigned task 3 to node localhost:57638
 DEBUG: assigned task 9 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
 DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
 DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
 DEBUG: completed cleanup query for job 1251 on node "localhost:57638"

View File

@@ -74,13 +74,13 @@ DEBUG: join prunable for intervals [6001,7000] and [1,1000]
 DEBUG: generated sql query for job 1251 and task 3
 DETAIL: query string: "SELECT "pg_merge_job_1250.task_000019".intermediate_column_1250_0, "pg_merge_job_1250.task_000019".intermediate_column_1250_1, "pg_merge_job_1250.task_000019".intermediate_column_1250_2, "pg_merge_job_1250.task_000019".intermediate_column_1250_3, "pg_merge_job_1250.task_000019".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000019 "pg_merge_job_1250.task_000019" JOIN part_102019 part ON (("pg_merge_job_1250.task_000019".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
 DEBUG: generated sql query for job 1251 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102039 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
+DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102044 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
 DEBUG: pruning merge fetch taskId 1
 DETAIL: Creating dependency on merge taskId 19
 DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 26
-DEBUG: assigned task 3 to node localhost:57637
-DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
 DEBUG: join prunable for intervals [1,1000] and [1001,2000]
 DEBUG: join prunable for intervals [1,1000] and [6001,7000]
 DEBUG: join prunable for intervals [1001,2000] and [1,1000]
@@ -90,18 +90,18 @@ DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
 DEBUG: generated sql query for job 1252 and task 3
 DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT 30::bigint"
 DEBUG: generated sql query for job 1252 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102038 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT 30::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102043 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT 30::bigint"
 DEBUG: generated sql query for job 1252 and task 9
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102037 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT 30::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102042 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT 30::bigint"
 DEBUG: pruning merge fetch taskId 1
 DETAIL: Creating dependency on merge taskId 7
 DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 10
 DEBUG: pruning merge fetch taskId 7
 DETAIL: Creating dependency on merge taskId 13
-DEBUG: assigned task 6 to node localhost:57637
-DEBUG: assigned task 3 to node localhost:57638
 DEBUG: assigned task 9 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
 DEBUG: completed cleanup query for job 1252 on node "localhost:57638"
 DEBUG: completed cleanup query for job 1252 on node "localhost:57637"
 DEBUG: completed cleanup query for job 1251 on node "localhost:57638"

View File

@@ -57,8 +57,8 @@ WHERE
     o_custkey = c_custkey AND
     c_custkey < 0;
 DEBUG: predicate pruning for shardId 102017
-DEBUG: predicate pruning for shardId 102038
-DEBUG: predicate pruning for shardId 102037
+DEBUG: predicate pruning for shardId 102043
+DEBUG: predicate pruning for shardId 102042
  count
 -------

View File

@@ -39,9 +39,9 @@ DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 8
 DEBUG: pruning merge fetch taskId 7
 DETAIL: Creating dependency on merge taskId 11
-DEBUG: assigned task 6 to node localhost:57637
-DEBUG: assigned task 3 to node localhost:57638
 DEBUG: assigned task 9 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
 DEBUG: CommitTransactionCommand
  count
 -------
@@ -64,12 +64,12 @@ WHERE
     o_custkey = c_custkey AND
     o_orderkey = l_orderkey;
 DEBUG: StartTransactionCommand
-DEBUG: assigned task 9 to node localhost:57637
-DEBUG: assigned task 3 to node localhost:57638
-DEBUG: assigned task 12 to node localhost:57637
-DEBUG: assigned task 6 to node localhost:57638
 DEBUG: assigned task 15 to node localhost:57637
-DEBUG: assigned task 18 to node localhost:57638
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 18 to node localhost:57637
+DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 9 to node localhost:57637
+DEBUG: assigned task 12 to node localhost:57638
 DEBUG: join prunable for intervals [1,2496] and [2497,4964]
 DEBUG: join prunable for intervals [1,2496] and [4965,5986]
 DEBUG: join prunable for intervals [1,2496] and [8997,11554]
@@ -160,9 +160,9 @@ DEBUG: assigned task 8 to node localhost:57637
 DEBUG: assigned task 6 to node localhost:57638
 DEBUG: assigned task 12 to node localhost:57637
 DEBUG: assigned task 10 to node localhost:57638
-DEBUG: assigned task 4 to node localhost:57637
-DEBUG: assigned task 2 to node localhost:57638
 DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 2 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
 DEBUG: join prunable for task partitionId 0 and 1
 DEBUG: join prunable for task partitionId 0 and 2
 DEBUG: join prunable for task partitionId 0 and 3

View File

@@ -93,7 +93,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
 SET client_min_messages TO 'DEBUG2';
 SET citus.task_executor_type TO 'real-time';
 SELECT * FROM range_partitioned WHERE id = 32743;
-DEBUG: predicate pruning for shardId 103070
+DEBUG: predicate pruning for shardId 103084
 DEBUG: Plan is router executable
   id   | symbol | bidder_id |        placed_at         | kind | limit_price
 -------+--------+-----------+--------------------------+------+-------------
@@ -101,7 +101,7 @@ DEBUG: Plan is router executable
 (1 row)

 SELECT * FROM append_partitioned WHERE id = 414123;
-DEBUG: predicate pruning for shardId 103072
+DEBUG: predicate pruning for shardId 103086
 DEBUG: Plan is router executable
    id   | symbol | bidder_id |        placed_at         | kind | limit_price
 --------+--------+-----------+--------------------------+------+-------------
@@ -275,7 +275,7 @@ WHERE nodename = 'localhost' AND
 -- Fourth: Perform the same INSERT (primary key violation)
 INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
 WARNING: Bad result from localhost:57638
-DETAIL: Remote message: duplicate key value violates unique constraint "limit_orders_pkey_103067"
+DETAIL: Remote message: duplicate key value violates unique constraint "limit_orders_pkey_103081"
 -- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
 SELECT count(*) FROM limit_orders WHERE id = 275;
  count

View File

@@ -28,44 +28,47 @@ CREATE FUNCTION print_sorted_shard_intervals(regclass)
 -- ===================================================================
 -- test shard pruning functionality
 -- ===================================================================
--- create distributed table metadata to observe shard pruning
+-- create distributed table observe shard pruning
 CREATE TABLE pruning ( species text, last_pruned date, plant_id integer );
-INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey)
-VALUES
-    ('pruning'::regclass, 'h', column_name_to_column('pruning'::regclass, 'species'));
-INSERT INTO pg_dist_shard
-    (shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue)
-VALUES
-    (10, 'pruning'::regclass, 't', '-2147483648', '-1073741826'),
-    (11, 'pruning'::regclass, 't', '-1073741825', '-3'),
-    (12, 'pruning'::regclass, 't', '-2', '1073741820'),
-    (13, 'pruning'::regclass, 't', '1073741821', '2147483647');
+SELECT master_create_distributed_table('pruning', 'species', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- create worker shards
+SELECT master_create_worker_shards('pruning', 4, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
 -- with no values, expect all shards
 SELECT prune_using_no_values('pruning');
  prune_using_no_values
------------------------
- {10,11,12,13}
+-------------------------------
+ {103070,103071,103072,103073}
 (1 row)

 -- with a single value, expect a single shard
 SELECT prune_using_single_value('pruning', 'tomato');
  prune_using_single_value
 --------------------------
- {12}
+ {103072}
 (1 row)

 -- the above is true even if that value is null
 SELECT prune_using_single_value('pruning', NULL);
  prune_using_single_value
 --------------------------
- {12}
+ {103072}
 (1 row)

 -- build an OR clause and expect more than one sahrd
 SELECT prune_using_either_value('pruning', 'tomato', 'petunia');
  prune_using_either_value
 --------------------------
- {11,12}
+ {103071,103072}
 (1 row)

 -- an AND clause with incompatible values returns no shards
@@ -79,7 +82,7 @@ SELECT prune_using_both_values('pruning', 'tomato', 'petunia');
 SELECT prune_using_both_values('pruning', 'tomato', 'rose');
  prune_using_both_values
 -------------------------
- {12}
+ {103072}
 (1 row)

 -- unit test of the equality expression generation code
@@ -92,85 +95,116 @@ SELECT debug_equality_expression('pruning');
 -- print the initial ordering of shard intervals
 SELECT print_sorted_shard_intervals('pruning');
  print_sorted_shard_intervals
------------------------------
- {10,11,12,13}
+-------------------------------
+ {103070,103071,103072,103073}
 (1 row)

 -- update only min value for one shard
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11;
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103071;
 SELECT print_sorted_shard_intervals('pruning');
  print_sorted_shard_intervals
------------------------------
- {10,12,13,11}
+-------------------------------
+ {103070,103072,103073,103071}
 (1 row)

 -- now lets have one more shard without min/max values
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12;
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103072;
 SELECT print_sorted_shard_intervals('pruning');
  print_sorted_shard_intervals
------------------------------
- {10,13,11,12}
+-------------------------------
+ {103070,103073,103071,103072}
 (1 row)

 -- now lets have one more shard without min/max values
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10;
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103070;
 SELECT print_sorted_shard_intervals('pruning');
  print_sorted_shard_intervals
------------------------------
- {13,10,11,12}
+-------------------------------
+ {103073,103070,103071,103072}
 (1 row)

 -- all shard placements are uninitialized
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13;
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103073;
 SELECT print_sorted_shard_intervals('pruning');
  print_sorted_shard_intervals
------------------------------
- {10,11,12,13}
+-------------------------------
+ {103070,103071,103072,103073}
 (1 row)

+-- create range distributed table observe shard pruning
+CREATE TABLE pruning_range ( species text, last_pruned date, plant_id integer );
+SELECT master_create_distributed_table('pruning_range', 'species', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- create worker shards
+SELECT master_create_empty_shard('pruning_range');
+ master_create_empty_shard
+---------------------------
+                    103074
+(1 row)
+
+SELECT master_create_empty_shard('pruning_range');
+ master_create_empty_shard
+---------------------------
+                    103075
+(1 row)
+
+SELECT master_create_empty_shard('pruning_range');
+ master_create_empty_shard
+---------------------------
+                    103076
+(1 row)
+
+SELECT master_create_empty_shard('pruning_range');
+ master_create_empty_shard
+---------------------------
+                    103077
+(1 row)
+
--- now update the metadata so that the table is a range distributed table
-UPDATE pg_dist_partition SET partmethod = 'r' WHERE logicalrelid = 'pruning'::regclass;
 -- now the comparison is done via the partition column type, which is text
-UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 10;
-UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 11;
-UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 12;
-UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 13;
+UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 103074;
+UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 103075;
+UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 103076;
+UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 103077;
 -- print the ordering of shard intervals with range partitioning as well
-SELECT print_sorted_shard_intervals('pruning');
+SELECT print_sorted_shard_intervals('pruning_range');
  print_sorted_shard_intervals
------------------------------
- {10,11,12,13}
+-------------------------------
+ {103074,103075,103076,103077}
 (1 row)

 -- update only min value for one shard
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11;
-SELECT print_sorted_shard_intervals('pruning');
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103075;
+SELECT print_sorted_shard_intervals('pruning_range');
  print_sorted_shard_intervals
------------------------------
- {10,12,13,11}
+-------------------------------
+ {103074,103076,103077,103075}
 (1 row)

 -- now lets have one more shard without min/max values
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12;
-SELECT print_sorted_shard_intervals('pruning');
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103076;
+SELECT print_sorted_shard_intervals('pruning_range');
  print_sorted_shard_intervals
------------------------------
- {10,13,11,12}
+-------------------------------
+ {103074,103077,103075,103076}
 (1 row)

 -- now lets have one more shard without min/max values
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10;
-SELECT print_sorted_shard_intervals('pruning');
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103074;
+SELECT print_sorted_shard_intervals('pruning_range');
  print_sorted_shard_intervals
------------------------------
- {13,10,11,12}
+-------------------------------
+ {103077,103074,103075,103076}
 (1 row)

 -- all shard placements are uninitialized
-UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13;
-SELECT print_sorted_shard_intervals('pruning');
+UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103077;
+SELECT print_sorted_shard_intervals('pruning_range');
  print_sorted_shard_intervals
------------------------------
- {10,11,12,13}
+-------------------------------
+ {103074,103075,103076,103077}
 (1 row)

View File

@@ -323,7 +323,7 @@ SELECT *
 FROM articles
 WHERE author_id = 1;
 DEBUG: Creating router plan
-DEBUG: predicate pruning for shardId 103094
+DEBUG: predicate pruning for shardId 103108
 DEBUG: Plan is router executable
  id | author_id |    title     | word_count
 ----+-----------+--------------+------------
@@ -338,7 +338,7 @@ DEBUG: Plan is router executable
 SELECT *
 FROM articles
 WHERE author_id = 1 OR author_id = 17;
-DEBUG: predicate pruning for shardId 103094
+DEBUG: predicate pruning for shardId 103108
 DEBUG: Plan is router executable
  id | author_id |    title     | word_count
 ----+-----------+--------------+------------
@@ -367,7 +367,7 @@ SELECT id as article_id, word_count * id as random_value
 FROM articles
 WHERE author_id = 1;
 DEBUG: Creating router plan
-DEBUG: predicate pruning for shardId 103094
+DEBUG: predicate pruning for shardId 103108
 DEBUG: Plan is router executable
  article_id | random_value
 ------------+--------------
@@ -385,7 +385,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
 WHERE a.author_id = 10 and a.author_id = b.author_id
 LIMIT 3;
 DEBUG: push down of limit count: 3
-DEBUG: predicate pruning for shardId 103094
+DEBUG: predicate pruning for shardId 103108
 DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
 DEBUG: Plan is router executable
  first_author | second_word_count
@@ -402,7 +402,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
 WHERE a.author_id = 10 and a.author_id = b.author_id
 LIMIT 3;
 DEBUG: push down of limit count: 3
-DEBUG: predicate pruning for shardId 103094
+DEBUG: predicate pruning for shardId 103108
  first_author | second_word_count
 --------------+-------------------
           10 |             19519
@@ -416,7 +416,7 @@ SELECT *
 WHERE author_id = 1
 LIMIT 2;
 DEBUG: Creating router plan
-DEBUG: predicate pruning for shardId 103094
+DEBUG: predicate pruning for shardId 103108
 DEBUG: Plan is router executable
  id | author_id |  title   | word_count
 ----+-----------+----------+------------
@ -432,7 +432,7 @@ SELECT id
WHERE author_id = 1 WHERE author_id = 1
GROUP BY id; GROUP BY id;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103108
DEBUG: Plan is router executable DEBUG: Plan is router executable
id id
---- ----
@ -452,7 +452,7 @@ SELECT avg(word_count)
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103093 DEBUG: predicate pruning for shardId 103107
DEBUG: Plan is router executable DEBUG: Plan is router executable
avg avg
-------------------- --------------------
@ -466,7 +466,7 @@ SELECT max(word_count) as max, min(word_count) as min,
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103093 DEBUG: predicate pruning for shardId 103107
DEBUG: Plan is router executable DEBUG: Plan is router executable
max | min | sum | cnt max | min | sum | cnt
-------+------+-------+----- -------+------+-------+-----
@ -477,7 +477,7 @@ DEBUG: Plan is router executable
SELECT * SELECT *
FROM articles a, articles b FROM articles a, articles b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103108
DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3 DEBUG: join prunable for task partitionId 0 and 3
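The DEBUG lines above are emitted while the planner prunes shards; they should be reproducible interactively by raising the client log level before running a single-shard query (a sketch, assuming the articles table used by this test):

SET client_min_messages TO DEBUG2;
SELECT *
FROM articles
WHERE author_id = 1;
-- expect "DEBUG: predicate pruning for shardId ..." and, for single-shard
-- filters, "DEBUG: Plan is router executable"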


@ -16,6 +16,14 @@ CREATE TABLE multi_append_table_to_shard_left
); );
SELECT master_create_distributed_table('multi_append_table_to_shard_left', 'left_number', 'append'); SELECT master_create_distributed_table('multi_append_table_to_shard_left', 'left_number', 'append');
CREATE TABLE multi_append_table_to_shard_right_hash
(
right_number INTEGER not null,
right_text TEXT not null
);
SELECT master_create_distributed_table('multi_append_table_to_shard_right_hash', 'right_number', 'hash');
SELECT master_create_worker_shards('multi_append_table_to_shard_right_hash', 1, 1);
-- Replicate 'left' table on both workers -- Replicate 'left' table on both workers
SELECT set_config('citus.shard_replication_factor', '2', false); SELECT set_config('citus.shard_replication_factor', '2', false);
\STAGE multi_append_table_to_shard_left FROM '@abs_srcdir@/data/agg.data' \STAGE multi_append_table_to_shard_left FROM '@abs_srcdir@/data/agg.data'
@ -70,19 +78,12 @@ FROM multi_append_table_to_shard_left,
WHERE left_number = right_number; WHERE left_number = right_number;
-- Check that we error out if we try to append data to a hash partitioned table. -- Check that we error out if we try to append data to a hash partitioned table.
SELECT master_create_empty_shard('multi_append_table_to_shard_right_hash');
UPDATE pg_dist_partition SET partmethod = 'h' WHERE
logicalrelid = 'multi_append_table_to_shard_right'::regclass::oid;
SELECT master_create_empty_shard('multi_append_table_to_shard_right');
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636) SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM FROM
pg_dist_shard pg_dist_shard
WHERE 'multi_append_table_to_shard_right'::regclass::oid = logicalrelid; WHERE 'multi_append_table_to_shard_right_hash'::regclass::oid = logicalrelid;
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = 'multi_append_table_to_shard_right'::regclass::oid;
-- Clean up after test -- Clean up after test
SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_right'); SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_right');
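For contrast with the error path exercised above, the supported append workflow goes through an append-distributed table; a minimal sketch (the events_append and events_staging names are hypothetical):

CREATE TABLE events_append (id bigint, name text);
SELECT master_create_distributed_table('events_append', 'id', 'append');
SELECT master_create_empty_shard('events_append');  -- allowed: table is append-partitioned
SELECT master_append_table_to_shard(shardid, 'events_staging', 'localhost', 57636)
FROM pg_dist_shard
WHERE logicalrelid = 'events_append'::regclass;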


@ -24,6 +24,23 @@ SELECT master_create_distributed_table('multi_append_table_to_shard_left', 'left
(1 row) (1 row)
CREATE TABLE multi_append_table_to_shard_right_hash
(
right_number INTEGER not null,
right_text TEXT not null
);
SELECT master_create_distributed_table('multi_append_table_to_shard_right_hash', 'right_number', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('multi_append_table_to_shard_right_hash', 1, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- Replicate 'left' table on both workers -- Replicate 'left' table on both workers
SELECT set_config('citus.shard_replication_factor', '2', false); SELECT set_config('citus.shard_replication_factor', '2', false);
set_config set_config
@ -107,19 +124,15 @@ WHERE left_number = right_number;
(1 row) (1 row)
-- Check that we error out if we try to append data to a hash partitioned table. -- Check that we error out if we try to append data to a hash partitioned table.
UPDATE pg_dist_partition SET partmethod = 'h' WHERE SELECT master_create_empty_shard('multi_append_table_to_shard_right_hash');
logicalrelid = 'multi_append_table_to_shard_right'::regclass::oid; ERROR: relation "multi_append_table_to_shard_right_hash" is a hash partitioned table
SELECT master_create_empty_shard('multi_append_table_to_shard_right');
ERROR: relation "multi_append_table_to_shard_right" is a hash partitioned table
DETAIL: We currently don't support creating shards on hash-partitioned tables DETAIL: We currently don't support creating shards on hash-partitioned tables
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636) SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM FROM
pg_dist_shard pg_dist_shard
WHERE 'multi_append_table_to_shard_right'::regclass::oid = logicalrelid; WHERE 'multi_append_table_to_shard_right_hash'::regclass::oid = logicalrelid;
ERROR: cannot append to shardId 103013 ERROR: cannot append to shardId 103011
DETAIL: We currently don't support appending to shards in hash-partitioned tables DETAIL: We currently don't support appending to shards in hash-partitioned tables
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = 'multi_append_table_to_shard_right'::regclass::oid;
-- Clean up after test -- Clean up after test
SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_right'); SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_right');
master_apply_delete_command master_apply_delete_command


@ -301,16 +301,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ] LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ]
l_custkey | r_custkey | t_custkey l_custkey | r_custkey | t_custkey
-----------+-----------+----------- -----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
1 | | 1 | |
2 | | 2 | |
3 | | 3 | |
@ -326,6 +316,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
13 | 13 | 13 13 | 13 | 13
14 | 14 | 14 14 | 14 | 14
15 | 15 | 15 15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows) (25 rows)
-- Right join with single shard rightmost table should error out -- Right join with single shard rightmost table should error out
@ -347,16 +347,6 @@ FROM
LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_left" ] LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_left" ]
t_custkey | r_custkey | l_custkey t_custkey | r_custkey | l_custkey
-----------+-----------+----------- -----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
11 | 11 | 11 11 | 11 | 11
12 | 12 | 12 12 | 12 | 12
13 | 13 | 13 13 | 13 | 13
@ -367,6 +357,16 @@ LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join
18 | 18 | 18 | 18 |
19 | 19 | 19 | 19 |
20 | 20 | 20 | 20 |
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(20 rows) (20 rows)
-- Make it anti-join, should display values where l_custkey is null -- Make it anti-join, should display values where l_custkey is null
@ -406,16 +406,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ] LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]
l_custkey | r_custkey l_custkey | r_custkey
-----------+----------- -----------+-----------
21 | 21
22 | 22
23 | 23
24 | 24
25 | 25
26 | 26
27 | 27
28 | 28
29 | 29
30 | 30
1 | 1 |
2 | 2 |
3 | 3 |
@ -436,6 +426,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
| 18 | 18
| 19 | 19
| 16 | 16
21 | 21
22 | 22
23 | 23
24 | 24
25 | 25
26 | 26
27 | 27
28 | 28
29 | 29
30 | 30
(30 rows) (30 rows)
-- full outer join + anti (right) should work with 1-1 matched shards -- full outer join + anti (right) should work with 1-1 matched shards
@ -525,6 +525,11 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ] LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ]
l_custkey | r_custkey | t_custkey l_custkey | r_custkey | t_custkey
-----------+-----------+----------- -----------+-----------+-----------
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21 21 | 21 | 21
22 | 22 | 22 22 | 22 | 22
23 | 23 | 23 23 | 23 | 23
@ -535,11 +540,6 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
28 | 28 | 28 28 | 28 | 28
29 | 29 | 29 29 | 29 | 29
30 | 30 | 30 30 | 30 | 30
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
(15 rows) (15 rows)
-- inner (broadcast) join + 2 shards left (local) join should work -- inner (broadcast) join + 2 shards left (local) join should work
@ -552,16 +552,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ] LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ]
l_custkey | t_custkey | r_custkey l_custkey | t_custkey | r_custkey
-----------+-----------+----------- -----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
1 | 1 | 1 | 1 |
2 | 2 | 2 | 2 |
3 | 3 | 3 | 3 |
@ -577,6 +567,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
13 | 13 | 13 13 | 13 | 13
14 | 14 | 14 14 | 14 | 14
15 | 15 | 15 15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows) (25 rows)
-- inner (local) join + 2 shards left (dual partition) join should error out -- inner (local) join + 2 shards left (dual partition) join should error out
@ -598,16 +598,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ] LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ]
l_custkey | t_custkey | r_custkey l_custkey | t_custkey | r_custkey
-----------+-----------+----------- -----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
1 | 1 | 1 | 1 |
2 | 2 | 2 | 2 |
3 | 3 | 3 | 3 |
@ -623,6 +613,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
13 | 13 | 13 13 | 13 | 13
14 | 14 | 14 14 | 14 | 14
15 | 15 | 15 15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows) (25 rows)
-- inner (broadcast) join + 2 shards left (local) + anti join should work -- inner (broadcast) join + 2 shards left (local) + anti join should work
@ -660,16 +660,6 @@ FROM
LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ] LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ]
t_custkey t_custkey
----------- -----------
21
22
23
24
25
26
27
28
29
30
11 11
12 12
13 13
@ -680,6 +670,16 @@ LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_oute
18 18
19 19
20 20
21
22
23
24
25
26
27
28
29
30
(20 rows) (20 rows)
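The LOG: join order lines in these expected outputs come from citus.log_multi_join_order; to see one for an ad-hoc query, something along these lines should work (a sketch using the tables from this test):

SET citus.log_multi_join_order TO true;
SET client_min_messages TO LOG;
SELECT l_custkey, r_custkey
FROM multi_outer_join_left
LEFT JOIN multi_outer_join_right ON (l_custkey = r_custkey);
-- logs e.g. LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]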
-- Add a shard to the left table that overlaps with multiple shards in the right -- Add a shard to the left table that overlaps with multiple shards in the right


@ -71,67 +71,54 @@ CREATE FUNCTION column_name_to_column(regclass, text)
-- test distribution metadata functionality -- test distribution metadata functionality
-- =================================================================== -- ===================================================================
-- create table to be distributed -- create hash distributed table
CREATE TABLE events_hash ( CREATE TABLE events_hash (
id bigint, id bigint,
name text name text
); );
SELECT master_create_distributed_table('events_hash', 'name', 'hash');
-- for this table we'll "distribute" manually but verify using function calls -- create worker shards
INSERT INTO pg_dist_shard SELECT master_create_worker_shards('events_hash', 4, 2);
(shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue)
VALUES
(1, 'events_hash'::regclass, 't', '0', '10'),
(2, 'events_hash'::regclass, 't', '10', '20'),
(3, 'events_hash'::regclass, 't', '20', '30'),
(4, 'events_hash'::regclass, 't', '30', '40');
INSERT INTO pg_dist_shard_placement -- set shardstate of one placement of each shard to 0 (invalid value)
(nodename, nodeport, shardid, shardstate, shardlength) UPDATE pg_dist_shard_placement SET shardstate = 0 WHERE nodeport = 57638 AND shardid BETWEEN 103025 AND 103028;
VALUES
('cluster-worker-01', 5432, 1, 0, 0),
('cluster-worker-01', 5432, 2, 0, 0),
('cluster-worker-02', 5433, 3, 0, 0),
('cluster-worker-02', 5433, 4, 0, 0),
('cluster-worker-03', 5434, 1, 1, 0),
('cluster-worker-03', 5434, 2, 1, 0),
('cluster-worker-04', 5435, 3, 1, 0),
('cluster-worker-04', 5435, 4, 1, 0);
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey)
VALUES
('events_hash'::regclass, 'h', column_name_to_column('events_hash'::regclass, 'name'));
-- should see above shard identifiers -- should see above shard identifiers
SELECT load_shard_id_array('events_hash'); SELECT load_shard_id_array('events_hash');
-- should see array with first shard range -- should see array with first shard range
SELECT load_shard_interval_array(1, 0); SELECT load_shard_interval_array(103025, 0);
-- should even work for range-partitioned shards -- should even work for range-partitioned shards
BEGIN; -- create range distributed table
UPDATE pg_dist_shard SET CREATE TABLE events_range (
shardminvalue = 'Aardvark', id bigint,
shardmaxvalue = 'Zebra' name text
WHERE shardid = 1; );
SELECT master_create_distributed_table('events_range', 'name', 'range');
UPDATE pg_dist_partition SET partmethod = 'r' -- create empty shard
WHERE logicalrelid = 'events_hash'::regclass; SELECT master_create_empty_shard('events_range');
SELECT load_shard_interval_array(1, ''::text); UPDATE pg_dist_shard SET
ROLLBACK; shardminvalue = 'Aardvark',
shardmaxvalue = 'Zebra'
WHERE shardid = 103029;
SELECT load_shard_interval_array(103029, ''::text);
-- should see error for non-existent shard -- should see error for non-existent shard
SELECT load_shard_interval_array(5, 0); SELECT load_shard_interval_array(103030, 0);
-- should see two placements -- should see two placements
SELECT load_shard_placement_array(2, false); SELECT load_shard_placement_array(103026, false);
-- only one of which is finalized -- only one of which is finalized
SELECT load_shard_placement_array(2, true); SELECT load_shard_placement_array(103026, true);
-- should see error for non-existent shard -- should see error for non-existent shard
SELECT load_shard_placement_array(6, false); SELECT load_shard_placement_array(103031, false);
-- should see column id of 'name' -- should see column id of 'name'
SELECT partition_column_id('events_hash'); SELECT partition_column_id('events_hash');
@ -152,9 +139,11 @@ SELECT column_name_to_column_id('events_hash', 'non_existent');
-- drop shard rows (must drop placements first) -- drop shard rows (must drop placements first)
DELETE FROM pg_dist_shard_placement DELETE FROM pg_dist_shard_placement
WHERE shardid BETWEEN 1 AND 4; WHERE shardid BETWEEN 103025 AND 103029;
DELETE FROM pg_dist_shard DELETE FROM pg_dist_shard
WHERE logicalrelid = 'events_hash'::regclass; WHERE logicalrelid = 'events_hash'::regclass;
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'events_range'::regclass;
-- verify that an eager load shows them missing -- verify that an eager load shows them missing
SELECT load_shard_id_array('events_hash'); SELECT load_shard_id_array('events_hash');
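The shard IDs and intervals asserted above are deterministic: master_create_worker_shards('events_hash', 4, 2) splits the 32-bit hash space into four equal ranges of 2^30 = 1073741824 values each, starting at -2147483648, and the new shards receive consecutive shard IDs (103025 through 103028 here). A catalog query run before the cleanup DELETEs above would confirm this (a sketch):

SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'events_hash'::regclass
ORDER BY shardminvalue::int;
-- four rows; each [shardminvalue, shardmaxvalue] range covers 2^30 hash values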


@ -8,6 +8,58 @@ SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE; SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
-- Create new table definitions for use in testing distributed planning and
-- execution functionality. Also create indexes to boost performance.
CREATE TABLE lineitem_hash (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
SELECT master_create_worker_shards('lineitem_hash', 2, 1);
CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
CREATE TABLE orders_hash (
o_orderkey bigint not null,
o_custkey integer not null,
o_orderstatus char(1) not null,
o_totalprice decimal(15,2) not null,
o_orderdate date not null,
o_orderpriority char(15) not null,
o_clerk char(15) not null,
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_hash', 'o_orderkey', 'hash');
SELECT master_create_worker_shards('orders_hash', 2, 1);
CREATE TABLE customer_hash (
c_custkey integer not null,
c_name varchar(25) not null,
c_address varchar(40) not null,
c_nationkey integer not null,
c_phone char(15) not null,
c_acctbal decimal(15,2) not null,
c_mktsegment char(10) not null,
c_comment varchar(117) not null);
SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash');
SELECT master_create_worker_shards('customer_hash', 1, 1);
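Each table above is sharded with master_create_worker_shards, whose second and third arguments are the shard count and the per-shard replication factor; spelled out (a sketch of the call shape as used throughout these tests):

-- master_create_worker_shards(table_name, shard_count, replication_factor)
SELECT master_create_worker_shards('customer_hash', 1, 1);  -- one shard, one replica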
-- The following query checks that we can correctly handle self-joins -- The following query checks that we can correctly handle self-joins
EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2 EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2
@ -37,35 +89,17 @@ BEGIN;
-- Validate that we take into account the partition method when building the -- Validate that we take into account the partition method when building the
-- join-order plan. -- join-order plan.
UPDATE pg_dist_partition SET partmethod = 'h' WHERE EXPLAIN SELECT count(*) FROM orders, lineitem_hash
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'lineitem');
EXPLAIN SELECT count(*) FROM orders, lineitem
WHERE o_orderkey = l_orderkey; WHERE o_orderkey = l_orderkey;
-- Verify we handle local joins between two hash-partitioned tables. -- Verify we handle local joins between two hash-partitioned tables.
UPDATE pg_dist_partition SET partmethod = 'h' WHERE EXPLAIN SELECT count(*) FROM orders_hash, lineitem_hash
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
EXPLAIN SELECT count(*) FROM orders, lineitem
WHERE o_orderkey = l_orderkey; WHERE o_orderkey = l_orderkey;
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'lineitem');
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
-- Validate that we can handle broadcast joins with hash-partitioned tables. -- Validate that we can handle broadcast joins with hash-partitioned tables.
UPDATE pg_dist_partition SET partmethod = 'h' WHERE EXPLAIN SELECT count(*) FROM customer_hash, nation
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
EXPLAIN SELECT count(*) FROM customer, nation
WHERE c_nationkey = n_nationkey; WHERE c_nationkey = n_nationkey;
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
-- Update the large table shard count for all the following tests. -- Update the large table shard count for all the following tests.
SET citus.large_table_shard_count TO 1; SET citus.large_table_shard_count TO 1;
@ -76,28 +110,20 @@ EXPLAIN SELECT count(*) FROM orders, lineitem, customer
-- Validate that we don't choose a single-partition join method with a -- Validate that we don't choose a single-partition join method with a
-- hash-partitioned base table -- hash-partitioned base table
UPDATE pg_dist_partition SET partmethod = 'h' WHERE EXPLAIN SELECT count(*) FROM orders, customer_hash
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
EXPLAIN SELECT count(*) FROM orders, customer
WHERE c_custkey = o_custkey; WHERE c_custkey = o_custkey;
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'customer');
-- Validate that we can re-partition a hash partitioned table to join with a -- Validate that we can re-partition a hash partitioned table to join with a
-- range partitioned one. -- range partitioned one.
UPDATE pg_dist_partition SET partmethod = 'h' WHERE EXPLAIN SELECT count(*) FROM orders_hash, customer
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
EXPLAIN SELECT count(*) FROM orders, customer
WHERE c_custkey = o_custkey; WHERE c_custkey = o_custkey;
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
logicalrelid = (SELECT relfilenode FROM pg_class WHERE relname = 'orders');
COMMIT; COMMIT;
-- Reset client logging level to its previous value -- Reset client logging level to its previous value
SET client_min_messages TO NOTICE; SET client_min_messages TO NOTICE;
DROP TABLE lineitem_hash;
DROP TABLE orders_hash;
DROP TABLE customer_hash;
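Because the hash-distributed copies are real tables rather than in-place catalog edits, the join-order assertions can be replayed directly; for instance (a sketch, assuming the tables created at the top of this file, before the DROPs just above):

EXPLAIN SELECT count(*) FROM orders_hash, lineitem_hash
WHERE o_orderkey = l_orderkey;    -- expect a local partition join between co-located shards
EXPLAIN SELECT count(*) FROM customer_hash, nation
WHERE c_nationkey = n_nationkey;  -- expect a broadcast join with the small nation table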


@ -36,20 +36,12 @@ CREATE FUNCTION print_sorted_shard_intervals(regclass)
-- test shard pruning functionality -- test shard pruning functionality
-- =================================================================== -- ===================================================================
-- create distributed table metadata to observe shard pruning -- create distributed table to observe shard pruning
CREATE TABLE pruning ( species text, last_pruned date, plant_id integer ); CREATE TABLE pruning ( species text, last_pruned date, plant_id integer );
SELECT master_create_distributed_table('pruning', 'species', 'hash');
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey) -- create worker shards
VALUES SELECT master_create_worker_shards('pruning', 4, 1);
('pruning'::regclass, 'h', column_name_to_column('pruning'::regclass, 'species'));
INSERT INTO pg_dist_shard
(shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue)
VALUES
(10, 'pruning'::regclass, 't', '-2147483648', '-1073741826'),
(11, 'pruning'::regclass, 't', '-1073741825', '-3'),
(12, 'pruning'::regclass, 't', '-2', '1073741820'),
(13, 'pruning'::regclass, 't', '1073741821', '2147483647');
-- with no values, expect all shards -- with no values, expect all shards
SELECT prune_using_no_values('pruning'); SELECT prune_using_no_values('pruning');
@ -76,45 +68,52 @@ SELECT debug_equality_expression('pruning');
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning');
-- clear min/max values for one shard -- clear min/max values for one shard
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103071;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning');
-- now let's have one more shard without min/max values -- now let's have one more shard without min/max values
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103072;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning');
-- now let's have one more shard without min/max values -- now let's have one more shard without min/max values
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103070;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning');
-- all shard intervals are uninitialized -- all shard intervals are uninitialized
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103073;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning');
-- now update the metadata so that the table is a range distributed table -- create range distributed table to observe shard pruning
UPDATE pg_dist_partition SET partmethod = 'r' WHERE logicalrelid = 'pruning'::regclass; CREATE TABLE pruning_range ( species text, last_pruned date, plant_id integer );
SELECT master_create_distributed_table('pruning_range', 'species', 'range');
-- create empty shards
SELECT master_create_empty_shard('pruning_range');
SELECT master_create_empty_shard('pruning_range');
SELECT master_create_empty_shard('pruning_range');
SELECT master_create_empty_shard('pruning_range');
-- now the comparison is done via the partition column type, which is text -- now the comparison is done via the partition column type, which is text
UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 10; UPDATE pg_dist_shard SET shardminvalue = 'a', shardmaxvalue = 'b' WHERE shardid = 103074;
UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 11; UPDATE pg_dist_shard SET shardminvalue = 'c', shardmaxvalue = 'd' WHERE shardid = 103075;
UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 12; UPDATE pg_dist_shard SET shardminvalue = 'e', shardmaxvalue = 'f' WHERE shardid = 103076;
UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 13; UPDATE pg_dist_shard SET shardminvalue = 'g', shardmaxvalue = 'h' WHERE shardid = 103077;
-- print the ordering of shard intervals with range partitioning as well -- print the ordering of shard intervals with range partitioning as well
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning_range');
-- clear min/max values for one shard -- clear min/max values for one shard
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 11; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103075;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning_range');
-- now let's have one more shard without min/max values -- now let's have one more shard without min/max values
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 12; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103076;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning_range');
-- now let's have one more shard without min/max values -- now let's have one more shard without min/max values
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 10; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103074;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning_range');
-- all shard intervals are uninitialized -- all shard intervals are uninitialized
UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 13; UPDATE pg_dist_shard set shardminvalue = NULL, shardmaxvalue = NULL WHERE shardid = 103077;
SELECT print_sorted_shard_intervals('pruning'); SELECT print_sorted_shard_intervals('pruning_range');