-- End-to-end query tests against a hash-distributed "articles" table, with the
-- psql output recorded after each statement.
-- NOTE(review): this looks like a pg_regress expected-output file; comment text
-- is echoed from the input script, so keep the two in sync -- confirm.
SET citus.next_shard_id TO 850000;
-- ===================================================================
-- test end-to-end query functionality
-- ===================================================================
CREATE TABLE articles (
    id bigint NOT NULL,
    author_id bigint NOT NULL,
    title varchar(20) NOT NULL,
    word_count integer NOT NULL CHECK (word_count > 0)
);
-- this table is used in a CTE test
CREATE TABLE authors ( name text, id bigint );
-- this table is used in router executor tests
CREATE TABLE articles_single_shard (LIKE articles);
SELECT master_create_distributed_table('articles', 'author_id', 'hash');
 master_create_distributed_table
---------------------------------
 
(1 row)

SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'hash');
 master_create_distributed_table
---------------------------------
 
(1 row)

SELECT master_create_worker_shards('articles', 2, 1);
 master_create_worker_shards
-----------------------------
 
(1 row)

SELECT master_create_worker_shards('articles_single_shard', 1, 1);
 master_create_worker_shards
-----------------------------
 
(1 row)

-- create a bunch of test data
INSERT INTO articles VALUES ( 1,  1, 'arsenous', 9572);
INSERT INTO articles VALUES ( 2,  2, 'abducing', 13642);
INSERT INTO articles VALUES ( 3,  3, 'asternal', 10480);
INSERT INTO articles VALUES ( 4,  4, 'altdorfer', 14551);
INSERT INTO articles VALUES ( 5,  5, 'aruru', 11389);
INSERT INTO articles VALUES ( 6,  6, 'atlases', 15459);
INSERT INTO articles VALUES ( 7,  7, 'aseptic', 12298);
INSERT INTO articles VALUES ( 8,  8, 'agatized', 16368);
INSERT INTO articles VALUES ( 9,  9, 'alligate', 438);
INSERT INTO articles VALUES (10, 10, 'aggrandize', 17277);
INSERT INTO articles VALUES (11,  1, 'alamo', 1347);
INSERT INTO articles VALUES (12,  2, 'archiblast', 18185);
INSERT INTO articles VALUES (13,  3, 'aseyev', 2255);
INSERT INTO articles VALUES (14,  4, 'andesite', 19094);
INSERT INTO articles VALUES (15,  5, 'adversa', 3164);
INSERT INTO articles VALUES (16,  6, 'allonym', 2);
INSERT INTO articles VALUES (17,  7, 'auriga', 4073);
INSERT INTO articles VALUES (18,  8, 'assembly', 911);
INSERT INTO articles VALUES (19,  9, 'aubergiste', 4981);
INSERT INTO articles VALUES (20, 10, 'absentness', 1820);
INSERT INTO articles VALUES (21,  1, 'arcading', 5890);
INSERT INTO articles VALUES (22,  2, 'antipope', 2728);
INSERT INTO articles VALUES (23,  3, 'abhorring', 6799);
INSERT INTO articles VALUES (24,  4, 'audacious', 3637);
INSERT INTO articles VALUES (25,  5, 'antehall', 7707);
INSERT INTO articles VALUES (26,  6, 'abington', 4545);
INSERT INTO articles VALUES (27,  7, 'arsenous', 8616);
INSERT INTO articles VALUES (28,  8, 'aerophyte', 5454);
INSERT INTO articles VALUES (29,  9, 'amateur', 9524);
INSERT INTO articles VALUES (30, 10, 'andelee', 6363);
INSERT INTO articles VALUES (31,  1, 'athwartships', 7271);
INSERT INTO articles VALUES (32,  2, 'amazon', 11342);
INSERT INTO articles VALUES (33,  3, 'autochrome', 8180);
INSERT INTO articles VALUES (34,  4, 'amnestied', 12250);
INSERT INTO articles VALUES (35,  5, 'aminate', 9089);
INSERT INTO articles VALUES (36,  6, 'ablation', 13159);
INSERT INTO articles VALUES (37,  7, 'archduchies', 9997);
INSERT INTO articles VALUES (38,  8, 'anatine', 14067);
INSERT INTO articles VALUES (39,  9, 'anchises', 10906);
INSERT INTO articles VALUES (40, 10, 'attemper', 14976);
INSERT INTO articles VALUES (41,  1, 'aznavour', 11814);
INSERT INTO articles VALUES (42,  2, 'ausable', 15885);
INSERT INTO articles VALUES (43,  3, 'affixal', 12723);
INSERT INTO articles VALUES (44,  4, 'anteport', 16793);
INSERT INTO articles VALUES (45,  5, 'afrasia', 864);
INSERT INTO articles VALUES (46,  6, 'atlanta', 17702);
INSERT INTO articles VALUES (47,  7, 'abeyance', 1772);
INSERT INTO articles VALUES (48,  8, 'alkylic', 18610);
INSERT INTO articles VALUES (49,  9, 'anyone', 2681);
INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
-- insert a single row for the test
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
-- zero-shard modifications should succeed
-- (each WHERE clause below is contradictory, so no row can match)
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
UPDATE articles SET title = '' WHERE 0 = 1;
DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
-- single-shard tests
-- test simple select for a single row
SELECT * FROM articles WHERE author_id = 10 AND id = 50;
 id | author_id |   title   | word_count
