diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index fc8c65746..389f5a68b 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -50,6 +50,7 @@ */ #include "postgres.h" +#include "funcapi.h" #include "catalog/pg_type.h" #include "catalog/pg_class.h" @@ -162,7 +163,9 @@ static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *contex static bool ContainsReferencesToOuterQuery(Query *query); static bool ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context); - +static void WrapFunctionsInQuery(Query *query); +static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry); +static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry); /* * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. @@ -263,6 +266,9 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context return NULL; } + /* make sure function calls in joins are executed in the coordinator */ + WrapFunctionsInQuery(query); + /* descend into subqueries */ query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0); @@ -1305,6 +1311,186 @@ ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *conte } +/* + * WrapFunctionsInQuery iterates over all the immediate Range Table Entries + * of a query and wraps the functions inside (SELECT * FROM fnc() f) subqueries. + * + * We currently wrap only those functions that return a single value. If a + * function returns records or a table we leave it as it is + * */ +static void +WrapFunctionsInQuery(Query *query) +{ + List *rangeTableList = query->rtable; + ListCell *rangeTableCell = NULL; + + /* there needs to be at least two RTEs for a join operation */ + if (list_length(rangeTableList) < 2) + { + return; + } + + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + + if (ShouldTransformRTE(rangeTableEntry)) + { + TransformFunctionRTE(rangeTableEntry); + } + } +} + + +/* + * TransformFunctionRTE wraps a given function RangeTableEntry + * inside a (SELECT * from function() f) subquery. + * + * The said RangeTableEntry is modified and now points to the new subquery. + * */ +static void +TransformFunctionRTE(RangeTblEntry *rangeTblEntry) +{ + Query *subquery = makeNode(Query); + RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); + RangeTblEntry *newRangeTableEntry = NULL; + Var *targetColumn = NULL; + TargetEntry *targetEntry = NULL; + RangeTblFunction *rangeTblFunction = NULL; + int targetColumnIndex = 0; + TupleDesc tupleDesc = NULL; + + rangeTblFunction = linitial(rangeTblEntry->functions); + + subquery->commandType = CMD_SELECT; + + /* copy the input rangeTblEntry to prevent cycles */ + newRangeTableEntry = copyObject(rangeTblEntry); + + /* set the FROM expression to the subquery */ + subquery->rtable = list_make1(newRangeTableEntry); + newRangeTableRef->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); + + /* Determine the result type of the function. + * + * If function return type is not composite or rowtype can't be determined, + * tupleDesc is set to null here + */ + tupleDesc = (TupleDesc) get_expr_result_tupdesc(rangeTblFunction->funcexpr, + true); + + /* if tupleDesc is not null, we iterate over all the attributes and + * create targetEntries*/ + if (tupleDesc) + { + for (targetColumnIndex = 0; targetColumnIndex < tupleDesc->natts; + targetColumnIndex++) + { + FormData_pg_attribute *attribute = TupleDescAttr(tupleDesc, + targetColumnIndex); + Oid columnType = attribute->atttypid; + char *columnName = attribute->attname.data; + + /* + * The indexing of attributes and TupleDesc and varattno differ + * + * varattno=0 corresponds to whole row + * varattno=1 corrensponds to first column that is stored in tupDesc->attrs[0] */ + targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid, + 0); + targetEntry = makeTargetEntry((Expr *) targetColumn, targetColumnIndex + 1, + columnName, + false); + subquery->targetList = lappend(subquery->targetList, targetEntry); + } + } + /* + * If tupleDesc is NULL we have several cases: + * + * 1. The function returns a record but the attributes can not be + * determined before running the query. In this case the column names and + * types must be defined explicitly in the query + * + * 2. The function returns a non-composite type + * */ + else + { + /* create target entries for all columns returned by the function */ + List *functionColumnNames = NULL; + ListCell *functionColumnName = NULL; + + functionColumnNames = rangeTblEntry->eref->colnames; + foreach(functionColumnName, functionColumnNames) + { + char *columnName = strVal(lfirst(functionColumnName)); + Oid columnType = InvalidOid; + + /* use explicitly defined types in the query if they are available */ + if (list_length(rangeTblFunction->funccoltypes) > 0) + { + columnType = list_nth_oid(rangeTblFunction->funccoltypes, + targetColumnIndex); + } + /* use the types in the function definition otherwise */ + else + { + FuncExpr *funcExpr = (FuncExpr *) rangeTblFunction->funcexpr; + columnType = funcExpr->funcresulttype; + } + + targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, + InvalidOid, 0); + targetEntry = makeTargetEntry((Expr *) targetColumn, + targetColumnIndex + 1, columnName, + false); + subquery->targetList = lappend(subquery->targetList, targetEntry); + + targetColumnIndex++; + } + } + + /* replace the function with the constructed subquery */ + rangeTblEntry->rtekind = RTE_SUBQUERY; + rangeTblEntry->subquery = subquery; +} + + +/* + * ShouldTransformRTE determines whether a given RTE should bne wrapped in a + * subquery. + * + * Not all functions can be wrapped in a subquery for now. As we support more + * functions to be used in joins, the constraints here will be relaxed. + * */ +static bool +ShouldTransformRTE(RangeTblEntry *rangeTableEntry) +{ + /* wrap only function rtes */ + if (rangeTableEntry->rtekind != RTE_FUNCTION) + { + return false; + } + + /* + * TODO: remove this check once lateral joins are supported + * We do not support lateral joins on functions for now, as referencing + * columns of an outer query is quite tricky */ + if (rangeTableEntry->lateral) + { + return false; + } + + /* We do not want to wrap read-intermediate-result function calls */ + if (ContainsReadIntermediateResultFunction(linitial(rangeTableEntry->functions))) + { + return false; + } + + return true; +} + + /* * BuildSubPlanResultQuery returns a query of the form: * diff --git a/src/test/regress/expected/multi_function_in_join.out b/src/test/regress/expected/multi_function_in_join.out new file mode 100644 index 000000000..ef65aac82 --- /dev/null +++ b/src/test/regress/expected/multi_function_in_join.out @@ -0,0 +1,227 @@ +-- +-- multi function in join queries aims to test the function calls that are +-- used in joins. +-- +-- These functions are supposed to be executed on the worker and to ensure +-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries. +-- +-- We do not yet support those functions that: +-- - return records +-- - return tables +-- - are user-defined and immutable +CREATE SCHEMA functions_in_joins; +SET search_path TO 'functions_in_joins'; +SET citus.next_shard_id TO 2500000; +CREATE TABLE table1 (id int, data int); +SELECT create_distributed_table('table1','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO table1 +SELECT x, x*x +from generate_series(1, 100) as f (x); +-- Verbose messages for observing the subqueries that wrapped function calls +SET client_min_messages TO DEBUG1; +-- Check joins on a sequence +CREATE SEQUENCE numbers; +SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC; +DEBUG: generating subplan 2_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n) +DEBUG: Plan 2 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('2_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) ORDER BY table1.id + id | data | n +----+------+--- + 1 | 1 | 1 +(1 row) + +-- Check joins of a function that returns a single integer +CREATE FUNCTION add(integer, integer) RETURNS integer +AS 'SELECT $1 + $2;' +LANGUAGE SQL; +SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; +DEBUG: generating subplan 3_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum) +DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id + id | data | sum +----+------+----- + 8 | 64 | 8 +(1 row) + +-- Check join of plpgsql functions +-- a function returning a single integer +CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$ +BEGIN + RETURN i + 1; +END; +$$ LANGUAGE plpgsql; +SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC; +DEBUG: generating subplan 4_1 for subquery SELECT val FROM functions_in_joins.increment(2) val(val) +DEBUG: Plan 4 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, val.val FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.val FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(val integer)) val ON ((table1.id OPERATOR(pg_catalog.=) val.val))) ORDER BY table1.id + id | data | val +----+------+----- + 3 | 9 | 3 +(1 row) + +-- a function that returns a set of integers +CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER, + IN k INTEGER DEFAULT 3, + OUT result INTEGER) + RETURNS SETOF INTEGER AS $$ +BEGIN + RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x); +END; +$$ LANGUAGE plpgsql; +SELECT * +FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result) +ORDER BY id ASC; +DEBUG: generating subplan 5_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result) +DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id + id | data | result +----+------+-------- + 3 | 9 | 3 + 4 | 16 | 4 +(2 rows) + +-- a function returning set of records +CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$ +SELECT x, x+1 FROM generate_series(0,4) f(x) +$cmd$ +LANGUAGE SQL; +SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC; +DEBUG: generating subplan 6_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer) +DEBUG: Plan 6 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id + id | data | x | y +----+------+---+--- + 1 | 1 | 1 | 2 + 2 | 4 | 2 | 3 + 3 | 9 | 3 | 4 + 4 | 16 | 4 | 5 +(4 rows) + +-- a function returning table +CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text) +AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ +LANGUAGE SQL; +SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id); +DEBUG: generating subplan 7_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2) +DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id))) + f1 | f2 +----+------------ + 32 | 32 is text +(1 row) + +-- a stable function +CREATE OR REPLACE FUNCTION the_minimum_id() + RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL; +SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id); +DEBUG: generating subplan 8_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id) +DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id))) + id | data | min_id +----+------+-------- + 1 | 1 | 1 +(1 row) + +-- a built-in immutable function +SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC; + id | data | hundred +-----+-------+--------- + 100 | 10000 | 100 +(1 row) + +-- function joins inside a CTE +WITH next_row_to_process AS ( + SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) + ) +SELECT * +FROM table1, next_row_to_process +WHERE table1.data <= next_row_to_process.data +ORDER BY 1,2 ASC; +DEBUG: generating subplan 11_1 for CTE next_row_to_process: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN nextval('functions_in_joins.numbers'::regclass) n(n) ON ((table1.id OPERATOR(pg_catalog.=) n.n))) +DEBUG: generating subplan 12_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n) +DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) +DEBUG: Plan 11 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_row_to_process.id, next_row_to_process.data, next_row_to_process.n FROM functions_in_joins.table1, (SELECT intermediate_result.id, intermediate_result.data, intermediate_result.n FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, data integer, n bigint)) next_row_to_process WHERE (table1.data OPERATOR(pg_catalog.<=) next_row_to_process.data) ORDER BY table1.id, table1.data + id | data | id | data | n +----+------+----+------+--- + 1 | 1 | 2 | 4 | 2 + 2 | 4 | 2 | 4 | 2 +(2 rows) + +-- Multiple functions in an RTE +SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b), + table1 WHERE id = a ORDER BY id ASC; +DEBUG: generating subplan 13_1 for subquery SELECT a, b FROM ROWS FROM(functions_in_joins.next_k_integers(5), functions_in_joins.next_k_integers(10)) f(a, b) +DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT f.a, f.b, table1.id, table1.data FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) f(a, b), functions_in_joins.table1 WHERE (table1.id OPERATOR(pg_catalog.=) f.a) ORDER BY table1.id + a | b | id | data +---+----+----+------ + 5 | 10 | 5 | 25 + 6 | 11 | 6 | 36 + 7 | 12 | 7 | 49 +(3 rows) + +-- Custom Type returning function used in a join +CREATE TYPE min_and_max AS ( + minimum INT, + maximum INT +); +CREATE OR REPLACE FUNCTION max_and_min () RETURNS + min_and_max AS $$ +DECLARE + result min_and_max%rowtype; +begin + select into result min(data) as minimum, max(data) as maximum from table1; + return result; +end; +$$ language plpgsql; +SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data); +DEBUG: generating subplan 14_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum) +DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data)))) + id | data | minimum | maximum +-----+-------+---------+--------- + 1 | 1 | 1 | 10000 + 100 | 10000 | 1 | 10000 +(2 rows) + +-- The following tests will fail as we do not support all joins on +-- all kinds of functions +SET client_min_messages TO ERROR; +-- function joins in CTE results can create lateral joins that are not supported +SELECT public.raise_failed_execution($cmd$ +WITH one_row AS ( + SELECT * FROM table1 WHERE id=52 + ) +SELECT table1.id, table1.data +FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids +WHERE table1.id = next_five_ids; +$cmd$); +ERROR: Task failed to execute +CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE +-- a user-defined immutable function +CREATE OR REPLACE FUNCTION the_answer_to_life() + RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL; +SELECT public.raise_failed_execution($cmd$ +SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer) +$cmd$); +ERROR: Task failed to execute +CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE +-- WITH ORDINALITY clause forcing the result type to be RECORD/RECORDs +SELECT * +FROM table1 + JOIN next_k_integers(10,5) WITH ORDINALITY next_integers + ON (id = next_integers.result) +ORDER BY id ASC; +ERROR: attribute 2 of type record has wrong type +DETAIL: Table has type bigint, but query expects integer. +RESET client_min_messages; +DROP SCHEMA functions_in_joins CASCADE; +NOTICE: drop cascades to 11 other objects +DETAIL: drop cascades to table table1 +drop cascades to sequence numbers +drop cascades to function add(integer,integer) +drop cascades to function increment(integer) +drop cascades to function next_k_integers(integer,integer) +drop cascades to function get_set_of_records() +drop cascades to function dup(integer) +drop cascades to function the_minimum_id() +drop cascades to type min_and_max +drop cascades to function max_and_min() +drop cascades to function the_answer_to_life() +SET search_path TO DEFAULT; diff --git a/src/test/regress/expected/multi_subquery_complex_reference_clause.out b/src/test/regress/expected/multi_subquery_complex_reference_clause.out index 20b689a61..7aec16798 100644 --- a/src/test/regress/expected/multi_subquery_complex_reference_clause.out +++ b/src/test/regress/expected/multi_subquery_complex_reference_clause.out @@ -281,10 +281,8 @@ SET client_min_messages TO DEBUG; SELECT count(*) FROM (SELECT random() FROM user_buy_test_table JOIN random() AS users_ref_test_table(id) ON user_buy_test_table.item_id > users_ref_test_table.id) subquery_1; -DEBUG: generating subplan 30_1 for subquery SELECT random() AS random FROM (public.user_buy_test_table JOIN random() users_ref_test_table(id) ON (((user_buy_test_table.item_id)::double precision OPERATOR(pg_catalog.>) users_ref_test_table.id))) -DEBUG: Plan 30 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.random FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(random double precision)) subquery_1 -DEBUG: Creating router plan -DEBUG: Plan is router executable +DEBUG: generating subplan 30_1 for subquery SELECT id FROM random() users_ref_test_table(id) +DEBUG: Plan 30 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT random() AS random FROM (public.user_buy_test_table JOIN (SELECT intermediate_result.id FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(id double precision)) users_ref_test_table(id) ON (((user_buy_test_table.item_id)::double precision OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1 count ------- 4 @@ -295,10 +293,8 @@ SELECT count(*) FROM (SELECT item_id FROM user_buy_test_table JOIN generate_series(random()::int,10) AS users_ref_test_table(id) ON user_buy_test_table.item_id > users_ref_test_table.id) subquery_1 WHERE item_id = 6; -DEBUG: generating subplan 32_1 for subquery SELECT user_buy_test_table.item_id FROM (public.user_buy_test_table JOIN generate_series((random())::integer, 10) users_ref_test_table(id) ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id))) -DEBUG: Plan 32 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.item_id FROM read_intermediate_result('32_1'::text, 'binary'::citus_copy_format) intermediate_result(item_id integer)) subquery_1 WHERE (item_id OPERATOR(pg_catalog.=) 6) -DEBUG: Creating router plan -DEBUG: Plan is router executable +DEBUG: generating subplan 31_1 for subquery SELECT id FROM generate_series((random())::integer, 10) users_ref_test_table(id) +DEBUG: Plan 31 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT user_buy_test_table.item_id FROM (public.user_buy_test_table JOIN (SELECT intermediate_result.id FROM read_intermediate_result('31_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) users_ref_test_table(id) ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1 WHERE (item_id OPERATOR(pg_catalog.=) 6) count ------- 0 @@ -309,11 +305,11 @@ SELECT count(*) FROM (SELECT user_id FROM user_buy_test_table UNION ALL SELECT id FROM generate_series(1,10) AS users_ref_test_table(id)) subquery_1; -DEBUG: generating subplan 34_1 for subquery SELECT user_id FROM public.user_buy_test_table +DEBUG: generating subplan 32_1 for subquery SELECT user_id FROM public.user_buy_test_table DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan 34_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('34_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM generate_series(1, 10) users_ref_test_table(id) -DEBUG: Plan 34 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('34_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1 +DEBUG: generating subplan 32_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('32_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM generate_series(1, 10) users_ref_test_table(id) +DEBUG: Plan 32 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('32_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1 DEBUG: Creating router plan DEBUG: Plan is router executable count @@ -359,11 +355,11 @@ SELECT count(*) FROM (SELECT user_id FROM user_buy_test_table UNION ALL SELECT id FROM (SELECT 5 AS id) users_ref_test_table) subquery_1; -DEBUG: generating subplan 41_1 for subquery SELECT user_id FROM public.user_buy_test_table +DEBUG: generating subplan 39_1 for subquery SELECT user_id FROM public.user_buy_test_table DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan 41_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('41_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM (SELECT 5 AS id) users_ref_test_table -DEBUG: Plan 41 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('41_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1 +DEBUG: generating subplan 39_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('39_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM (SELECT 5 AS id) users_ref_test_table +DEBUG: Plan 39 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('39_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1 DEBUG: Creating router plan DEBUG: Plan is router executable count @@ -379,11 +375,11 @@ SELECT * FROM UNION SELECT user_id FROM user_buy_test_table) sub ORDER BY 1 DESC; -DEBUG: generating subplan 44_1 for subquery SELECT user_id FROM public.user_buy_test_table +DEBUG: generating subplan 42_1 for subquery SELECT user_id FROM public.user_buy_test_table DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan 44_2 for subquery SELECT users_ref_test_table.id FROM public.users_ref_test_table UNION SELECT intermediate_result.user_id FROM read_intermediate_result('44_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: Plan 44 query after replacing subqueries and CTEs: SELECT id FROM (SELECT intermediate_result.id FROM read_intermediate_result('44_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) sub ORDER BY id DESC +DEBUG: generating subplan 42_2 for subquery SELECT users_ref_test_table.id FROM public.users_ref_test_table UNION SELECT intermediate_result.user_id FROM read_intermediate_result('42_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: Plan 42 query after replacing subqueries and CTEs: SELECT id FROM (SELECT intermediate_result.id FROM read_intermediate_result('42_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) sub ORDER BY id DESC DEBUG: Creating router plan DEBUG: Plan is router executable id @@ -403,11 +399,11 @@ SELECT * FROM UNION SELECT user_id, random() * 0 FROM (SELECT user_id FROM user_buy_test_table) sub2) sub ORDER BY 1 DESC; -DEBUG: generating subplan 47_1 for subquery SELECT user_id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT user_buy_test_table.user_id FROM public.user_buy_test_table) sub2 +DEBUG: generating subplan 45_1 for subquery SELECT user_id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT user_buy_test_table.user_id FROM public.user_buy_test_table) sub2 DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan 47_2 for subquery SELECT sub1.id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT users_ref_test_table.id FROM public.users_ref_test_table) sub1 UNION SELECT intermediate_result.user_id, intermediate_result."?column?" FROM read_intermediate_result('47_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "?column?" double precision) -DEBUG: Plan 47 query after replacing subqueries and CTEs: SELECT id, "?column?" FROM (SELECT intermediate_result.id, intermediate_result."?column?" FROM read_intermediate_result('47_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer, "?column?" double precision)) sub ORDER BY id DESC +DEBUG: generating subplan 45_2 for subquery SELECT sub1.id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT users_ref_test_table.id FROM public.users_ref_test_table) sub1 UNION SELECT intermediate_result.user_id, intermediate_result."?column?" FROM read_intermediate_result('45_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "?column?" double precision) +DEBUG: Plan 45 query after replacing subqueries and CTEs: SELECT id, "?column?" FROM (SELECT intermediate_result.id, intermediate_result."?column?" FROM read_intermediate_result('45_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer, "?column?" double precision)) sub ORDER BY id DESC DEBUG: Creating router plan DEBUG: Plan is router executable id | ?column? @@ -1334,8 +1330,8 @@ SELECT count(*) FROM (SELECT user_buy_test_table.user_id, random() FROM user_buy_test_table LEFT JOIN users_ref_test_table ON user_buy_test_table.user_id > users_ref_test_table.id) subquery_2 WHERE subquery_1.user_id != subquery_2.user_id ; -DEBUG: generating subplan 86_1 for subquery SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.user_id OPERATOR(pg_catalog.>) users_ref_test_table.id))) -DEBUG: Plan 86 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('86_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) subquery_2 WHERE (subquery_1.user_id OPERATOR(pg_catalog.<>) subquery_2.user_id) +DEBUG: generating subplan 84_1 for subquery SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.user_id OPERATOR(pg_catalog.>) users_ref_test_table.id))) +DEBUG: Plan 84 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('84_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) subquery_2 WHERE (subquery_1.user_id OPERATOR(pg_catalog.<>) subquery_2.user_id) count ------- 67 @@ -1380,8 +1376,8 @@ count(*) AS cnt, "generated_group_field" ORDER BY cnt DESC, generated_group_field ASC LIMIT 10; -DEBUG: generating subplan 88_1 for subquery SELECT user_id, value_2 AS generated_group_field FROM public.users_table users -DEBUG: Plan 88 query after replacing subqueries and CTEs: SELECT count(*) AS cnt, generated_group_field FROM (SELECT "eventQuery".user_id, random() AS random, "eventQuery".generated_group_field FROM (SELECT multi_group_wrapper_1."time", multi_group_wrapper_1.event_user_id, multi_group_wrapper_1.user_id, left_group_by_1.generated_group_field, random() AS random FROM ((SELECT temp_data_queries."time", temp_data_queries.event_user_id, user_filters_1.user_id FROM ((SELECT events."time", events.user_id AS event_user_id FROM public.events_table events WHERE (events.user_id OPERATOR(pg_catalog.>) 2)) temp_data_queries JOIN (SELECT users.user_id FROM public.users_reference_table users WHERE ((users.user_id OPERATOR(pg_catalog.>) 2) AND (users.value_2 OPERATOR(pg_catalog.=) 5))) user_filters_1 ON ((temp_data_queries.event_user_id OPERATOR(pg_catalog.<) user_filters_1.user_id)))) multi_group_wrapper_1 RIGHT JOIN (SELECT intermediate_result.user_id, intermediate_result.generated_group_field FROM read_intermediate_result('88_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, generated_group_field integer)) left_group_by_1 ON ((left_group_by_1.user_id OPERATOR(pg_catalog.>) multi_group_wrapper_1.event_user_id)))) "eventQuery") "pushedDownQuery" GROUP BY generated_group_field ORDER BY (count(*)) DESC, generated_group_field LIMIT 10 +DEBUG: generating subplan 86_1 for subquery SELECT user_id, value_2 AS generated_group_field FROM public.users_table users +DEBUG: Plan 86 query after replacing subqueries and CTEs: SELECT count(*) AS cnt, generated_group_field FROM (SELECT "eventQuery".user_id, random() AS random, "eventQuery".generated_group_field FROM (SELECT multi_group_wrapper_1."time", multi_group_wrapper_1.event_user_id, multi_group_wrapper_1.user_id, left_group_by_1.generated_group_field, random() AS random FROM ((SELECT temp_data_queries."time", temp_data_queries.event_user_id, user_filters_1.user_id FROM ((SELECT events."time", events.user_id AS event_user_id FROM public.events_table events WHERE (events.user_id OPERATOR(pg_catalog.>) 2)) temp_data_queries JOIN (SELECT users.user_id FROM public.users_reference_table users WHERE ((users.user_id OPERATOR(pg_catalog.>) 2) AND (users.value_2 OPERATOR(pg_catalog.=) 5))) user_filters_1 ON ((temp_data_queries.event_user_id OPERATOR(pg_catalog.<) user_filters_1.user_id)))) multi_group_wrapper_1 RIGHT JOIN (SELECT intermediate_result.user_id, intermediate_result.generated_group_field FROM read_intermediate_result('86_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, generated_group_field integer)) left_group_by_1 ON ((left_group_by_1.user_id OPERATOR(pg_catalog.>) multi_group_wrapper_1.event_user_id)))) "eventQuery") "pushedDownQuery" GROUP BY generated_group_field ORDER BY (count(*)) DESC, generated_group_field LIMIT 10 ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index 1fde06dae..14d7f6268 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -509,7 +509,7 @@ DEBUG: generating subplan 100_2 for subquery SELECT x, y FROM recursive_union.t DEBUG: Creating router plan DEBUG: Plan is router executable DEBUG: generating subplan 100_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) EXCEPT SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) -DEBUG: Plan 100 query after replacing subqueries and CTEs: SELECT u.x, u.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN generate_series(1, 10) x(x) USING (x)) ORDER BY u.x, u.y +DEBUG: Plan 100 query after replacing subqueries and CTEs: SELECT u.x, u.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN (SELECT x_1.x FROM generate_series(1, 10) x_1(x)) x USING (x)) ORDER BY u.x, u.y DEBUG: Creating router plan DEBUG: Plan is router executable x | y diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index d227bbcf9..9095dabaf 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -64,7 +64,7 @@ test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time test: multi_explain test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql -test: sql_procedure +test: sql_procedure multi_function_in_join test: multi_subquery_in_where_reference_clause test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql diff --git a/src/test/regress/sql/multi_function_in_join.sql b/src/test/regress/sql/multi_function_in_join.sql new file mode 100644 index 000000000..86fe2a769 --- /dev/null +++ b/src/test/regress/sql/multi_function_in_join.sql @@ -0,0 +1,144 @@ +-- +-- multi function in join queries aims to test the function calls that are +-- used in joins. +-- +-- These functions are supposed to be executed on the worker and to ensure +-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries. +-- +-- We do not yet support those functions that: +-- - return records +-- - return tables +-- - are user-defined and immutable + +CREATE SCHEMA functions_in_joins; +SET search_path TO 'functions_in_joins'; +SET citus.next_shard_id TO 2500000; + +CREATE TABLE table1 (id int, data int); +SELECT create_distributed_table('table1','id'); + +INSERT INTO table1 +SELECT x, x*x +from generate_series(1, 100) as f (x); + +-- Verbose messages for observing the subqueries that wrapped function calls +SET client_min_messages TO DEBUG1; + +-- Check joins on a sequence +CREATE SEQUENCE numbers; +SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC; + +-- Check joins of a function that returns a single integer +CREATE FUNCTION add(integer, integer) RETURNS integer +AS 'SELECT $1 + $2;' +LANGUAGE SQL; +SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; + +-- Check join of plpgsql functions +-- a function returning a single integer +CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$ +BEGIN + RETURN i + 1; +END; +$$ LANGUAGE plpgsql; +SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC; + +-- a function that returns a set of integers +CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER, + IN k INTEGER DEFAULT 3, + OUT result INTEGER) + RETURNS SETOF INTEGER AS $$ +BEGIN + RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x); +END; +$$ LANGUAGE plpgsql; +SELECT * +FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result) +ORDER BY id ASC; + +-- a function returning set of records +CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$ +SELECT x, x+1 FROM generate_series(0,4) f(x) +$cmd$ +LANGUAGE SQL; +SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC; + +-- a function returning table +CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text) +AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ +LANGUAGE SQL; + +SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id); + +-- a stable function +CREATE OR REPLACE FUNCTION the_minimum_id() + RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL; +SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id); + +-- a built-in immutable function +SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC; + +-- function joins inside a CTE +WITH next_row_to_process AS ( + SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) + ) +SELECT * +FROM table1, next_row_to_process +WHERE table1.data <= next_row_to_process.data +ORDER BY 1,2 ASC; + +-- Multiple functions in an RTE +SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b), + table1 WHERE id = a ORDER BY id ASC; + + +-- Custom Type returning function used in a join +CREATE TYPE min_and_max AS ( + minimum INT, + maximum INT +); + +CREATE OR REPLACE FUNCTION max_and_min () RETURNS + min_and_max AS $$ +DECLARE + result min_and_max%rowtype; +begin + select into result min(data) as minimum, max(data) as maximum from table1; + return result; +end; +$$ language plpgsql; + +SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data); + +-- The following tests will fail as we do not support all joins on +-- all kinds of functions +SET client_min_messages TO ERROR; + +-- function joins in CTE results can create lateral joins that are not supported +SELECT public.raise_failed_execution($cmd$ +WITH one_row AS ( + SELECT * FROM table1 WHERE id=52 + ) +SELECT table1.id, table1.data +FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids +WHERE table1.id = next_five_ids; +$cmd$); + + +-- a user-defined immutable function +CREATE OR REPLACE FUNCTION the_answer_to_life() + RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL; +SELECT public.raise_failed_execution($cmd$ +SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer) +$cmd$); + +-- WITH ORDINALITY clause forcing the result type to be RECORD/RECORDs +SELECT * +FROM table1 + JOIN next_k_integers(10,5) WITH ORDINALITY next_integers + ON (id = next_integers.result) +ORDER BY id ASC; + +RESET client_min_messages; +DROP SCHEMA functions_in_joins CASCADE; +SET search_path TO DEFAULT;