Merge pull request #1069 from citusdata/feature/citus_tools

Feature/citus tools
Murat Tuncer 2017-01-10 17:01:42 +02:00 committed by GitHub
commit 54b1ebb14e
5 changed files with 318 additions and 66 deletions


@@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -121,6 +121,8 @@ $(EXTENSION)--6.1-12.sql: $(EXTENSION)--6.1-11.sql $(EXTENSION)--6.1-11--6.1-12.
cat $^ > $@
$(EXTENSION)--6.1-13.sql: $(EXTENSION)--6.1-12.sql $(EXTENSION)--6.1-12--6.1-13.sql
cat $^ > $@
$(EXTENSION)--6.1-14.sql: $(EXTENSION)--6.1-13.sql $(EXTENSION)--6.1-13--6.1-14.sql
cat $^ > $@
NO_PGXS = 1


@@ -1,24 +1,19 @@
/*
* citus_tools.sql
* Contains definitions of citus_tools UDFs
* - citus_run_on_all_workers
* - citus_run_on_all_placements
* - citus_run_on_all_colocated_placements
* - citus_run_on_all_shards
*
* These functions depend on the presence of the UDF master_run_on_worker
*/
/* citus--6.1-13--6.1-14.sql */
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
command text[], parallel boolean,
OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text )
CREATE OR REPLACE FUNCTION pg_catalog.master_run_on_worker(worker_name text[],
port integer[],
command text[],
parallel boolean,
OUT node_name text,
OUT node_port integer,
OUT success boolean,
OUT result text )
RETURNS SETOF record
LANGUAGE C STABLE STRICT
AS 'citus.so', $$master_run_on_worker$$;
AS 'MODULE_PATHNAME', $$master_run_on_worker$$;
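
For reference, a hedged usage sketch of the relocated entry point; the hostnames, ports, and commands are placeholders taken from the regression tests below:

SELECT *
FROM master_run_on_worker(ARRAY['localhost', 'localhost']::text[],
                          ARRAY[57637, 57638]::int[],
                          ARRAY['SELECT count(*) FROM pg_dist_shard',
                                'SELECT count(*) FROM pg_dist_shard']::text[],
                          parallel := true);
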
CREATE TYPE colocation_placement_type AS (
CREATE TYPE citus.colocation_placement_type AS (
shardid1 bigint,
shardid2 bigint,
nodename text,
@@ -26,10 +21,11 @@ CREATE TYPE colocation_placement_type AS (
);
--
-- tables_colocated returns true if given tables are co-located, false otherwise.
-- distributed_tables_colocated returns true if given tables are co-located, false otherwise.
-- The function checks shard definitions and matches shard placements for the given tables.
--
CREATE OR REPLACE FUNCTION citus_tables_colocated(table1 regclass, table2 regclass)
CREATE OR REPLACE FUNCTION pg_catalog.distributed_tables_colocated(table1 regclass,
table2 regclass)
RETURNS bool
LANGUAGE plpgsql
AS $function$
@@ -39,8 +35,8 @@ DECLARE
table2_shard_count int;
table1_placement_count int;
table2_placement_count int;
table1_placements colocation_placement_type[];
table2_placements colocation_placement_type[];
table1_placements citus.colocation_placement_type[];
table2_placements citus.colocation_placement_type[];
BEGIN
SELECT count(*),
(SELECT count(*) FROM pg_dist_shard a WHERE a.logicalrelid = table1),
@@ -61,10 +57,12 @@ BEGIN
WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2),
left_shard_placements AS (
SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport
FROM colocated_shards cs JOIN pg_dist_shard_placement sp ON (cs.shardid1 = sp.shardid)
FROM colocated_shards cs JOIN pg_dist_shard_placement sp
ON (cs.shardid1 = sp.shardid)
WHERE sp.shardstate = 1)
SELECT
array_agg((lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::colocation_placement_type
array_agg(
(lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::citus.colocation_placement_type
ORDER BY shardid1, shardid2, nodename, nodeport),
count(distinct lsp.shardid1)
FROM left_shard_placements lsp
@@ -79,7 +77,8 @@ BEGIN
FROM colocated_shards cs LEFT JOIN pg_dist_shard_placement sp ON(cs.shardid2 = sp.shardid)
WHERE sp.shardstate = 1)
SELECT
array_agg((rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::colocation_placement_type
array_agg(
(rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::citus.colocation_placement_type
ORDER BY shardid1, shardid2, nodename, nodeport),
count(distinct rsp.shardid2)
FROM right_shard_placements rsp
@@ -106,7 +105,7 @@ END;
$function$;
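
A quick usage sketch for the renamed colocation check; the two table names refer to the tables created in the regression tests below, and any pair of distributed tables can be substituted:

SELECT distributed_tables_colocated('check_colocated'::regclass,
                                    'second_table'::regclass);
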
CREATE OR REPLACE FUNCTION citus_run_on_all_workers(command text,
CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_workers(command text,
parallel bool default true,
OUT nodename text,
OUT nodeport int,
@@ -131,12 +130,14 @@ END;
$function$;
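
A usage sketch mirroring the tests below; parallel defaults to true, so the second call forces serial execution across workers:

SELECT * FROM run_command_on_workers('select 1');
SELECT * FROM run_command_on_workers('select 1', parallel := false);
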
CREATE OR REPLACE FUNCTION citus_run_on_all_placements(table_name regclass, command text,
CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_placements(table_name regclass,
command text,
parallel bool default true,
OUT nodename text,
OUT nodeport int,
OUT shardid bigint,
OUT success bool, OUT result text)
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
@@ -169,7 +170,8 @@ END;
$function$;
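
A usage sketch against the test table created below. The tests verify that only healthy placements (shardstate = 1) are targeted, by marking some placements inactive first:

SELECT * FROM run_command_on_placements('check_placements', 'select 1');
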
CREATE OR REPLACE FUNCTION citus_run_on_all_colocated_placements(table_name1 regclass,
CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_colocated_placements(
table_name1 regclass,
table_name2 regclass,
command text,
parallel bool default true,
@@ -189,7 +191,7 @@ DECLARE
shards2 bigint[];
commands text[];
BEGIN
IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN
IF NOT (SELECT distributed_tables_colocated(table_name1, table_name2)) THEN
RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
END IF;
@@ -240,7 +242,8 @@ END;
$function$;
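
A usage sketch; the function first calls distributed_tables_colocated and raises an exception unless the two tables have matching shards with healthy placements on the same nodes, as the tests below exercise in both directions:

SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
                                                  'select 1');
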
CREATE OR REPLACE FUNCTION citus_run_on_all_shards(table_name regclass, command text,
CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_shards(table_name regclass,
command text,
parallel bool default true,
OUT shardid bigint,
OUT success bool,
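
A usage sketch for run_command_on_shards, taken from the tests below; when a shard has no active placement, the function skips it and emits a NOTICE:

SELECT * FROM run_command_on_shards('check_shards', 'select 1');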


@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.1-13'
default_version = '6.1-14'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
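
With default_version bumped, a fresh CREATE EXTENSION installs 6.1-14 directly, while an existing installation picks up the new functions through the standard extension upgrade path, e.g.:

ALTER EXTENSION citus UPDATE TO '6.1-14';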


@@ -5,17 +5,13 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000;
-- the function is not exposed explicitly, create the entry point
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
command text[],
parallel boolean default false,
OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text)
RETURNS SETOF record
LANGUAGE C STABLE STRICT
AS 'citus.so', $$master_run_on_worker$$;
-- test with an invalid port; prevent an OS-dependent warning from being displayed
SET client_min_messages to ERROR;
-- PG 9.5 does not show context for plpgsql raise
-- messages, whereas PG 9.6 does. Disable it
-- for this test only, to have consistent behavior
-- between PG 9.6+ and PG 9.5.
\set SHOW_CONTEXT never
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
@@ -364,7 +360,205 @@ SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]:
localhost | 57637 | f | ERROR: relation "second_table" does not exist
(1 row)
-- drop the function after use
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
parallel boolean, OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text);
-- run_command_on_XXX tests
SELECT * FROM run_command_on_workers('select 1') ORDER BY 2 ASC;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
SELECT * FROM run_command_on_workers('select count(*) from pg_dist_partition') ORDER BY 2 ASC;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 0
localhost | 57638 | t | 0
(2 rows)
-- make sure run_command_on_placements respects shardstate
CREATE TABLE check_placements (key int);
SELECT master_create_distributed_table('check_placements', 'key', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('check_placements', 5, 2);
master_create_worker_shards
-----------------------------
(1 row)
SELECT * FROM run_command_on_placements('check_placements', 'select 1');
nodename | nodeport | shardid | success | result
-----------+----------+---------+---------+--------
localhost | 57637 | 1240000 | t | 1
localhost | 57638 | 1240000 | t | 1
localhost | 57637 | 1240001 | t | 1
localhost | 57638 | 1240001 | t | 1
localhost | 57637 | 1240002 | t | 1
localhost | 57638 | 1240002 | t | 1
localhost | 57637 | 1240003 | t | 1
localhost | 57638 | 1240003 | t | 1
localhost | 57637 | 1240004 | t | 1
localhost | 57638 | 1240004 | t | 1
(10 rows)
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid % 2 = 0 AND nodeport = :worker_1_port;
SELECT * FROM run_command_on_placements('check_placements', 'select 1');
nodename | nodeport | shardid | success | result
-----------+----------+---------+---------+--------
localhost | 57638 | 1240000 | t | 1
localhost | 57637 | 1240001 | t | 1
localhost | 57638 | 1240001 | t | 1
localhost | 57638 | 1240002 | t | 1
localhost | 57637 | 1240003 | t | 1
localhost | 57638 | 1240003 | t | 1
localhost | 57638 | 1240004 | t | 1
(7 rows)
DROP TABLE check_placements CASCADE;
-- make sure run_command_on_colocated_placements correctly detects colocation
CREATE TABLE check_colocated (key int);
SELECT master_create_distributed_table('check_colocated', 'key', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('check_colocated', 5, 2);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE second_table (key int);
SELECT master_create_distributed_table('second_table', 'key', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('second_table', 4, 2);
master_create_worker_shards
-----------------------------
(1 row)
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
ERROR: tables check_colocated and second_table are not co-located
-- even when the difference is in replication factor, an error is thrown
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
master_drop_all_shards
------------------------
4
(1 row)
SELECT master_create_worker_shards('second_table', 5, 1);
master_create_worker_shards
-----------------------------
(1 row)
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
ERROR: tables check_colocated and second_table are not co-located
-- when everything matches, the command is run!
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
master_drop_all_shards
------------------------
5
(1 row)
SELECT master_create_worker_shards('second_table', 5, 2);
master_create_worker_shards
-----------------------------
(1 row)
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
nodename | nodeport | shardid1 | shardid2 | success | result
-----------+----------+----------+----------+---------+--------
localhost | 57637 | 1240005 | 1240019 | t | 1
localhost | 57638 | 1240005 | 1240019 | t | 1
localhost | 57637 | 1240006 | 1240020 | t | 1
localhost | 57638 | 1240006 | 1240020 | t | 1
localhost | 57637 | 1240007 | 1240021 | t | 1
localhost | 57638 | 1240007 | 1240021 | t | 1
localhost | 57637 | 1240008 | 1240022 | t | 1
localhost | 57638 | 1240008 | 1240022 | t | 1
localhost | 57637 | 1240009 | 1240023 | t | 1
localhost | 57638 | 1240009 | 1240023 | t | 1
(10 rows)
-- when a placement is invalid, the tables are considered not to be colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
ERROR: tables check_colocated and second_table are not co-located
-- when the matching placement is also invalid, the tables are considered colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
nodename | nodeport | shardid1 | shardid2 | success | result
-----------+----------+----------+----------+---------+--------
localhost | 57638 | 1240005 | 1240019 | t | 1
localhost | 57637 | 1240006 | 1240020 | t | 1
localhost | 57638 | 1240006 | 1240020 | t | 1
localhost | 57637 | 1240007 | 1240021 | t | 1
localhost | 57638 | 1240007 | 1240021 | t | 1
localhost | 57637 | 1240008 | 1240022 | t | 1
localhost | 57638 | 1240008 | 1240022 | t | 1
localhost | 57637 | 1240009 | 1240023 | t | 1
localhost | 57638 | 1240009 | 1240023 | t | 1
(9 rows)
DROP TABLE check_colocated CASCADE;
DROP TABLE second_table CASCADE;
-- runs on all shards
CREATE TABLE check_shards (key int);
SELECT master_create_distributed_table('check_shards', 'key', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('check_shards', 5, 2);
master_create_worker_shards
-----------------------------
(1 row)
SELECT * FROM run_command_on_shards('check_shards', 'select 1');
shardid | success | result
---------+---------+--------
1240024 | t | 1
1240025 | t | 1
1240026 | t | 1
1240027 | t | 1
1240028 | t | 1
(5 rows)
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0;
SELECT * FROM run_command_on_shards('check_shards', 'select 1');
NOTICE: some shards do not have active placements
shardid | success | result
---------+---------+--------
1240025 | t | 1
1240027 | t | 1
(2 rows)
DROP TABLE check_shards CASCADE;
-- set SHOW_CONTEXT back to default
\set SHOW_CONTEXT errors


@@ -7,18 +7,14 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000;
-- the function is not exposed explicitly, create the entry point
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
command text[],
parallel boolean default false,
OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text)
RETURNS SETOF record
LANGUAGE C STABLE STRICT
AS 'citus.so', $$master_run_on_worker$$;
-- test with an invalid port; prevent an OS-dependent warning from being displayed
SET client_min_messages to ERROR;
-- PG 9.5 does not show context for plpgsql raise
-- messages, whereas PG 9.6 does. Disable it
-- for this test only, to have consistent behavior
-- between PG 9.6+ and PG 9.5.
\set SHOW_CONTEXT never
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
@@ -208,9 +204,66 @@ SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]:
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
true);
-- run_command_on_XXX tests
SELECT * FROM run_command_on_workers('select 1') ORDER BY 2 ASC;
SELECT * FROM run_command_on_workers('select count(*) from pg_dist_partition') ORDER BY 2 ASC;
-- drop the function after use
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
parallel boolean, OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text);
-- make sure run_command_on_placements respects shardstate
CREATE TABLE check_placements (key int);
SELECT master_create_distributed_table('check_placements', 'key', 'hash');
SELECT master_create_worker_shards('check_placements', 5, 2);
SELECT * FROM run_command_on_placements('check_placements', 'select 1');
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid % 2 = 0 AND nodeport = :worker_1_port;
SELECT * FROM run_command_on_placements('check_placements', 'select 1');
DROP TABLE check_placements CASCADE;
-- make sure run_command_on_colocated_placements correctly detects colocation
CREATE TABLE check_colocated (key int);
SELECT master_create_distributed_table('check_colocated', 'key', 'hash');
SELECT master_create_worker_shards('check_colocated', 5, 2);
CREATE TABLE second_table (key int);
SELECT master_create_distributed_table('second_table', 'key', 'hash');
SELECT master_create_worker_shards('second_table', 4, 2);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- even when the difference is in replication factor, an error is thrown
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
SELECT master_create_worker_shards('second_table', 5, 1);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- when everything matches, the command is run!
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
SELECT master_create_worker_shards('second_table', 5, 2);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- when a placement is invalid, the tables are considered not to be colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- when the matching placement is also invalid, the tables are considered colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
DROP TABLE check_colocated CASCADE;
DROP TABLE second_table CASCADE;
-- runs on all shards
CREATE TABLE check_shards (key int);
SELECT master_create_distributed_table('check_shards', 'key', 'hash');
SELECT master_create_worker_shards('check_shards', 5, 2);
SELECT * FROM run_command_on_shards('check_shards', 'select 1');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0;
SELECT * FROM run_command_on_shards('check_shards', 'select 1');
DROP TABLE check_shards CASCADE;
-- set SHOW_CONTEXT back to default
\set SHOW_CONTEXT errors