----+-----------+-----------+------------
 50 |        10 | anjanette |      19519
(1 row)

-- get all titles by a single author
SELECT title FROM articles WHERE author_id = 10;
   title
------------
 aggrandize
 absentness
 andelee
 attemper
 anjanette
(5 rows)

-- try ordering them by word count
SELECT title, word_count FROM articles
    WHERE author_id = 10
    ORDER BY word_count DESC NULLS LAST;
   title    | word_count
------------+------------
 anjanette  |      19519
 aggrandize |      17277
 attemper   |      14976
 andelee    |       6363
 absentness |       1820
(5 rows)

-- look at the first two articles (lowest ids) by an author
SELECT title, id FROM articles
    WHERE author_id = 5
    ORDER BY id
    LIMIT 2;
  title  | id
---------+----
 aruru   |  5
 adversa | 15
(2 rows)

-- find all articles by two authors in same shard
SELECT title, author_id FROM articles
    WHERE author_id = 7 OR author_id = 8
    ORDER BY author_id ASC, id;
    title    | author_id
-------------+-----------
 aseptic     |         7
 auriga      |         7
 arsenous    |         7
 archduchies |         7
 abeyance    |         7
 agatized    |         8
 assembly    |         8
 aerophyte   |         8
 anatine     |         8
 alkylic     |         8
(10 rows)

-- add in some grouping expressions
SELECT author_id, sum(word_count) AS corpus_size FROM articles
    WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
    GROUP BY author_id
    HAVING sum(word_count) > 40000
    ORDER BY sum(word_count) DESC;
 author_id | corpus_size
-----------+-------------
         2 |       61782
        10 |       59955
         8 |       55410
(3 rows)

-- UNION/INTERSECT queries are supported if on multiple shards
SELECT * FROM articles WHERE author_id = 10 UNION
SELECT * FROM articles WHERE author_id = 2
ORDER BY 1,2,3;
 id | author_id |   title    | word_count
----+-----------+------------+------------
  2 |         2 | abducing   |      13642
 10 |        10 | aggrandize |      17277
 12 |         2 | archiblast |      18185
 20 |        10 | absentness |       1820
 22 |         2 | antipope   |       2728
 30 |        10 | andelee    |       6363
 32 |         2 | amazon     |      11342
 40 |        10 | attemper   |      14976
 42 |         2 | ausable    |      15885
 50 |        10 | anjanette  |      19519
(10 rows)

