diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ddb21593e..c2956e1e0 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -8,10 +8,12 @@ */ #include "postgres.h" +#include "funcapi.h" #include #include +#include "access/htup_details.h" #include "catalog/pg_class.h" #include "catalog/pg_type.h" #include "distributed/citus_nodefuncs.h" @@ -80,6 +82,7 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); +static int32 BlessRecordExpression(Expr *expr); static void CheckNodeIsDumpable(Node *node); static Node * CheckNodeCopyAndSerialization(Node *node); static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, @@ -1089,6 +1092,18 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) /* build target entry pointing to remote scan range table entry */ newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry); + + if (newVar->vartype == RECORDOID) + { + /* + * Add the anonymous composite type to the type cache and store + * the key in vartypmod. Eventually this makes its way into the + * TupleDesc used by the executor, which uses it to parse the + * query results from the workers in BuildTupleFromCStrings. + */ + newVar->vartypmod = BlessRecordExpression(targetEntry->expr); + } + newTargetEntry = flatCopyTargetEntry(targetEntry); newTargetEntry->expr = (Expr *) newVar; targetList = lappend(targetList, newTargetEntry); @@ -1120,6 +1135,78 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) } +/* + * BlessRecordExpression ensures we can parse an anonymous composite type on the + * target list of a query that is sent to the worker. + * + * We cannot normally parse record types coming from the workers unless we + * "bless" the tuple descriptor, which adds a transient type to the type cache + * and assigns it a type mod value, which is the key in the type cache. + */ +static int32 +BlessRecordExpression(Expr *expr) +{ + int32 typeMod = -1; + + if (IsA(expr, FuncExpr)) + { + /* + * Handle functions that return records on the target + * list, e.g. SELECT function_call(1,2); + */ + Oid resultTypeId = InvalidOid; + TupleDesc resultTupleDesc = NULL; + TypeFuncClass typeClass; + + /* get_expr_result_type blesses the tuple descriptor */ + typeClass = get_expr_result_type((Node *) expr, &resultTypeId, + &resultTupleDesc); + if (typeClass == TYPEFUNC_COMPOSITE) + { + typeMod = resultTupleDesc->tdtypmod; + } + } + else if (IsA(expr, RowExpr)) + { + /* + * Handle row expressions, e.g. SELECT (1,2); + */ + RowExpr *rowExpr = (RowExpr *) expr; + TupleDesc rowTupleDesc = NULL; + ListCell *argCell = NULL; + int currentResno = 1; + + rowTupleDesc = CreateTemplateTupleDesc(list_length(rowExpr->args), false); + + foreach(argCell, rowExpr->args) + { + Node *rowArg = (Node *) lfirst(argCell); + Oid rowArgTypeId = exprType(rowArg); + int rowArgTypeMod = exprTypmod(rowArg); + + if (rowArgTypeId == RECORDOID) + { + /* ensure nested rows are blessed as well */ + rowArgTypeMod = BlessRecordExpression((Expr *) rowArg); + } + + TupleDescInitEntry(rowTupleDesc, currentResno, NULL, + rowArgTypeId, rowArgTypeMod, 0); + TupleDescInitEntryCollation(rowTupleDesc, currentResno, + exprCollation(rowArg)); + + currentResno++; + } + + BlessTupleDesc(rowTupleDesc); + + typeMod = rowTupleDesc->tdtypmod; + } + + return typeMod; +} + + /* * RemoteScanRangeTableEntry creates a range table entry from given column name * list to represent a remote scan. diff --git a/src/test/regress/expected/row_types.out b/src/test/regress/expected/row_types.out new file mode 100644 index 000000000..e6f519e55 --- /dev/null +++ b/src/test/regress/expected/row_types.out @@ -0,0 +1,151 @@ +-- Tests for row types on the target list +CREATE SCHEMA row_types; +SET search_path TO row_types; +CREATE TABLE test (x int, y int); +SELECT create_distributed_table('test','x'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION table_returner(INT) RETURNS TABLE(name text, id INT) +AS $$ +BEGIN + RETURN QUERY SELECT $1::text, $1; +END; +$$ language plpgsql; +SELECT create_distributed_function('table_returner(int)'); + create_distributed_function +----------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION record_returner(INOUT id int, OUT name text) +RETURNS record +AS $$ +BEGIN + id := id + 1; + name := 'returned'; +END; +$$ language plpgsql; +SELECT create_distributed_function('record_returner(int)'); + create_distributed_function +----------------------------- + +(1 row) + +INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3); +-- multi-shard queries do not support row types +SELECT (x,y) FROM test ORDER BY x, y; +ERROR: input of anonymous composite types is not implemented +SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y; +ERROR: input of anonymous composite types is not implemented +select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y; +ERROR: input of anonymous composite types is not implemented +SELECT table_returner(x) FROM test ORDER BY x, y; +ERROR: input of anonymous composite types is not implemented +SELECT record_returner(x) FROM test ORDER BY x, y; +ERROR: input of anonymous composite types is not implemented +-- router queries support row types +SELECT (x,y) FROM test WHERE x = 1 ORDER BY x, y; + row +------- + (1,2) + (1,3) +(2 rows) + +SELECT (x,y) AS foo FROM test WHERE x = 1 ORDER BY x, y; + foo +------- + (1,2) + (1,3) +(2 rows) + +select distinct (x,y) AS foo, x, y FROM test WHERE x = 1 ORDER BY x, y; + foo | x | y +-------+---+--- + (1,2) | 1 | 2 + (1,3) | 1 | 3 +(2 rows) + +SELECT table_returner(x) FROM test WHERE x = 1 ORDER BY x, y; + table_returner +---------------- + (1,1) + (1,1) +(2 rows) + +SELECT record_returner(x) FROM test WHERE x = 1 ORDER BY x, y; + record_returner +----------------- + (2,returned) + (2,returned) +(2 rows) + +-- nested row expressions +SELECT (x,(x,y)) AS foo FROM test WHERE x = 1 ORDER BY x, y; + foo +------------- + (1,"(1,2)") + (1,"(1,3)") +(2 rows) + +SELECT (x,record_returner(x)) FROM test WHERE x = 1 ORDER BY x, y; + row +-------------------- + (1,"(2,returned)") + (1,"(2,returned)") +(2 rows) + +-- table functions in row expressions are not supported +SELECT (x,table_returner(x)) FROM test WHERE x = 1 ORDER BY x, y; +ERROR: input of anonymous composite types is not implemented +-- try prepared statements +PREPARE rec(int) AS SELECT (x,y*$1) FROM test WHERE x = $1 ORDER BY x, y; +EXECUTE rec(1); + row +------- + (1,2) + (1,3) +(2 rows) + +EXECUTE rec(1); + row +------- + (1,2) + (1,3) +(2 rows) + +EXECUTE rec(1); + row +------- + (1,2) + (1,3) +(2 rows) + +EXECUTE rec(1); + row +------- + (1,2) + (1,3) +(2 rows) + +EXECUTE rec(1); + row +------- + (1,2) + (1,3) +(2 rows) + +EXECUTE rec(1); + row +------- + (1,2) + (1,3) +(2 rows) + +DROP SCHEMA row_types CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table test +drop cascades to function table_returner(integer) +drop cascades to function record_returner(integer) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1cb8334b4..49ef8fdbb 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -65,7 +65,7 @@ test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time test: multi_explain hyperscale_tutorial test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql -test: sql_procedure multi_function_in_join +test: sql_procedure multi_function_in_join row_types test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql diff --git a/src/test/regress/sql/row_types.sql b/src/test/regress/sql/row_types.sql new file mode 100644 index 000000000..cedd6b48f --- /dev/null +++ b/src/test/regress/sql/row_types.sql @@ -0,0 +1,59 @@ +-- Tests for row types on the target list +CREATE SCHEMA row_types; +SET search_path TO row_types; + +CREATE TABLE test (x int, y int); +SELECT create_distributed_table('test','x'); + +CREATE OR REPLACE FUNCTION table_returner(INT) RETURNS TABLE(name text, id INT) +AS $$ +BEGIN + RETURN QUERY SELECT $1::text, $1; +END; +$$ language plpgsql; +SELECT create_distributed_function('table_returner(int)'); + +CREATE OR REPLACE FUNCTION record_returner(INOUT id int, OUT name text) +RETURNS record +AS $$ +BEGIN + id := id + 1; + name := 'returned'; +END; +$$ language plpgsql; +SELECT create_distributed_function('record_returner(int)'); + + +INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3); + +-- multi-shard queries do not support row types +SELECT (x,y) FROM test ORDER BY x, y; +SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y; +select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y; +SELECT table_returner(x) FROM test ORDER BY x, y; +SELECT record_returner(x) FROM test ORDER BY x, y; + +-- router queries support row types +SELECT (x,y) FROM test WHERE x = 1 ORDER BY x, y; +SELECT (x,y) AS foo FROM test WHERE x = 1 ORDER BY x, y; +select distinct (x,y) AS foo, x, y FROM test WHERE x = 1 ORDER BY x, y; +SELECT table_returner(x) FROM test WHERE x = 1 ORDER BY x, y; +SELECT record_returner(x) FROM test WHERE x = 1 ORDER BY x, y; + +-- nested row expressions +SELECT (x,(x,y)) AS foo FROM test WHERE x = 1 ORDER BY x, y; +SELECT (x,record_returner(x)) FROM test WHERE x = 1 ORDER BY x, y; + +-- table functions in row expressions are not supported +SELECT (x,table_returner(x)) FROM test WHERE x = 1 ORDER BY x, y; + +-- try prepared statements +PREPARE rec(int) AS SELECT (x,y*$1) FROM test WHERE x = $1 ORDER BY x, y; +EXECUTE rec(1); +EXECUTE rec(1); +EXECUTE rec(1); +EXECUTE rec(1); +EXECUTE rec(1); +EXECUTE rec(1); + +DROP SCHEMA row_types CASCADE;