-- Test functions for partitioning intermediate results
CREATE SCHEMA partitioned_intermediate_results;
SET search_path TO 'partitioned_intermediate_results';
-- hash partitioned intermediate results
-- Arguments as used in this file: result prefix, query, partition column
-- index, partition method, min values, max values, binary-format flag.
BEGIN;
SELECT * FROM worker_partition_query_result('squares_hash',
                                            'SELECT i, i * i FROM generate_series(1, 10) i', 0, 'hash',
                                            '{-2147483648,-1073741824,0,1073741824}'::text[],
                                            '{-1073741825,-1,1073741823,2147483647}'::text[], false);
 partition_index | rows_written | bytes_written
---------------------------------------------------------------------
               0 |            4 |            21
               1 |            3 |            14
               2 |            1 |             5
               3 |            2 |             9
(4 rows)

-- Partition N is read back as '<prefix>_N'; hashint4(x) is selected so the
-- output shows that each row's hash lies within that partition's min/max range.
SELECT hashint4(x), x, x2 FROM read_intermediate_result('squares_hash_0', 'text') AS res (x int, x2 int) ORDER BY x;
  hashint4   | x  | x2
---------------------------------------------------------------------
 -1905060026 |  1 |   1
 -1330264708 |  5 |  25
 -2047600124 |  8 |  64
 -1547814713 | 10 | 100
(4 rows)

SELECT hashint4(x), x, x2 FROM read_intermediate_result('squares_hash_1', 'text') AS res (x int, x2 int) ORDER BY x;
  hashint4   | x | x2
---------------------------------------------------------------------
   -28094569 | 3 |  9
 -1011077333 | 4 | 16
  -978793473 | 7 | 49
(3 rows)

SELECT hashint4(x), x, x2 FROM read_intermediate_result('squares_hash_2', 'text') AS res (x int, x2 int) ORDER BY x;
 hashint4  | x | x2
---------------------------------------------------------------------
 566031088 | 6 | 36
(1 row)

SELECT hashint4(x), x, x2 FROM read_intermediate_result('squares_hash_3', 'text') AS res (x int, x2 int) ORDER BY x;
  hashint4  | x | x2
---------------------------------------------------------------------
 1134484726 | 2 |  4
 1672378334 | 9 | 81
(2 rows)

END;
-- range partitioned intermediate results
-- Partitions on the second column (index 1) using the ranges 0-20, 21-40,
-- 41-60 and 61-100, and writes the results in binary format.
BEGIN;
SELECT * FROM worker_partition_query_result('squares_range',
                                            'SELECT i, i * i FROM generate_series(1, 10) i',
                                            1, /* partition by x^2 */
                                            'range',
                                            '{0,21,41,61}'::text[],
                                            '{20,40,60,100}'::text[],
                                            true /* binary format */);
 partition_index | rows_written | bytes_written
---------------------------------------------------------------------
               0 |            4 |            93
               1 |            2 |            57
               2 |            1 |            39
               3 |            3 |            75
(4 rows)

SELECT x, x2 FROM read_intermediate_result('squares_range_0', 'binary') AS res (x int, x2 int) ORDER BY x;
 x | x2
---------------------------------------------------------------------
 1 |  1
 2 |  4
 3 |  9
 4 | 16
(4 rows)

SELECT x, x2 FROM read_intermediate_result('squares_range_1', 'binary') AS res (x int, x2 int) ORDER BY x;
 x | x2
---------------------------------------------------------------------
 5 | 25
 6 | 36
(2 rows)

SELECT x, x2 FROM read_intermediate_result('squares_range_2', 'binary') AS res (x int, x2 int) ORDER BY x;
 x | x2
---------------------------------------------------------------------
 7 | 49
(1 row)

SELECT x, x2 FROM read_intermediate_result('squares_range_3', 'binary') AS res (x int, x2 int) ORDER BY x;
 x  | x2
---------------------------------------------------------------------
  8 |  64
  9 |  81
 10 | 100
(3 rows)

END;
-- 1M rows, just in case. text format.
-- Same four hash ranges as above; all partitions are read back in one call
-- and only the total row count is checked.
BEGIN;
SELECT * FROM worker_partition_query_result('doubles_hash',
                                            'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'hash',
                                            '{-2147483648,-1073741824,0,1073741824}'::text[],
                                            '{-1073741825,-1,1073741823,2147483647}'::text[], false);
 partition_index | rows_written | bytes_written