-- queries using CTEs are supported
-- (the CTE is defined but never referenced by the main query)
WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 )
SELECT title FROM articles ORDER BY 1 LIMIT 5;
   title
-----------
 abducing
 abeyance
 abhorring
 abington
 ablation
(5 rows)

-- queries which involve functions in FROM clause are recursively planned
SELECT * FROM articles, position('om' in 'Thomas') ORDER BY 2 DESC, 1 DESC, 3 DESC LIMIT 5;
 id | author_id |   title    | word_count | position
----+-----------+------------+------------+----------
 50 |        10 | anjanette  |      19519 |        3
 40 |        10 | attemper   |      14976 |        3
 30 |        10 | andelee    |       6363 |        3
 20 |        10 | absentness |       1820 |        3
 10 |        10 | aggrandize |      17277 |        3
(5 rows)

-- subqueries are supported in WHERE clause in Citus even if the relations are not distributed
SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a');
 id | author_id | title | word_count
----+-----------+-------+------------
(0 rows)

-- subqueries are supported in FROM clause
SELECT articles.id,test.word_count
FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = articles.id
ORDER BY articles.id;
 id | word_count
----+------------
  1 |       9572
  2 |      13642
  3 |      10480
  4 |      14551
  5 |      11389
  6 |      15459
  7 |      12298
  8 |      16368
  9 |        438
 10 |      17277
 11 |       1347
 12 |      18185
 13 |       2255
 14 |      19094
 15 |       3164
 16 |          2
 17 |       4073
 18 |        911
 19 |       4981
 20 |       1820
 21 |       5890
 22 |       2728
 23 |       6799
 24 |       3637
 25 |       7707
 26 |       4545
 27 |       8616
 28 |       5454
 29 |       9524
 30 |       6363
 31 |       7271
 32 |      11342
 33 |       8180
 34 |      12250
 35 |       9089
 36 |      13159
 37 |       9997
 38 |      14067
 39 |      10906
 40 |      14976
 41 |      11814
 42 |      15885
 43 |      12723
 44 |      16793
 45 |        864
 46 |      17702
 47 |       1772
 48 |      18610
 49 |       2681
 50 |      19519
(50 rows)

-- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard a2 WHERE a.id = a2.id LIMIT 1)
                         AS special_price FROM articles a;
ERROR:  could not run distributed query with subquery outside the FROM and WHERE clauses
HINT:  Consider using an equality filter on the distributed table's partition column.
-- joins are not supported between local and distributed tables
SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id;
ERROR:  relation authors is not distributed
-- explicit INNER JOIN syntax against a local table is rejected the same way
SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id);
ERROR:  relation authors is not distributed
-- test use of EXECUTE statements within plpgsql
DO $sharded_execute$
    BEGIN
        EXECUTE 'SELECT COUNT(*) FROM articles ' ||
                'WHERE author_id = $1 AND author_id = $2' USING 1, 2;
    END
$sharded_execute$;
-- test use of bare SQL within plpgsql
-- (the recorded CONTEXT reports "line 3", so the SELECT must stay on the third
-- line of the DO body)
DO $sharded_sql$
    BEGIN
        SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
    END
$sharded_sql$;
ERROR:  query has no destination for result data
HINT:  If you want to discard the results of a SELECT, use PERFORM instead.
CONTEXT:  PL/pgSQL function inline_code_block line 3 at SQL statement
-- test cross-shard queries
SELECT COUNT(*) FROM articles;
 count
-------
    50
(1 row)

