From 95862632de600aee112eb7196d71519c43efc37d Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Mon, 26 Dec 2016 16:42:22 +0300 Subject: [PATCH] Add citus tools to default configuration --- src/backend/distributed/Makefile | 4 +- ...us_tools.sql => citus--6.1-13--6.1-14.sql} | 77 +++--- src/backend/distributed/citus.control | 2 +- .../regress/expected/multi_citus_tools.out | 220 ++++++++++++++++-- src/test/regress/sql/multi_citus_tools.sql | 81 +++++-- 5 files changed, 318 insertions(+), 66 deletions(-) rename src/backend/distributed/{master/citus_tools.sql => citus--6.1-13--6.1-14.sql} (79%) diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 1648ec5fb..42fd988f1 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -121,6 +121,8 @@ $(EXTENSION)--6.1-12.sql: $(EXTENSION)--6.1-11.sql $(EXTENSION)--6.1-11--6.1-12. cat $^ > $@ $(EXTENSION)--6.1-13.sql: $(EXTENSION)--6.1-12.sql $(EXTENSION)--6.1-12--6.1-13.sql cat $^ > $@ +$(EXTENSION)--6.1-14.sql: $(EXTENSION)--6.1-13.sql $(EXTENSION)--6.1-13--6.1-14.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/master/citus_tools.sql b/src/backend/distributed/citus--6.1-13--6.1-14.sql similarity index 79% rename from src/backend/distributed/master/citus_tools.sql rename to src/backend/distributed/citus--6.1-13--6.1-14.sql index 136d71cd8..0d6c5e2b0 100644 --- a/src/backend/distributed/master/citus_tools.sql +++ b/src/backend/distributed/citus--6.1-13--6.1-14.sql @@ -1,24 +1,19 @@ -/* - * citus_tools.sql - * Contains definitions of citus_tools UDFs - * - citus_run_on_all_workers - * - citus_run_on_all_placements - * - citus_run_on_all_colocated_placements - * - citus_run_on_all_shards - * - * These functions depends on presence of UDF master_run_on_worker - */ +/* citus--6.1-13--6.1-14.sql */ -CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[], - command text[], parallel boolean, - OUT node_name text, OUT node_port integer, - OUT success boolean, OUT result text ) +CREATE OR REPLACE FUNCTION pg_catalog.master_run_on_worker(worker_name text[], + port integer[], + command text[], + parallel boolean, + OUT node_name text, + OUT node_port integer, + OUT success boolean, + OUT result text ) RETURNS SETOF record LANGUAGE C STABLE STRICT - AS 'citus.so', $$master_run_on_worker$$; + AS 'MODULE_PATHNAME', $$master_run_on_worker$$; -CREATE TYPE colocation_placement_type AS ( +CREATE TYPE citus.colocation_placement_type AS ( shardid1 bigint, shardid2 bigint, nodename text, @@ -26,10 +21,11 @@ CREATE TYPE colocation_placement_type AS ( ); -- --- tables_colocated returns true if given tables are co-located, false otherwise. +-- distributed_tables_colocated returns true if given tables are co-located, false otherwise. -- The function checks shard definitions, matches shard placements for given tables. 
-- -CREATE OR REPLACE FUNCTION citus_tables_colocated(table1 regclass, table2 regclass) +CREATE OR REPLACE FUNCTION pg_catalog.distributed_tables_colocated(table1 regclass, + table2 regclass) RETURNS bool LANGUAGE plpgsql AS $function$ @@ -39,8 +35,8 @@ DECLARE table2_shard_count int; table1_placement_count int; table2_placement_count int; - table1_placements colocation_placement_type[]; - table2_placements colocation_placement_type[]; + table1_placements citus.colocation_placement_type[]; + table2_placements citus.colocation_placement_type[]; BEGIN SELECT count(*), (SELECT count(*) FROM pg_dist_shard a WHERE a.logicalrelid = table1), @@ -61,10 +57,12 @@ BEGIN WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2), left_shard_placements AS ( SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport - FROM colocated_shards cs JOIN pg_dist_shard_placement sp ON (cs.shardid1 = sp.shardid) + FROM colocated_shards cs JOIN pg_dist_shard_placement sp + ON (cs.shardid1 = sp.shardid) WHERE sp.shardstate = 1) SELECT - array_agg((lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::colocation_placement_type + array_agg( + (lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::citus.colocation_placement_type ORDER BY shardid1, shardid2, nodename, nodeport), count(distinct lsp.shardid1) FROM left_shard_placements lsp @@ -79,7 +77,8 @@ BEGIN FROM colocated_shards cs LEFT JOIN pg_dist_shard_placement sp ON(cs.shardid2 = sp.shardid) WHERE sp.shardstate = 1) SELECT - array_agg((rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::colocation_placement_type + array_agg( + (rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::citus.colocation_placement_type ORDER BY shardid1, shardid2, nodename, nodeport), count(distinct rsp.shardid2) FROM right_shard_placements rsp @@ -106,7 +105,7 @@ END; $function$; -CREATE OR REPLACE FUNCTION citus_run_on_all_workers(command text, +CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_workers(command text, parallel bool default true, OUT nodename text, OUT nodeport int, @@ -131,12 +130,14 @@ END; $function$; -CREATE OR REPLACE FUNCTION citus_run_on_all_placements(table_name regclass, command text, - parallel bool default true, - OUT nodename text, - OUT nodeport int, - OUT shardid bigint, - OUT success bool, OUT result text) +CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_placements(table_name regclass, + command text, + parallel bool default true, + OUT nodename text, + OUT nodeport int, + OUT shardid bigint, + OUT success bool, + OUT result text) RETURNS SETOF record LANGUAGE plpgsql AS $function$ @@ -169,7 +170,8 @@ END; $function$; -CREATE OR REPLACE FUNCTION citus_run_on_all_colocated_placements(table_name1 regclass, +CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_colocated_placements( + table_name1 regclass, table_name2 regclass, command text, parallel bool default true, @@ -189,7 +191,7 @@ DECLARE shards2 bigint[]; commands text[]; BEGIN - IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN + IF NOT (SELECT distributed_tables_colocated(table_name1, table_name2)) THEN RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2; END IF; @@ -240,11 +242,12 @@ END; $function$; -CREATE OR REPLACE FUNCTION citus_run_on_all_shards(table_name regclass, command text, - parallel bool default true, - OUT shardid bigint, - OUT success bool, - OUT result text) +CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_shards(table_name regclass, + command text, + parallel bool default true, + OUT shardid bigint, + 
OUT success bool, + OUT result text) RETURNS SETOF record LANGUAGE plpgsql AS $function$ diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index a3c856e4f..5c7ded9ec 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-13' +default_version = '6.1-14' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/test/regress/expected/multi_citus_tools.out b/src/test/regress/expected/multi_citus_tools.out index 3e604087a..6d89c6d51 100644 --- a/src/test/regress/expected/multi_citus_tools.out +++ b/src/test/regress/expected/multi_citus_tools.out @@ -5,17 +5,13 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000; --- the function is not exposed explicitly, create the entry point -CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[], - command text[], - parallel boolean default false, - OUT node_name text, OUT node_port integer, - OUT success boolean, OUT result text) - RETURNS SETOF record - LANGUAGE C STABLE STRICT - AS 'citus.so', $$master_run_on_worker$$; -- test with invalid port, prevent OS dependent warning from being displayed SET client_min_messages to ERROR; +-- PG 9.5 does not show context for plpgsql raise +-- message whereas PG 9.6 shows. disabling it +-- for this test only to have consistent behavior +-- b/w PG 9.6+ and PG 9.5. +\set SHOW_CONTEXT never SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], ARRAY['select count(*) from pg_dist_shard']::text[], false); @@ -364,7 +360,205 @@ SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]: localhost | 57637 | f | ERROR: relation "second_table" does not exist (1 row) --- drop the function after use -DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[], - parallel boolean, OUT node_name text, OUT node_port integer, - OUT success boolean, OUT result text); +-- run_command_on_XXX tests +SELECT * FROM run_command_on_workers('select 1') ORDER BY 2 ASC; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 1 + localhost | 57638 | t | 1 +(2 rows) + +SELECT * FROM run_command_on_workers('select count(*) from pg_dist_partition') ORDER BY 2 ASC; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 0 + localhost | 57638 | t | 0 +(2 rows) + +-- make sure run_on_all_placements respects shardstate +CREATE TABLE check_placements (key int); +SELECT master_create_distributed_table('check_placements', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('check_placements', 5, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_placements('check_placements', 'select 1'); + nodename | nodeport | shardid | success | result +-----------+----------+---------+---------+-------- + localhost | 57637 | 1240000 | t | 1 + localhost | 57638 | 1240000 | t | 1 + localhost | 57637 | 1240001 | t | 1 + localhost | 57638 | 1240001 | t | 1 + localhost | 57637 | 1240002 | t | 1 + localhost | 57638 | 1240002 | t | 1 + localhost | 57637 | 1240003 | t | 1 + localhost | 57638 | 1240003 | t | 1 + localhost | 57637 | 1240004 | t | 
1 + localhost | 57638 | 1240004 | t | 1 +(10 rows) + +UPDATE pg_dist_shard_placement SET shardstate = 3 + WHERE shardid % 2 = 0 AND nodeport = :worker_1_port; +SELECT * FROM run_command_on_placements('check_placements', 'select 1'); + nodename | nodeport | shardid | success | result +-----------+----------+---------+---------+-------- + localhost | 57638 | 1240000 | t | 1 + localhost | 57637 | 1240001 | t | 1 + localhost | 57638 | 1240001 | t | 1 + localhost | 57638 | 1240002 | t | 1 + localhost | 57637 | 1240003 | t | 1 + localhost | 57638 | 1240003 | t | 1 + localhost | 57638 | 1240004 | t | 1 +(7 rows) + +DROP TABLE check_placements CASCADE; +-- make sure run_on_all_colocated_placements correctly detects colocation +CREATE TABLE check_colocated (key int); +SELECT master_create_distributed_table('check_colocated', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('check_colocated', 5, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE second_table (key int); +SELECT master_create_distributed_table('second_table', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('second_table', 4, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +ERROR: tables check_colocated and second_table are not co-located +-- even when the difference is in replication factor, an error is thrown +SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table'); + master_drop_all_shards +------------------------ + 4 +(1 row) + +SELECT master_create_worker_shards('second_table', 5, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +ERROR: tables check_colocated and second_table are not co-located +-- when everything matches, the command is run! 
+SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table'); + master_drop_all_shards +------------------------ + 5 +(1 row) + +SELECT master_create_worker_shards('second_table', 5, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); + nodename | nodeport | shardid1 | shardid2 | success | result +-----------+----------+----------+----------+---------+-------- + localhost | 57637 | 1240005 | 1240019 | t | 1 + localhost | 57638 | 1240005 | 1240019 | t | 1 + localhost | 57637 | 1240006 | 1240020 | t | 1 + localhost | 57638 | 1240006 | 1240020 | t | 1 + localhost | 57637 | 1240007 | 1240021 | t | 1 + localhost | 57638 | 1240007 | 1240021 | t | 1 + localhost | 57637 | 1240008 | 1240022 | t | 1 + localhost | 57638 | 1240008 | 1240022 | t | 1 + localhost | 57637 | 1240009 | 1240023 | t | 1 + localhost | 57638 | 1240009 | 1240023 | t | 1 +(10 rows) + +-- when a placement is invalid considers the tables to not be colocated +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = ( + SELECT shardid FROM pg_dist_shard + WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass + ORDER BY 1 ASC LIMIT 1 +); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +ERROR: tables check_colocated and second_table are not co-located +-- when matching placement is also invalid, considers the tables to be colocated +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = ( + SELECT shardid FROM pg_dist_shard + WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass + ORDER BY 1 ASC LIMIT 1 +); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); + nodename | nodeport | shardid1 | shardid2 | success | result +-----------+----------+----------+----------+---------+-------- + localhost | 57638 | 1240005 | 1240019 | t | 1 + localhost | 57637 | 1240006 | 1240020 | t | 1 + localhost | 57638 | 1240006 | 1240020 | t | 1 + localhost | 57637 | 1240007 | 1240021 | t | 1 + localhost | 57638 | 1240007 | 1240021 | t | 1 + localhost | 57637 | 1240008 | 1240022 | t | 1 + localhost | 57638 | 1240008 | 1240022 | t | 1 + localhost | 57637 | 1240009 | 1240023 | t | 1 + localhost | 57638 | 1240009 | 1240023 | t | 1 +(9 rows) + +DROP TABLE check_colocated CASCADE; +DROP TABLE second_table CASCADE; +-- runs on all shards +CREATE TABLE check_shards (key int); +SELECT master_create_distributed_table('check_shards', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('check_shards', 5, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_shards('check_shards', 'select 1'); + shardid | success | result +---------+---------+-------- + 1240024 | t | 1 + 1240025 | t | 1 + 1240026 | t | 1 + 1240027 | t | 1 + 1240028 | t | 1 +(5 rows) + +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0; +SELECT * FROM run_command_on_shards('check_shards', 'select 1'); +NOTICE: some shards do not have active placements + shardid | success | result +---------+---------+-------- + 1240025 | t | 1 + 1240027 | t | 1 +(2 rows) + +DROP TABLE check_shards CASCADE; +-- set SHOW_CONTEXT back to default +\set SHOW_CONTEXT errors diff --git a/src/test/regress/sql/multi_citus_tools.sql 
b/src/test/regress/sql/multi_citus_tools.sql index 6e17dd1c0..81032a994 100644 --- a/src/test/regress/sql/multi_citus_tools.sql +++ b/src/test/regress/sql/multi_citus_tools.sql @@ -7,18 +7,14 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000; --- the function is not exposed explicitly, create the entry point -CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[], - command text[], - parallel boolean default false, - OUT node_name text, OUT node_port integer, - OUT success boolean, OUT result text) - RETURNS SETOF record - LANGUAGE C STABLE STRICT - AS 'citus.so', $$master_run_on_worker$$; - -- test with invalid port, prevent OS dependent warning from being displayed SET client_min_messages to ERROR; +-- PG 9.5 does not show context for plpgsql raise +-- message whereas PG 9.6 shows. disabling it +-- for this test only to have consistent behavior +-- b/w PG 9.6+ and PG 9.5. +\set SHOW_CONTEXT never + SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], ARRAY['select count(*) from pg_dist_shard']::text[], false); @@ -208,9 +204,66 @@ SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]: SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from second_table']::text[], true); +-- run_command_on_XXX tests +SELECT * FROM run_command_on_workers('select 1') ORDER BY 2 ASC; +SELECT * FROM run_command_on_workers('select count(*) from pg_dist_partition') ORDER BY 2 ASC; --- drop the function after use -DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[], - parallel boolean, OUT node_name text, OUT node_port integer, - OUT success boolean, OUT result text); +-- make sure run_on_all_placements respects shardstate +CREATE TABLE check_placements (key int); +SELECT master_create_distributed_table('check_placements', 'key', 'hash'); +SELECT master_create_worker_shards('check_placements', 5, 2); +SELECT * FROM run_command_on_placements('check_placements', 'select 1'); +UPDATE pg_dist_shard_placement SET shardstate = 3 + WHERE shardid % 2 = 0 AND nodeport = :worker_1_port; +SELECT * FROM run_command_on_placements('check_placements', 'select 1'); +DROP TABLE check_placements CASCADE; +-- make sure run_on_all_colocated_placements correctly detects colocation +CREATE TABLE check_colocated (key int); +SELECT master_create_distributed_table('check_colocated', 'key', 'hash'); +SELECT master_create_worker_shards('check_colocated', 5, 2); +CREATE TABLE second_table (key int); +SELECT master_create_distributed_table('second_table', 'key', 'hash'); +SELECT master_create_worker_shards('second_table', 4, 2); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +-- even when the difference is in replication factor, an error is thrown +SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table'); +SELECT master_create_worker_shards('second_table', 5, 1); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +-- when everything matches, the command is run! 
+SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table'); +SELECT master_create_worker_shards('second_table', 5, 2); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +-- when a placement is invalid considers the tables to not be colocated +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = ( + SELECT shardid FROM pg_dist_shard + WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass + ORDER BY 1 ASC LIMIT 1 +); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +-- when matching placement is also invalid, considers the tables to be colocated +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = ( + SELECT shardid FROM pg_dist_shard + WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass + ORDER BY 1 ASC LIMIT 1 +); +SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', + 'select 1'); +DROP TABLE check_colocated CASCADE; +DROP TABLE second_table CASCADE; + +-- runs on all shards +CREATE TABLE check_shards (key int); +SELECT master_create_distributed_table('check_shards', 'key', 'hash'); +SELECT master_create_worker_shards('check_shards', 5, 2); +SELECT * FROM run_command_on_shards('check_shards', 'select 1'); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0; +SELECT * FROM run_command_on_shards('check_shards', 'select 1'); +DROP TABLE check_shards CASCADE; + +-- set SHOW_CONTEXT back to default +\set SHOW_CONTEXT errors
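A minimal usage sketch of the helpers installed by citus--6.1-13--6.1-14.sql above. Function names, argument lists, and output columns are taken from the upgrade script and the regression test in this patch; the distributed tables "events" and "event_metrics" are hypothetical placeholders and are assumed to already exist as hash-distributed, co-located tables.

-- run a command on every worker node; returns (nodename, nodeport, success, result)
SELECT * FROM run_command_on_workers('select 1');
-- pass false for the optional parallel argument to run on one worker at a time
SELECT * FROM run_command_on_workers('select count(*) from pg_dist_partition', false);

-- run a command once per healthy shard placement of a distributed table;
-- returns (nodename, nodeport, shardid, success, result)
SELECT * FROM run_command_on_placements('events', 'select 1');

-- run a command once per shard of a distributed table;
-- returns (shardid, success, result)
SELECT * FROM run_command_on_shards('events', 'select 1');

-- check co-location first; run_command_on_colocated_placements raises an error
-- when the two tables are not co-located
SELECT distributed_tables_colocated('events', 'event_metrics');
SELECT * FROM run_command_on_colocated_placements('events', 'event_metrics', 'select 1');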