Merge pull request #2572 from citusdata/execute-functions-on-coordinator

Wrap function calls in joins inside subqueries
pull/2521/head
Hanefi Onaldi 2019-02-04 23:27:04 +03:00 committed by GitHub
commit d6767ad521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 661 additions and 27 deletions

View File

@ -50,6 +50,7 @@
*/
#include "postgres.h"
#include "funcapi.h"
#include "catalog/pg_type.h"
#include "catalog/pg_class.h"
@ -66,6 +67,7 @@
#include "distributed/query_colocation_checker.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
@ -162,7 +164,9 @@ static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *contex
static bool ContainsReferencesToOuterQuery(Query *query);
static bool ContainsReferencesToOuterQueryWalker(Node *node,
VarLevelsUpWalkerContext *context);
static void WrapFunctionsInSubqueries(Query *query);
static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
/*
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
@ -263,6 +267,9 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
return NULL;
}
/* make sure function calls in joins are executed in the coordinator */
WrapFunctionsInSubqueries(query);
/* descend into subqueries */
query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0);
@ -1305,6 +1312,234 @@ ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *conte
}
/*
* WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
* of a query and wraps the functions inside (SELECT * FROM fnc() f)
* subqueries, so that those functions will be executed on the coordinator if
* necessary.
*
* We wrap all the functions that are used in joins except the ones that are
* laterally joined or have WITH ORDINALITY clauses.
* */
static void
WrapFunctionsInSubqueries(Query *query)
{
List *rangeTableList = query->rtable;
ListCell *rangeTableCell = NULL;
/*
* If we have only one function call in a query without any joins, we can
* easily decide where to execute it.
*
* If there are some subqueries and/or functions that are joined with a
* function, it is not trivial to decide whether we should run this
* function in the coordinator or in workers and therefore we may need to
* wrap some of those functions in subqueries.
*
* If we have only one RTE, we leave the parsed query tree as it is. This
* also makes sure we do not wrap an already wrapped function call
* because we know that there will always be 1 RTE in a wrapped function.
* */
if (list_length(rangeTableList) < 2)
{
return;
}
/* iterate over all RTEs and wrap them if necessary */
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (ShouldTransformRTE(rangeTableEntry))
{
TransformFunctionRTE(rangeTableEntry);
}
}
}
/*
* TransformFunctionRTE wraps a given function RangeTableEntry
* inside a (SELECT * from function() f) subquery.
*
* The said RangeTableEntry is modified and now points to the new subquery.
* */
static void
TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
{
Query *subquery = makeNode(Query);
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
RangeTblEntry *newRangeTableEntry = NULL;
Var *targetColumn = NULL;
TargetEntry *targetEntry = NULL;
RangeTblFunction *rangeTblFunction = NULL;
AttrNumber targetColumnIndex = 0;
TupleDesc tupleDesc = NULL;
rangeTblFunction = linitial(rangeTblEntry->functions);
subquery->commandType = CMD_SELECT;
/* copy the input rangeTblEntry to prevent cycles */
newRangeTableEntry = copyObject(rangeTblEntry);
/* set the FROM expression to the subquery */
subquery->rtable = list_make1(newRangeTableEntry);
newRangeTableRef->rtindex = 1;
subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
/* Determine the result type of the function.
*
* If function return type is not composite or rowtype can't be determined,
* tupleDesc is set to null here
*/
tupleDesc = (TupleDesc) get_expr_result_tupdesc(rangeTblFunction->funcexpr,
true);
/*
* If tupleDesc is not null, we iterate over all the attributes and
* create targetEntries
* */
if (tupleDesc)
{
/*
* A sample function join that end up here:
*
* CREATE FUNCTION f(..) RETURNS TABLE(c1 int, c2 text) AS .. ;
* SELECT .. FROM table JOIN f(..) ON ( .. ) ;
*
* We will iterate over Tuple Description attributes. i.e (c1 int, c2 text)
*/
for (targetColumnIndex = 0; targetColumnIndex < tupleDesc->natts;
targetColumnIndex++)
{
FormData_pg_attribute *attribute = TupleDescAttr(tupleDesc,
targetColumnIndex);
Oid columnType = attribute->atttypid;
char *columnName = attribute->attname.data;
/*
* The indexing of attributes and TupleDesc and varattno differ
*
* varattno=0 corresponds to whole row
* varattno=1 corresponds to first column that is stored in tupDesc->attrs[0]
*
* That's why we need to add one to the targetColumnIndex
* */
targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid,
0);
targetEntry = makeTargetEntry((Expr *) targetColumn, targetColumnIndex + 1,
columnName, false);
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
}
/*
* If tupleDesc is NULL we have 2 different cases:
*
* 1. The function returns a record but the attributes can not be
* determined just by looking at the function definition. In this case the
* column names and types must be defined explicitly in the query
*
* 2. The function returns a non-composite type (e.g. int, text, jsonb ..)
* */
else
{
/* create target entries for all columns returned by the function */
List *functionColumnNames = NULL;
ListCell *functionColumnName = NULL;
functionColumnNames = rangeTblEntry->eref->colnames;
foreach(functionColumnName, functionColumnNames)
{
char *columnName = strVal(lfirst(functionColumnName));
Oid columnType = InvalidOid;
/*
* If the function returns a set of records, the query needs
* to explicitly name column names and types
*
* Use explicitly defined types in the query if they are
* available
* */
if (list_length(rangeTblFunction->funccoltypes) > 0)
{
/*
* A sample function join that end up here:
*
* CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS
* $cmd$
* SELECT x, x+1 FROM generate_series(0,4) f(x)
* $cmd$
* LANGUAGE SQL;
*
* SELECT *
* FROM table1 JOIN get_set_of_records() AS t2(x int, y int)
* ON (id = x);
*
* Note that the function definition does not have column
* names and types. Therefore the user needs to explicitly
* state them in the query
* */
columnType = list_nth_oid(rangeTblFunction->funccoltypes,
targetColumnIndex);
}
/* use the types in the function definition otherwise */
else
{
/*
* Only functions returning simple types end up here.
* A sample function:
*
* CREATE FUNCTION add(integer, integer) RETURNS integer AS
* 'SELECT $1 + $2;'
* LANGUAGE SQL;
* SELECT * FROM table JOIN add(3,5) sum ON ( .. ) ;
* */
FuncExpr *funcExpr = (FuncExpr *) rangeTblFunction->funcexpr;
columnType = funcExpr->funcresulttype;
}
/* Note that the column k is associated with varattno/resno of k+1 */
targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1,
InvalidOid, 0);
targetEntry = makeTargetEntry((Expr *) targetColumn,
targetColumnIndex + 1, columnName, false);
subquery->targetList = lappend(subquery->targetList, targetEntry);
targetColumnIndex++;
}
}
/* replace the function with the constructed subquery */
rangeTblEntry->rtekind = RTE_SUBQUERY;
rangeTblEntry->subquery = subquery;
}
/*
* ShouldTransformRTE determines whether a given RTE should bne wrapped in a
* subquery.
*
* Not all functions should be wrapped in a subquery for now. As we support more
* functions to be used in joins, the constraints here will be relaxed.
* */
static bool
ShouldTransformRTE(RangeTblEntry *rangeTableEntry)
{
/*
* We should wrap only function rtes that are not LATERAL and
* without WITH CARDINALITY clause
* */
if (rangeTableEntry->rtekind != RTE_FUNCTION ||
rangeTableEntry->lateral ||
rangeTableEntry->funcordinality)
{
return false;
}
return true;
}
/*
* BuildSubPlanResultQuery returns a query of the form:
*

View File

@ -23,6 +23,7 @@
#include "optimizer/prep.h"
#include "postmaster/bgworker.h"
#include "utils/memutils.h"
#include "funcapi.h"
/* PostgreSQL 11 splits hash procs into "standard" and "extended" */
#define HASHSTANDARD_PROC HASHPROC
@ -134,6 +135,33 @@ canonicalize_qual_compat(Expr *qual, bool is_check)
}
/*
* A convenient wrapper around get_expr_result_type() that is added on PG11
*
* Note that this function ignores the second parameter and behaves
* slightly differently than the PG11 version.
*
* 1. The original function throws errors if noError flag is not set, we ignore
* this flag here and return NULL in that case
* 2. TYPEFUNC_COMPOSITE_DOMAIN is introduced in PG11, and references to this
* macro is removed
* */
static inline TupleDesc
get_expr_result_tupdesc(Node *expr, bool noError)
{
TupleDesc tupleDesc;
TypeFuncClass functypclass;
functypclass = get_expr_result_type(expr, NULL, &tupleDesc);
if (functypclass == TYPEFUNC_COMPOSITE)
{
return tupleDesc;
}
return NULL;
}
#endif
#if (PG_VERSION_NUM >= 110000)

