From 4ef18e8737661208fbc2aa876dfe1c3a2ea10a81 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Wed, 16 Feb 2022 13:11:25 +0300 Subject: [PATCH] Address reviews --- src/backend/distributed/commands/function.c | 31 +++++++++++++------ .../regress/expected/function_propagation.out | 6 ++-- src/test/regress/expected/multi_extension.out | 6 ++-- .../expected/multi_function_in_join.out | 6 ++-- .../expected/multi_function_in_join_0.out | 14 ++++++++- .../regress/sql/multi_function_in_join.sql | 2 +- 6 files changed, 43 insertions(+), 22 deletions(-) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index d08c44972..1158c2574 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1298,16 +1298,29 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) &functionAddress); if (undistributableDependency != NULL) { - RangeVar *functionRangeVar = makeRangeVarFromNameList(stmt->funcname); - char *functionName = functionRangeVar->relname; - char *dependentRelationName = get_rel_name(undistributableDependency->objectId); + if (SupportedDependencyByCitus(undistributableDependency)) + { + RangeVar *functionRangeVar = makeRangeVarFromNameList(stmt->funcname); + char *functionName = functionRangeVar->relname; + char *dependentRelationName = + get_rel_name(undistributableDependency->objectId); + + ereport(WARNING, (errmsg("Citus can't distribute function \"%s\" having " + "dependency on non-distributed relation \"%s\"", + functionName, dependentRelationName), + errdetail("Function will be created only locally"), + errhint("To distribute function, distribute dependent " + "relations first. Then, re-create the function"))); + } + else + { + ereport(WARNING, (errmsg("Citus can't distribute functions having " + "dependency on unsupported relation with relkind %c", + get_rel_relkind( + undistributableDependency->objectId)), + errdetail("Function will be created only locally"))); + } - ereport(WARNING, (errmsg("Citus can't distribute function \"%s\" having " - "dependency on non-distributed relation \"%s\"", - functionName, dependentRelationName), - errdetail("Function will be created only locally"), - errhint("To distribute function, distribute dependent relations" - " first. Then, re-create the function"))); return NIL; } diff --git a/src/test/regress/expected/function_propagation.out b/src/test/regress/expected/function_propagation.out index 41f70e4fa..238685c64 100644 --- a/src/test/regress/expected/function_propagation.out +++ b/src/test/regress/expected/function_propagation.out @@ -186,9 +186,8 @@ BEGIN return 1; END; $$; -WARNING: Citus can't distribute function "func_7" having dependency on non-distributed relation "function_prop_view" +WARNING: Citus can't distribute functions having dependency on unsupported relation with relkind v DETAIL: Function will be created only locally -HINT: To distribute function, distribute dependent relations first. Then, re-create the function CREATE OR REPLACE FUNCTION func_8(param_1 int) RETURNS function_prop_view LANGUAGE plpgsql AS @@ -197,9 +196,8 @@ BEGIN return 1; END; $$; -WARNING: Citus can't distribute function "func_8" having dependency on non-distributed relation "function_prop_view" +WARNING: Citus can't distribute functions having dependency on unsupported relation with relkind v DETAIL: Function will be created only locally -HINT: To distribute function, distribute dependent relations first. Then, re-create the function -- Check within transaction BEGIN; CREATE TYPE type_in_transaction AS (a int, b int); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index db4a54775..946c33b93 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -77,9 +77,8 @@ END $func$ LANGUAGE plpgsql; CREATE SCHEMA test; :create_function_test_maintenance_worker -WARNING: Citus can't distribute function "maintenance_worker" having dependency on non-distributed relation "pg_stat_activity" +WARNING: Citus can't distribute functions having dependency on unsupported relation with relkind v DETAIL: Function will be created only locally -HINT: To distribute function, distribute dependent relations first. Then, re-create the function -- check maintenance daemon is started SELECT datname, current_database(), usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') @@ -1200,9 +1199,8 @@ HINT: You can manually create a database and its extensions on workers. CREATE EXTENSION citus; CREATE SCHEMA test; :create_function_test_maintenance_worker -WARNING: Citus can't distribute function "maintenance_worker" having dependency on non-distributed relation "pg_stat_activity" +WARNING: Citus can't distribute functions having dependency on unsupported relation with relkind v DETAIL: Function will be created only locally -HINT: To distribute function, distribute dependent relations first. Then, re-create the function -- see that the daemon started SELECT datname, current_database(), usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') diff --git a/src/test/regress/expected/multi_function_in_join.out b/src/test/regress/expected/multi_function_in_join.out index f4fedeed1..7d62e286b 100644 --- a/src/test/regress/expected/multi_function_in_join.out +++ b/src/test/regress/expected/multi_function_in_join.out @@ -66,6 +66,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, ta (1 row) -- a function that returns a set of integers +-- Block distributing function as we have tests below to test it locally +SET citus.enable_metadata_sync TO OFF; CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER, IN k INTEGER DEFAULT 3, OUT result INTEGER) @@ -74,12 +76,10 @@ BEGIN RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x); END; $$ LANGUAGE plpgsql; -DEBUG: switching to sequential query execution mode -DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +RESET citus.enable_metadata_sync; SELECT * FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result) ORDER BY id ASC; -DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id id | data | result diff --git a/src/test/regress/expected/multi_function_in_join_0.out b/src/test/regress/expected/multi_function_in_join_0.out index 4c3eda2b0..5f2bd70c7 100644 --- a/src/test/regress/expected/multi_function_in_join_0.out +++ b/src/test/regress/expected/multi_function_in_join_0.out @@ -78,7 +78,7 @@ BEGIN RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x); END; $$ LANGUAGE plpgsql; -SET citus.enable_metadata_sync TO OFF; +RESET citus.enable_metadata_sync; SELECT * FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result) ORDER BY id ASC; @@ -95,7 +95,10 @@ CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$ SELECT x, x+1 FROM generate_series(0,4) f(x) $cmd$ LANGUAGE SQL; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC; +DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id id | data | x | y @@ -110,7 +113,10 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, ta CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text) AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id); +DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id))) f1 | f2 @@ -121,7 +127,10 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FR -- a stable function CREATE OR REPLACE FUNCTION the_minimum_id() RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id); +DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id))) id | data | min_id @@ -182,7 +191,10 @@ begin return result; end; $$ language plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data) ORDER BY 1,2,3,4; +DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data)))) ORDER BY table1.id, table1.data, m.minimum, m.maximum id | data | minimum | maximum diff --git a/src/test/regress/sql/multi_function_in_join.sql b/src/test/regress/sql/multi_function_in_join.sql index 4b654e7fb..1e2844f4e 100644 --- a/src/test/regress/sql/multi_function_in_join.sql +++ b/src/test/regress/sql/multi_function_in_join.sql @@ -55,7 +55,7 @@ BEGIN RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x); END; $$ LANGUAGE plpgsql; -SET citus.enable_metadata_sync TO OFF; +RESET citus.enable_metadata_sync; SELECT * FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result) ORDER BY id ASC;