---------------------------------------------------------------------
               0 |       250199 |       3586179
               1 |       249872 |       3581280
               2 |       250278 |       3587487
               3 |       249651 |       3578401
(4 rows)

SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_hash_0',
                                                     'doubles_hash_1',
                                                     'doubles_hash_2',
                                                     'doubles_hash_3'], 'text') AS res (x int, x2 int);
  count
---------------------------------------------------------------------
 1000000
(1 row)

END;
-- 1M rows, just in case. binary format.
-- Four equal ranges of 250000 keys each on column 0; every partition is
-- expected to receive exactly 250000 rows.
BEGIN;
SELECT * FROM worker_partition_query_result('doubles_range',
                                            'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'range',
                                            '{0,250001,500001,750001}'::text[],
                                            '{250000,500000,750000,1000000}'::text[], true);
 partition_index | rows_written | bytes_written
---------------------------------------------------------------------
               0 |       250000 |       4500021
               1 |       250000 |       4500021
               2 |       250000 |       4500021
               3 |       250000 |       4500021
(4 rows)

SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_range_0',
                                                     'doubles_range_1',
                                                     'doubles_range_2',
                                                     'doubles_range_3'], 'binary') AS res (x int, x2 int);
  count
---------------------------------------------------------------------
 1000000
(1 row)

END;
--
-- Some error cases
--
-- not allowed outside transaction block
SELECT * FROM worker_partition_query_result('squares_range',
                                            'SELECT i, i * i FROM generate_series(1, 10) i',
                                            1, 'range', '{0}'::text[], '{20}'::text[], true);