View File

@ -0,0 +1,229 @@
--
-- multi function in join queries aims to test the function calls that are
-- used in joins.
--
-- These functions are supposed to be executed on the worker and to ensure
-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries.
--
-- We do not yet support those functions that:
-- - have lateral joins
-- - have WITH ORDINALITY clause
-- - are user-defined and immutable
CREATE SCHEMA functions_in_joins;
SET search_path TO 'functions_in_joins';
SET citus.next_shard_id TO 2500000;
CREATE TABLE table1 (id int, data int);
SELECT create_distributed_table('table1','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO table1
SELECT x, x*x
from generate_series(1, 100) as f (x);
-- Verbose messages for observing the subqueries that wrapped function calls
SET client_min_messages TO DEBUG1;
-- Check joins on a sequence
CREATE SEQUENCE numbers;
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC;
DEBUG: generating subplan 2_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
DEBUG: Plan 2 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('2_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) ORDER BY table1.id
id | data | n
----+------+---
1 | 1 | 1
(1 row)
-- Check joins of a function that returns a single integer
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'SELECT $1 + $2;'
LANGUAGE SQL;
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
DEBUG: generating subplan 3_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum)
DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id
id | data | sum
----+------+-----
8 | 64 | 8
(1 row)
-- Check join of plpgsql functions
-- a function returning a single integer
CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$
BEGIN
RETURN i + 1;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC;
DEBUG: generating subplan 4_1 for subquery SELECT val FROM functions_in_joins.increment(2) val(val)
DEBUG: Plan 4 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, val.val FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.val FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(val integer)) val ON ((table1.id OPERATOR(pg_catalog.=) val.val))) ORDER BY table1.id
id | data | val
----+------+-----
3 | 9 | 3
(1 row)
-- a function that returns a set of integers
CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER,
IN k INTEGER DEFAULT 3,
OUT result INTEGER)
RETURNS SETOF INTEGER AS $$
BEGIN
RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x);
END;
$$ LANGUAGE plpgsql;
SELECT *
FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result)
ORDER BY id ASC;
DEBUG: generating subplan 5_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result)
DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id
id | data | result
----+------+--------
3 | 9 | 3
4 | 16 | 4
(2 rows)
-- a function returning set of records
CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$
SELECT x, x+1 FROM generate_series(0,4) f(x)
$cmd$
LANGUAGE SQL;
SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC;
DEBUG: generating subplan 6_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer)
DEBUG: Plan 6 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id
id | data | x | y
----+------+---+---
1 | 1 | 1 | 2
2 | 4 | 2 | 3
3 | 9 | 3 | 4
4 | 16 | 4 | 5
(4 rows)
-- a function returning table
CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text)
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id);
DEBUG: generating subplan 7_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2)
DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id)))
f1 | f2
----+------------
32 | 32 is text
(1 row)
-- a stable function
CREATE OR REPLACE FUNCTION the_minimum_id()
RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL;
SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id);
DEBUG: generating subplan 8_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id)
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id)))
id | data | min_id
----+------+--------
1 | 1 | 1
(1 row)
-- a built-in immutable function
SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC;
id | data | hundred
-----+-------+---------
100 | 10000 | 100
(1 row)
-- function joins inside a CTE
WITH next_row_to_process AS (
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n)
)
SELECT *
FROM table1, next_row_to_process
WHERE table1.data <= next_row_to_process.data
ORDER BY 1,2 ASC;
DEBUG: generating subplan 11_1 for CTE next_row_to_process: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN nextval('functions_in_joins.numbers'::regclass) n(n) ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
DEBUG: generating subplan 12_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
DEBUG: Plan 11 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_row_to_process.id, next_row_to_process.data, next_row_to_process.n FROM functions_in_joins.table1, (SELECT intermediate_result.id, intermediate_result.data, intermediate_result.n FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, data integer, n bigint)) next_row_to_process WHERE (table1.data OPERATOR(pg_catalog.<=) next_row_to_process.data) ORDER BY table1.id, table1.data
id | data | id | data | n
----+------+----+------+---
1 | 1 | 2 | 4 | 2
2 | 4 | 2 | 4 | 2
(2 rows)
-- Multiple functions in an RTE
SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b),
table1 WHERE id = a ORDER BY id ASC;
DEBUG: generating subplan 13_1 for subquery SELECT a, b FROM ROWS FROM(functions_in_joins.next_k_integers(5), functions_in_joins.next_k_integers(10)) f(a, b)
DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT f.a, f.b, table1.id, table1.data FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) f(a, b), functions_in_joins.table1 WHERE (table1.id OPERATOR(pg_catalog.=) f.a) ORDER BY table1.id
a | b | id | data
---+----+----+------
5 | 10 | 5 | 25
6 | 11 | 6 | 36
7 | 12 | 7 | 49
(3 rows)
-- Custom Type returning function used in a join
CREATE TYPE min_and_max AS (
minimum INT,
maximum INT
);
CREATE OR REPLACE FUNCTION max_and_min () RETURNS
min_and_max AS $$
DECLARE
result min_and_max%rowtype;
begin
select into result min(data) as minimum, max(data) as maximum from table1;
return result;
end;
$$ language plpgsql;
SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data);
DEBUG: generating subplan 14_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum)
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data))))
id | data | minimum | maximum
-----+-------+---------+---------
1 | 1 | 1 | 10000
100 | 10000 | 1 | 10000
(2 rows)
-- The following tests will fail as we do not support all joins on
-- all kinds of functions
SET client_min_messages TO ERROR;
-- function joins in CTE results can create lateral joins that are not supported
SELECT public.raise_failed_execution($cmd$
WITH one_row AS (
SELECT * FROM table1 WHERE id=52
)
SELECT table1.id, table1.data
FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids
WHERE table1.id = next_five_ids;
$cmd$);
ERROR: Task failed to execute
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
-- a user-defined immutable function
CREATE OR REPLACE FUNCTION the_answer_to_life()
RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL;
SELECT public.raise_failed_execution($cmd$
SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer)
$cmd$);
ERROR: Task failed to execute
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
-- WITH ORDINALITY clause
SELECT public.raise_failed_execution($cmd$
SELECT *
FROM table1
JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
ON (id = next_integers.result)
ORDER BY id ASC;
$cmd$);
ERROR: Task failed to execute
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
RESET client_min_messages;
DROP SCHEMA functions_in_joins CASCADE;
NOTICE: drop cascades to 11 other objects
DETAIL: drop cascades to table table1
drop cascades to sequence numbers
drop cascades to function add(integer,integer)
drop cascades to function increment(integer)
drop cascades to function next_k_integers(integer,integer)
drop cascades to function get_set_of_records()
drop cascades to function dup(integer)
drop cascades to function the_minimum_id()
drop cascades to type min_and_max
drop cascades to function max_and_min()
drop cascades to function the_answer_to_life()
SET search_path TO DEFAULT;