-- test with empty target list
-- (the "--" line after each statement is psql's column separator, not a comment)
SELECT FROM articles;
--
(50 rows)

SELECT FROM articles WHERE author_id = 3737;
--
(0 rows)

SELECT FROM articles WHERE word_count = 65500;
--
(0 rows)

-- having queries supported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles
    GROUP BY author_id
    HAVING sum(word_count) > 25000
    ORDER BY sum(word_count) DESC
    LIMIT 5;
 author_id | corpus_size
-----------+-------------
         4 |       66325
         2 |       61782
        10 |       59955
         8 |       55410
         6 |       50867
(5 rows)

SELECT author_id FROM articles
    GROUP BY author_id
    HAVING sum(word_count) > 50000
    ORDER BY author_id;
 author_id
-----------
         2
         4
         6
         8
        10
(5 rows)

SELECT author_id FROM articles
    GROUP BY author_id
    HAVING sum(word_count) > 50000 AND author_id < 5
    ORDER BY author_id;
 author_id
-----------
         2
         4
(2 rows)

SELECT author_id FROM articles
    GROUP BY author_id
    HAVING sum(word_count) > 50000 OR author_id < 5
    ORDER BY author_id;
 author_id
-----------
         1
         2
         3
         4
         6
         8
        10
(7 rows)

SELECT author_id FROM articles
    GROUP BY author_id
    HAVING author_id <= 2 OR author_id = 8
    ORDER BY author_id;
 author_id
-----------
         1
         2
         8
(3 rows)

-- NOTE(review): orders and lineitem are not created in this file; presumably
-- they are loaded by an earlier test in the schedule -- confirm.
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders
    GROUP BY o_orderstatus
    HAVING count(*) > 1450 OR avg(o_totalprice) > 150000
    ORDER BY o_orderstatus;
 o_orderstatus | count |         avg
---------------+-------+---------------------
 O             |  1461 | 143326.447029431896
 P             |    75 | 164847.914533333333
(2 rows)

SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
    WHERE l_orderkey = o_orderkey AND l_orderkey > 9030
    GROUP BY o_orderstatus
    HAVING sum(l_linenumber) > 1000
    ORDER BY o_orderstatus;
 o_orderstatus | sum  |        avg
---------------+------+--------------------
 F             | 8559 | 3.0126715945089757
 O             | 8904 | 3.0040485829959514
(2 rows)

-- now, test the cases where Citus does or does not need to create
-- the master queries
SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
-- start with the simple lookup query
SELECT *
    FROM articles
    WHERE author_id = 1;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 id | author_id |    title     | word_count
----+-----------+--------------+------------
  1 |         1 | arsenous     |       9572
 11 |         1 | alamo        |       1347
 21 |         1 | arcading     |       5890
 31 |         1 | athwartships |       7271
 41 |         1 | aznavour     |      11814
(5 rows)

-- below query hits a single shard, so no need to create the master query
SELECT *
    FROM articles
    WHERE author_id = 1 OR author_id = 17;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 id | author_id |    title     | word_count
----+-----------+--------------+------------
  1 |         1 | arsenous     |       9572
 11 |         1 | alamo        |       1347
 21 |         1 | arcading     |       5890
 31 |         1 | athwartships |       7271
 41 |         1 | aznavour     |      11814
(5 rows)

-- below query hits two shards, so needs to create the master query
SELECT *
    FROM articles
    WHERE author_id = 1 OR author_id = 18;
 id | author_id |    title     | word_count
----+-----------+--------------+------------
  1 |         1 | arsenous     |       9572
 11 |         1 | alamo        |       1347
 21 |         1 | arcading     |       5890
 31 |         1 | athwartships |       7271
 41 |         1 | aznavour     |      11814
(5 rows)

-- rename the output columns on a no master query case
SELECT id as article_id, word_count * id as random_value
    FROM articles
    WHERE author_id = 1;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 article_id | random_value
------------+--------------
          1 |         9572
         11 |        14817
         21 |       123690
         31 |       225401
         41 |       484374
(5 rows)

-- we can push down co-located joins to a single worker without the
-- master query being required for only the same tables
SELECT a.author_id as first_author, b.word_count as second_word_count
    FROM articles a, articles b
    WHERE a.author_id = 10 and a.author_id = b.author_id
    LIMIT 3;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 first_author | second_word_count
--------------+-------------------
           10 |             17277
           10 |              1820
           10 |              6363
(3 rows)

-- a join across two different tables, filtered to a single author, is also
-- planned by the router (see the DEBUG output below)
SELECT a.author_id as first_author, b.word_count as second_word_count
    FROM articles a, articles_single_shard b
    WHERE a.author_id = 10 and a.author_id = b.author_id
    LIMIT 3;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 first_author | second_word_count
--------------+-------------------
           10 |             19519
           10 |             19519
           10 |             19519
(3 rows)

-- do not create the master query for LIMIT on a single shard SELECT
SELECT *
    FROM articles
    WHERE author_id = 1
    LIMIT 2;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 id | author_id |  title   | word_count
----+-----------+----------+------------
  1 |         1 | arsenous |       9572
 11 |         1 | alamo    |       1347
(2 rows)

-- This query hits a single shard. So GROUP BY can be
-- pushed down to the workers directly. This query is
-- equivalent to SELECT DISTINCT on a single shard.
SELECT id
    FROM articles
    WHERE author_id = 1
    GROUP BY id
    ORDER BY id;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
 id
----
  1
 11
 21
 31
 41
(5 rows)

-- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
50	10	anjanette	19519
-- aggregates filtered to a single author are router executable
SELECT avg(word_count)
    FROM articles
    WHERE author_id = 2;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
        avg
--------------------
 12356.400000000000
(1 row)

-- max, min, sum, count are implemented
-- differently in distributed planning
SELECT max(word_count) as max, min(word_count) as min,
       sum(word_count) as sum, count(word_count) as cnt
    FROM articles
    WHERE author_id = 2;
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
  max  | min  |  sum  | cnt
-------+------+-------+-----
 18185 | 2728 | 61782 |   5
(1 row)

-- error out for queries with repartition jobs
SELECT *
    FROM articles a, articles b
    WHERE a.id = b.id AND a.author_id = 1;
DEBUG:  join prunable for task partitionId 0 and 1
DEBUG:  join prunable for task partitionId 0 and 2
DEBUG:  join prunable for task partitionId 0 and 3
DEBUG:  join prunable for task partitionId 1 and 0
DEBUG:  join prunable for task partitionId 1 and 2
DEBUG:  join prunable for task partitionId 1 and 3
DEBUG:  join prunable for task partitionId 2 and 0
DEBUG:  join prunable for task partitionId 2 and 1
DEBUG:  join prunable for task partitionId 2 and 3
DEBUG:  join prunable for task partitionId 3 and 0
DEBUG:  join prunable for task partitionId 3 and 1
DEBUG:  join prunable for task partitionId 3 and 2
DEBUG:  pruning merge fetch taskId 1
DETAIL:  Creating dependency on merge taskId 2
DEBUG:  pruning merge fetch taskId 2
DETAIL:  Creating dependency on merge taskId 3
DEBUG:  pruning merge fetch taskId 4
DETAIL:  Creating dependency on merge taskId 4
DEBUG:  pruning merge fetch taskId 5
DETAIL:  Creating dependency on merge taskId 6
DEBUG:  pruning merge fetch taskId 7
DETAIL:  Creating dependency on merge taskId 6
DEBUG:  pruning merge fetch taskId 8
DETAIL:  Creating dependency on merge taskId 9
DEBUG:  pruning merge fetch taskId 10
DETAIL:  Creating dependency on merge taskId 8
DEBUG:  pruning merge fetch taskId 11
DETAIL:  Creating dependency on merge taskId 12
ERROR:  the query contains a join that requires repartitioning
HINT:  Set citus.enable_repartition_joins to on to enable repartitioning
-- system columns from shard tables can be queried and retrieved
SELECT count(*) FROM (
    SELECT tableoid, ctid, cmin, cmax, xmin, xmax
        FROM articles
        WHERE tableoid IS NOT NULL OR
              ctid IS NOT NULL OR
              cmin IS NOT NULL OR
              cmax IS NOT NULL OR
              xmin IS NOT NULL OR
              xmax IS NOT NULL
) x;
 count
-------
    50
(1 row)

SET client_min_messages to 'NOTICE';