ERROR:  worker_partition_query_result can only be used in a transaction block
-- The remaining error cases share one transaction; each failing call is
-- followed by ROLLBACK TO SAVEPOINT s1 so the next case can still run.
BEGIN;
SAVEPOINT s1;
-- syntax error in query
SELECT worker_partition_query_result('squares_range',
                                     'SELECxT i, i * i FROM generate_series(1, 10) i',
                                     1, 'range',
                                     '{0,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  syntax error at or near "SELECxT"
ROLLBACK TO SAVEPOINT s1;
-- invalid result prefix
SELECT worker_partition_query_result('squares_range/a/',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     1, 'range',
                                     '{0,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  result key "squares_range/a/" contains invalid character
HINT:  Result keys may only contain letters, numbers, underscores and hyphens.
ROLLBACK TO SAVEPOINT s1;
-- empty min/max values
SELECT worker_partition_query_result('squares_range',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     1, 'range', ARRAY[]::text[], ARRAY[]::text[], true);
ERROR:  number of partitions cannot be 0
ROLLBACK TO SAVEPOINT s1;
-- append partitioning
SELECT worker_partition_query_result('squares_range',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     1, 'append',
                                     '{0,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  only hash and range partitiong schemes are supported
ROLLBACK TO SAVEPOINT s1;
-- query with no results
-- (the query argument is an INSERT, which does not return a row set)
CREATE TABLE t(a int);
SELECT worker_partition_query_result('squares_range',
                                     'INSERT INTO t VALUES (1), (2)',
                                     1, 'range',
                                     '{0,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  query must generate a set of rows
ROLLBACK TO SAVEPOINT s1;
-- negative partition index
SELECT worker_partition_query_result('squares_range',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     -1, 'range',
                                     '{0,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  partition column index must be between 0 and 1
ROLLBACK TO SAVEPOINT s1;
-- too large partition index
-- (the query has two columns, so the valid indexes are 0 and 1)
SELECT worker_partition_query_result('squares_range',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     2, 'range',
                                     '{0,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  partition column index must be between 0 and 1
ROLLBACK TO SAVEPOINT s1;
-- min/max values of different lengths
SELECT worker_partition_query_result('squares_range',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     1, 'range',
                                     '{0,21,41,61,101}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  min values and max values must have the same number of elements
ROLLBACK TO SAVEPOINT s1;
-- null values in min/max values of hash partitioned results
SELECT worker_partition_query_result('squares_hash',
                                     'SELECT i, i * i FROM generate_series(1, 10) i',
                                     1, 'hash',
                                     '{NULL,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  hash partitioned table has uninitialized shards
ROLLBACK TO
SAVEPOINT s1;
-- multiple queries
-- (the query argument contains two statements separated by ';')
SELECT worker_partition_query_result('squares_hash',
                                     'SELECT i, i * i FROM generate_series(1, 10) i; SELECT 4, 16;',
                                     1, 'hash',
                                     '{NULL,21,41,61}'::text[],
                                     '{20,40,60,100}'::text[],
                                     true);
ERROR:  cannot execute multiple utility events
ROLLBACK TO SAVEPOINT s1;
ROLLBACK;
--
-- Procedure for conveniently testing worker_partition_query_result(). It uses
-- worker_partition_query_result() to partition the result of query using the
-- same scheme as the distributed table rel, and then checks that the rows were
-- partitioned the same way as they were placed into the shards of rel.
--
-- Parameters:
--   rel        - distributed table; its shard min/max values, partition column
--                and partition method define the partitioning scheme
--   query      - query text; it is both inserted into rel and partitioned
--   binaryCopy - passed as the binary-format flag of
--                worker_partition_query_result(); also selects 'binary' vs
--                'text' when the partitions are read back (default true)
--
-- Emits NOTICEs on success and raises an error if the per-partition row counts
-- differ from the per-shard row counts, or if any row returned by query is
-- missing from the partitioned results.
--
-- Side effects: inserts the rows of query into rel and issues a COMMIT.
-- NOTE(review): because of the COMMIT this presumably has to be CALLed outside
-- an explicit transaction block - confirm against the PL/pgSQL documentation.
--
CREATE OR REPLACE PROCEDURE test_partition_query_results(rel regclass, query text,
                                                         binaryCopy boolean DEFAULT true)
AS $$
DECLARE
  partition_min_values text[];
  partition_max_values text[];
  partition_column_index int;
  partition_method citus.distribution_type;
  partitioned_results_row_counts text[];
  distributed_table_row_counts text[];
  tuple_def text;
  partition_result_names text[];
  non_empty_partitions int[];
  rows_different int;
BEGIN
  -- get tuple definition
  -- ("name type, name type, ..." for the live columns of rel in attnum order;
  -- used below as the column definition list of read_intermediate_results)
  SELECT string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ' ORDER BY a.attnum)
  INTO tuple_def
  FROM pg_catalog.pg_attribute a
  WHERE a.attrelid = rel::oid AND a.attnum > 0 AND NOT a.attisdropped;

  -- get min/max value arrays
  -- (ordered by shardid so that partition index i lines up with the i-th
  -- shard when the row counts are compared at the end)
  SELECT array_agg(shardminvalue ORDER BY shardid),
         array_agg(shardmaxvalue ORDER BY shardid)
  INTO partition_min_values, partition_max_values
  FROM pg_dist_shard
  WHERE logicalrelid=rel;

  -- get partition column index and partition method
  -- (the attribute number is parsed out of partkey and reduced by one to get
  -- the 0-based column index; any partmethod other than 'h' is treated as range)
  SELECT (regexp_matches(partkey, ':varattno ([0-9]+)'))[1]::int - 1,
         (CASE WHEN partmethod='h' THEN 'hash' ELSE 'range' END)
  INTO partition_column_index, partition_method
  FROM pg_dist_partition
  WHERE logicalrelid=rel;

  -- insert into the distributed table
  EXECUTE 'INSERT INTO ' || rel::text || ' ' || query;

  -- repartition the query locally
  -- (row counts are cast to text so they can be compared with the text results
  -- of run_command_on_shards; only partitions with rows_written > 0 are
  -- remembered for reading back)
  SELECT array_agg(rows_written::text ORDER BY partition_index),
         array_agg(partition_index) FILTER (WHERE rows_written > 0)
  INTO partitioned_results_row_counts,
       non_empty_partitions
  FROM worker_partition_query_result('test_prefix', query, partition_column_index,
                                     partition_method, partition_min_values,
                                     partition_max_values, binaryCopy);

  -- result names have the form 'test_prefix_<partition index>'
  SELECT array_agg('test_prefix_' || i::text)
  INTO partition_result_names
  FROM unnest(non_empty_partitions) i;

  -- count the rows produced by query that are absent from the partitioned
  -- results (one-directional check via EXCEPT)
  EXECUTE 'SELECT count(*) FROM ((' || query || ') EXCEPT (SELECT * FROM read_intermediate_results($1,$2) AS res (' || tuple_def || '))) t'
  INTO rows_different
  USING partition_result_names, (CASE WHEN binaryCopy THEN 'binary' ELSE 'text' END)::pg_catalog.citus_copy_format;

  -- commit so results are available in run_command_on_shards
  COMMIT;

  -- rows per shard of the distributed table
  SELECT array_agg(result order by shardid) INTO distributed_table_row_counts
  FROM run_command_on_shards(rel, 'SELECT count(*) FROM %s');

  IF partitioned_results_row_counts = distributed_table_row_counts THEN
    RAISE NOTICE 'Rows per partition match ...';
  ELSE
    RAISE 'FAILED: rows per partition do not match, expecting % got %', distributed_table_row_counts, partitioned_results_row_counts;
  END IF;

  IF rows_different = 0 THEN
    RAISE NOTICE 'Row values match ...';
  ELSE
    RAISE 'FAILED: Could not find % of expected rows in partitions', rows_different;
  END IF;

  RAISE NOTICE 'PASSED.';
END;
$$ LANGUAGE plpgsql;
\set VERBOSITY terse
-- hash partitioning, 32 shards
SET citus.shard_count TO 32;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
-- Every case below creates its own table t, so the previous one is dropped first.
DROP TABLE t;
-- hash partitioning, 1 shard
SET citus.shard_count TO 1;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- hash partitioning, 17 shards (so hash partitions aren't uniform)
SET citus.shard_count TO 17;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- hash partitioning, date partition column
SET citus.shard_count TO 8;
CREATE TABLE t(a DATE, b int);
SELECT create_distributed_table('t', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- the query produces 100 consecutive dates starting the day after 1985-05-18
CALL test_partition_query_results('t', 'SELECT (''1985-05-18''::date + (x::text || '' days'')::interval)::date, x * x FROM generate_series(1, 100) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- hash partitioning, int4 range partition column
-- (the distribution column's type is int4range; the table is still hash distributed)
SET citus.shard_count TO 8;
CREATE TABLE t(a int4range, b int);
SELECT create_distributed_table('t', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL test_partition_query_results('t', 'SELECT int4range(x,2*x+10), x * x FROM generate_series(1, 100) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- range partitioning, int partition column
-- Shard ranges 0-24, 25-49, 50-75 and 76-200 cover every key the query
-- generates (1..105).
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL public.create_range_partitioned_shards('t', '{0,25,50,76}', '{24,49,75,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- not covering ranges, should ERROR
-- (keys 76..99 fall between the third range, 50-75, and the fourth, 100-200)
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL public.create_range_partitioned_shards('t', '{0,25,50,100}', '{24,49,75,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
ERROR:  could not find shard for partition column value
DROP TABLE t;
-- overlapping ranges, we allow this in range partitioned distributed tables, should be fine
-- (0-50 overlaps 25-49 and 50-90; 50-90 overlaps 76-200)
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL public.create_range_partitioned_shards('t', '{0,25,50,76}', '{50,49,90,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- range partitioning, composite partition column
-- Shard bounds are given as composite literals, e.g. "(0,a)" to "(24,z)".
CREATE TYPE composite_key_type AS (f1 int, f2 text);
SET citus.shard_count TO 8;
CREATE TABLE t(key composite_key_type, value int);
SELECT create_distributed_table('t', 'key', 'range');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL public.create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', '{"(24,z)","(49,z)","(74,z)","(100,z)"}');
CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
DROP TYPE composite_key_type;
-- unsorted ranges
-- (the same four ranges as the int partition column case - 0-24, 25-49,
-- 50-75, 76-200 - but passed in a different order)
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CALL public.create_range_partitioned_shards('t', '{50,25,76,0}', '{75,49,200,24}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
NOTICE:  Rows per partition match ...
NOTICE:  Row values match ...
NOTICE:  PASSED.
DROP TABLE t;
-- Clean up: drop the test schema and restore the settings changed above.
-- NOTE(review): client_min_messages is presumably raised to WARNING so the
-- CASCADE drop prints no NOTICEs - confirm.
SET client_min_messages TO WARNING;
DROP SCHEMA partitioned_intermediate_results CASCADE;
\set VERBOSITY default
SET client_min_messages TO DEFAULT;
SET citus.shard_count TO DEFAULT;