View File

@ -281,10 +281,8 @@ SET client_min_messages TO DEBUG;
SELECT count(*) FROM
(SELECT random() FROM user_buy_test_table JOIN random() AS users_ref_test_table(id)
ON user_buy_test_table.item_id > users_ref_test_table.id) subquery_1;
DEBUG: generating subplan 30_1 for subquery SELECT random() AS random FROM (public.user_buy_test_table JOIN random() users_ref_test_table(id) ON (((user_buy_test_table.item_id)::double precision OPERATOR(pg_catalog.>) users_ref_test_table.id)))
DEBUG: Plan 30 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.random FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(random double precision)) subquery_1
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 30_1 for subquery SELECT id FROM random() users_ref_test_table(id)
DEBUG: Plan 30 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT random() AS random FROM (public.user_buy_test_table JOIN (SELECT intermediate_result.id FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(id double precision)) users_ref_test_table(id) ON (((user_buy_test_table.item_id)::double precision OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1
count
-------
4
@ -295,10 +293,8 @@ SELECT count(*) FROM
(SELECT item_id FROM user_buy_test_table JOIN generate_series(random()::int,10) AS users_ref_test_table(id)
ON user_buy_test_table.item_id > users_ref_test_table.id) subquery_1
WHERE item_id = 6;
DEBUG: generating subplan 32_1 for subquery SELECT user_buy_test_table.item_id FROM (public.user_buy_test_table JOIN generate_series((random())::integer, 10) users_ref_test_table(id) ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))
DEBUG: Plan 32 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.item_id FROM read_intermediate_result('32_1'::text, 'binary'::citus_copy_format) intermediate_result(item_id integer)) subquery_1 WHERE (item_id OPERATOR(pg_catalog.=) 6)
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 31_1 for subquery SELECT id FROM generate_series((random())::integer, 10) users_ref_test_table(id)
DEBUG: Plan 31 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT user_buy_test_table.item_id FROM (public.user_buy_test_table JOIN (SELECT intermediate_result.id FROM read_intermediate_result('31_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) users_ref_test_table(id) ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1 WHERE (item_id OPERATOR(pg_catalog.=) 6)
count
-------
0
@ -309,11 +305,11 @@ SELECT count(*) FROM
(SELECT user_id FROM user_buy_test_table
UNION ALL
SELECT id FROM generate_series(1,10) AS users_ref_test_table(id)) subquery_1;
DEBUG: generating subplan 34_1 for subquery SELECT user_id FROM public.user_buy_test_table
DEBUG: generating subplan 32_1 for subquery SELECT user_id FROM public.user_buy_test_table
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 34_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('34_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM generate_series(1, 10) users_ref_test_table(id)
DEBUG: Plan 34 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('34_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1
DEBUG: generating subplan 32_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('32_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM generate_series(1, 10) users_ref_test_table(id)
DEBUG: Plan 32 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('32_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
@ -359,11 +355,11 @@ SELECT count(*) FROM
(SELECT user_id FROM user_buy_test_table
UNION ALL
SELECT id FROM (SELECT 5 AS id) users_ref_test_table) subquery_1;
DEBUG: generating subplan 41_1 for subquery SELECT user_id FROM public.user_buy_test_table
DEBUG: generating subplan 39_1 for subquery SELECT user_id FROM public.user_buy_test_table
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 41_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('41_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM (SELECT 5 AS id) users_ref_test_table
DEBUG: Plan 41 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('41_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1
DEBUG: generating subplan 39_2 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('39_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION ALL SELECT users_ref_test_table.id FROM (SELECT 5 AS id) users_ref_test_table
DEBUG: Plan 39 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('39_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) subquery_1
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
@ -379,11 +375,11 @@ SELECT * FROM
UNION
SELECT user_id FROM user_buy_test_table) sub
ORDER BY 1 DESC;
DEBUG: generating subplan 44_1 for subquery SELECT user_id FROM public.user_buy_test_table
DEBUG: generating subplan 42_1 for subquery SELECT user_id FROM public.user_buy_test_table
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 44_2 for subquery SELECT users_ref_test_table.id FROM public.users_ref_test_table UNION SELECT intermediate_result.user_id FROM read_intermediate_result('44_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)
DEBUG: Plan 44 query after replacing subqueries and CTEs: SELECT id FROM (SELECT intermediate_result.id FROM read_intermediate_result('44_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) sub ORDER BY id DESC
DEBUG: generating subplan 42_2 for subquery SELECT users_ref_test_table.id FROM public.users_ref_test_table UNION SELECT intermediate_result.user_id FROM read_intermediate_result('42_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)
DEBUG: Plan 42 query after replacing subqueries and CTEs: SELECT id FROM (SELECT intermediate_result.id FROM read_intermediate_result('42_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) sub ORDER BY id DESC
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
@ -403,11 +399,11 @@ SELECT * FROM
UNION
SELECT user_id, random() * 0 FROM (SELECT user_id FROM user_buy_test_table) sub2) sub
ORDER BY 1 DESC;
DEBUG: generating subplan 47_1 for subquery SELECT user_id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT user_buy_test_table.user_id FROM public.user_buy_test_table) sub2
DEBUG: generating subplan 45_1 for subquery SELECT user_id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT user_buy_test_table.user_id FROM public.user_buy_test_table) sub2
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 47_2 for subquery SELECT sub1.id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT users_ref_test_table.id FROM public.users_ref_test_table) sub1 UNION SELECT intermediate_result.user_id, intermediate_result."?column?" FROM read_intermediate_result('47_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "?column?" double precision)
DEBUG: Plan 47 query after replacing subqueries and CTEs: SELECT id, "?column?" FROM (SELECT intermediate_result.id, intermediate_result."?column?" FROM read_intermediate_result('47_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer, "?column?" double precision)) sub ORDER BY id DESC
DEBUG: generating subplan 45_2 for subquery SELECT sub1.id, (random() OPERATOR(pg_catalog.*) (0)::double precision) FROM (SELECT users_ref_test_table.id FROM public.users_ref_test_table) sub1 UNION SELECT intermediate_result.user_id, intermediate_result."?column?" FROM read_intermediate_result('45_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "?column?" double precision)
DEBUG: Plan 45 query after replacing subqueries and CTEs: SELECT id, "?column?" FROM (SELECT intermediate_result.id, intermediate_result."?column?" FROM read_intermediate_result('45_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer, "?column?" double precision)) sub ORDER BY id DESC
DEBUG: Creating router plan
DEBUG: Plan is router executable
id | ?column?
@ -1334,8 +1330,8 @@ SELECT count(*) FROM
(SELECT user_buy_test_table.user_id, random() FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.user_id > users_ref_test_table.id) subquery_2
WHERE subquery_1.user_id != subquery_2.user_id ;
DEBUG: generating subplan 86_1 for subquery SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.user_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))
DEBUG: Plan 86 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('86_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) subquery_2 WHERE (subquery_1.user_id OPERATOR(pg_catalog.<>) subquery_2.user_id)
DEBUG: generating subplan 84_1 for subquery SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.user_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))
DEBUG: Plan 84 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT user_buy_test_table.user_id, random() AS random FROM (public.user_buy_test_table LEFT JOIN public.users_ref_test_table ON ((user_buy_test_table.item_id OPERATOR(pg_catalog.>) users_ref_test_table.id)))) subquery_1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('84_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) subquery_2 WHERE (subquery_1.user_id OPERATOR(pg_catalog.<>) subquery_2.user_id)
count
-------
67
@ -1380,8 +1376,8 @@ count(*) AS cnt, "generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
DEBUG: generating subplan 88_1 for subquery SELECT user_id, value_2 AS generated_group_field FROM public.users_table users
DEBUG: Plan 88 query after replacing subqueries and CTEs: SELECT count(*) AS cnt, generated_group_field FROM (SELECT "eventQuery".user_id, random() AS random, "eventQuery".generated_group_field FROM (SELECT multi_group_wrapper_1."time", multi_group_wrapper_1.event_user_id, multi_group_wrapper_1.user_id, left_group_by_1.generated_group_field, random() AS random FROM ((SELECT temp_data_queries."time", temp_data_queries.event_user_id, user_filters_1.user_id FROM ((SELECT events."time", events.user_id AS event_user_id FROM public.events_table events WHERE (events.user_id OPERATOR(pg_catalog.>) 2)) temp_data_queries JOIN (SELECT users.user_id FROM public.users_reference_table users WHERE ((users.user_id OPERATOR(pg_catalog.>) 2) AND (users.value_2 OPERATOR(pg_catalog.=) 5))) user_filters_1 ON ((temp_data_queries.event_user_id OPERATOR(pg_catalog.<) user_filters_1.user_id)))) multi_group_wrapper_1 RIGHT JOIN (SELECT intermediate_result.user_id, intermediate_result.generated_group_field FROM read_intermediate_result('88_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, generated_group_field integer)) left_group_by_1 ON ((left_group_by_1.user_id OPERATOR(pg_catalog.>) multi_group_wrapper_1.event_user_id)))) "eventQuery") "pushedDownQuery" GROUP BY generated_group_field ORDER BY (count(*)) DESC, generated_group_field LIMIT 10
DEBUG: generating subplan 86_1 for subquery SELECT user_id, value_2 AS generated_group_field FROM public.users_table users
DEBUG: Plan 86 query after replacing subqueries and CTEs: SELECT count(*) AS cnt, generated_group_field FROM (SELECT "eventQuery".user_id, random() AS random, "eventQuery".generated_group_field FROM (SELECT multi_group_wrapper_1."time", multi_group_wrapper_1.event_user_id, multi_group_wrapper_1.user_id, left_group_by_1.generated_group_field, random() AS random FROM ((SELECT temp_data_queries."time", temp_data_queries.event_user_id, user_filters_1.user_id FROM ((SELECT events."time", events.user_id AS event_user_id FROM public.events_table events WHERE (events.user_id OPERATOR(pg_catalog.>) 2)) temp_data_queries JOIN (SELECT users.user_id FROM public.users_reference_table users WHERE ((users.user_id OPERATOR(pg_catalog.>) 2) AND (users.value_2 OPERATOR(pg_catalog.=) 5))) user_filters_1 ON ((temp_data_queries.event_user_id OPERATOR(pg_catalog.<) user_filters_1.user_id)))) multi_group_wrapper_1 RIGHT JOIN (SELECT intermediate_result.user_id, intermediate_result.generated_group_field FROM read_intermediate_result('86_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, generated_group_field integer)) left_group_by_1 ON ((left_group_by_1.user_id OPERATOR(pg_catalog.>) multi_group_wrapper_1.event_user_id)))) "eventQuery") "pushedDownQuery" GROUP BY generated_group_field ORDER BY (count(*)) DESC, generated_group_field LIMIT 10
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
RESET client_min_messages;

View File

@ -509,7 +509,7 @@ DEBUG: generating subplan 100_2 for subquery SELECT x, y FROM recursive_union.t
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 100_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) EXCEPT SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)
DEBUG: Plan 100 query after replacing subqueries and CTEs: SELECT u.x, u.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN generate_series(1, 10) x(x) USING (x)) ORDER BY u.x, u.y
DEBUG: Plan 100 query after replacing subqueries and CTEs: SELECT u.x, u.y FROM ((SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('100_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u JOIN (SELECT x_1.x FROM generate_series(1, 10) x_1(x)) x USING (x)) ORDER BY u.x, u.y
DEBUG: Creating router plan
DEBUG: Plan is router executable
x | y

View File

@ -64,7 +64,7 @@ test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time
test: multi_explain
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
test: sql_procedure
test: sql_procedure multi_function_in_join
test: multi_subquery_in_where_reference_clause
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql

View File

@ -0,0 +1,146 @@
--
-- multi function in join queries aims to test the function calls that are
-- used in joins.
--
-- These functions are supposed to be executed on the worker and to ensure
-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries.
--
-- We do not yet support those functions that:
-- - have lateral joins
-- - have WITH ORDINALITY clause
-- - are user-defined and immutable
CREATE SCHEMA functions_in_joins;
SET search_path TO 'functions_in_joins';
SET citus.next_shard_id TO 2500000;
CREATE TABLE table1 (id int, data int);
SELECT create_distributed_table('table1','id');
INSERT INTO table1
SELECT x, x*x
from generate_series(1, 100) as f (x);
-- Verbose messages for observing the subqueries that wrapped function calls
SET client_min_messages TO DEBUG1;
-- Check joins on a sequence
CREATE SEQUENCE numbers;
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC;
-- Check joins of a function that returns a single integer
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'SELECT $1 + $2;'
LANGUAGE SQL;
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
-- Check join of plpgsql functions
-- a function returning a single integer
CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$
BEGIN
RETURN i + 1;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC;
-- a function that returns a set of integers
CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER,
IN k INTEGER DEFAULT 3,
OUT result INTEGER)
RETURNS SETOF INTEGER AS $$
BEGIN
RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x);
END;
$$ LANGUAGE plpgsql;
SELECT *
FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result)
ORDER BY id ASC;
-- a function returning set of records
CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$
SELECT x, x+1 FROM generate_series(0,4) f(x)
$cmd$
LANGUAGE SQL;
SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC;
-- a function returning table
CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text)
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id);
-- a stable function
CREATE OR REPLACE FUNCTION the_minimum_id()
RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL;
SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id);
-- a built-in immutable function
SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC;
-- function joins inside a CTE
WITH next_row_to_process AS (
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n)
)
SELECT *
FROM table1, next_row_to_process
WHERE table1.data <= next_row_to_process.data
ORDER BY 1,2 ASC;
-- Multiple functions in an RTE
SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b),
table1 WHERE id = a ORDER BY id ASC;
-- Custom Type returning function used in a join
CREATE TYPE min_and_max AS (
minimum INT,
maximum INT
);
CREATE OR REPLACE FUNCTION max_and_min () RETURNS
min_and_max AS $$
DECLARE
result min_and_max%rowtype;
begin
select into result min(data) as minimum, max(data) as maximum from table1;
return result;
end;
$$ language plpgsql;
SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data);
-- The following tests will fail as we do not support all joins on
-- all kinds of functions
SET client_min_messages TO ERROR;
-- function joins in CTE results can create lateral joins that are not supported
SELECT public.raise_failed_execution($cmd$
WITH one_row AS (
SELECT * FROM table1 WHERE id=52
)
SELECT table1.id, table1.data
FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids
WHERE table1.id = next_five_ids;
$cmd$);
-- a user-defined immutable function
CREATE OR REPLACE FUNCTION the_answer_to_life()
RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL;
SELECT public.raise_failed_execution($cmd$
SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer)
$cmd$);
-- WITH ORDINALITY clause
SELECT public.raise_failed_execution($cmd$
SELECT *
FROM table1
JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
ON (id = next_integers.result)
ORDER BY id ASC;
$cmd$);
RESET client_min_messages;
DROP SCHEMA functions_in_joins CASCADE;
SET search_path TO DEFAULT;