mirror of https://github.com/citusdata/citus.git
Merge pull request #3006 from citusdata/router_row_types
Support anonymous composite types on the target list in router queriespull/3013/head
commit
9474bee98b
|
@ -8,10 +8,12 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
#include "funcapi.h"
|
||||||
|
|
||||||
#include <float.h>
|
#include <float.h>
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
|
|
||||||
|
#include "access/htup_details.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
|
@ -80,6 +82,7 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
CustomScan *customScan);
|
CustomScan *customScan);
|
||||||
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
|
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
|
||||||
|
static int32 BlessRecordExpression(Expr *expr);
|
||||||
static void CheckNodeIsDumpable(Node *node);
|
static void CheckNodeIsDumpable(Node *node);
|
||||||
static Node * CheckNodeCopyAndSerialization(Node *node);
|
static Node * CheckNodeCopyAndSerialization(Node *node);
|
||||||
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
|
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
|
||||||
|
@ -1089,6 +1092,18 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
|
||||||
|
|
||||||
/* build target entry pointing to remote scan range table entry */
|
/* build target entry pointing to remote scan range table entry */
|
||||||
newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry);
|
newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry);
|
||||||
|
|
||||||
|
if (newVar->vartype == RECORDOID)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Add the anonymous composite type to the type cache and store
|
||||||
|
* the key in vartypmod. Eventually this makes its way into the
|
||||||
|
* TupleDesc used by the executor, which uses it to parse the
|
||||||
|
* query results from the workers in BuildTupleFromCStrings.
|
||||||
|
*/
|
||||||
|
newVar->vartypmod = BlessRecordExpression(targetEntry->expr);
|
||||||
|
}
|
||||||
|
|
||||||
newTargetEntry = flatCopyTargetEntry(targetEntry);
|
newTargetEntry = flatCopyTargetEntry(targetEntry);
|
||||||
newTargetEntry->expr = (Expr *) newVar;
|
newTargetEntry->expr = (Expr *) newVar;
|
||||||
targetList = lappend(targetList, newTargetEntry);
|
targetList = lappend(targetList, newTargetEntry);
|
||||||
|
@ -1120,6 +1135,78 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BlessRecordExpression ensures we can parse an anonymous composite type on the
|
||||||
|
* target list of a query that is sent to the worker.
|
||||||
|
*
|
||||||
|
* We cannot normally parse record types coming from the workers unless we
|
||||||
|
* "bless" the tuple descriptor, which adds a transient type to the type cache
|
||||||
|
* and assigns it a type mod value, which is the key in the type cache.
|
||||||
|
*/
|
||||||
|
static int32
|
||||||
|
BlessRecordExpression(Expr *expr)
|
||||||
|
{
|
||||||
|
int32 typeMod = -1;
|
||||||
|
|
||||||
|
if (IsA(expr, FuncExpr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Handle functions that return records on the target
|
||||||
|
* list, e.g. SELECT function_call(1,2);
|
||||||
|
*/
|
||||||
|
Oid resultTypeId = InvalidOid;
|
||||||
|
TupleDesc resultTupleDesc = NULL;
|
||||||
|
TypeFuncClass typeClass;
|
||||||
|
|
||||||
|
/* get_expr_result_type blesses the tuple descriptor */
|
||||||
|
typeClass = get_expr_result_type((Node *) expr, &resultTypeId,
|
||||||
|
&resultTupleDesc);
|
||||||
|
if (typeClass == TYPEFUNC_COMPOSITE)
|
||||||
|
{
|
||||||
|
typeMod = resultTupleDesc->tdtypmod;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (IsA(expr, RowExpr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Handle row expressions, e.g. SELECT (1,2);
|
||||||
|
*/
|
||||||
|
RowExpr *rowExpr = (RowExpr *) expr;
|
||||||
|
TupleDesc rowTupleDesc = NULL;
|
||||||
|
ListCell *argCell = NULL;
|
||||||
|
int currentResno = 1;
|
||||||
|
|
||||||
|
rowTupleDesc = CreateTemplateTupleDesc(list_length(rowExpr->args), false);
|
||||||
|
|
||||||
|
foreach(argCell, rowExpr->args)
|
||||||
|
{
|
||||||
|
Node *rowArg = (Node *) lfirst(argCell);
|
||||||
|
Oid rowArgTypeId = exprType(rowArg);
|
||||||
|
int rowArgTypeMod = exprTypmod(rowArg);
|
||||||
|
|
||||||
|
if (rowArgTypeId == RECORDOID)
|
||||||
|
{
|
||||||
|
/* ensure nested rows are blessed as well */
|
||||||
|
rowArgTypeMod = BlessRecordExpression((Expr *) rowArg);
|
||||||
|
}
|
||||||
|
|
||||||
|
TupleDescInitEntry(rowTupleDesc, currentResno, NULL,
|
||||||
|
rowArgTypeId, rowArgTypeMod, 0);
|
||||||
|
TupleDescInitEntryCollation(rowTupleDesc, currentResno,
|
||||||
|
exprCollation(rowArg));
|
||||||
|
|
||||||
|
currentResno++;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlessTupleDesc(rowTupleDesc);
|
||||||
|
|
||||||
|
typeMod = rowTupleDesc->tdtypmod;
|
||||||
|
}
|
||||||
|
|
||||||
|
return typeMod;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemoteScanRangeTableEntry creates a range table entry from given column name
|
* RemoteScanRangeTableEntry creates a range table entry from given column name
|
||||||
* list to represent a remote scan.
|
* list to represent a remote scan.
|
||||||
|
|
|
@ -0,0 +1,151 @@
|
||||||
|
-- Tests for row types on the target list
|
||||||
|
CREATE SCHEMA row_types;
|
||||||
|
SET search_path TO row_types;
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
SELECT create_distributed_table('test','x');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION table_returner(INT) RETURNS TABLE(name text, id INT)
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN QUERY SELECT $1::text, $1;
|
||||||
|
END;
|
||||||
|
$$ language plpgsql;
|
||||||
|
SELECT create_distributed_function('table_returner(int)');
|
||||||
|
create_distributed_function
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION record_returner(INOUT id int, OUT name text)
|
||||||
|
RETURNS record
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
id := id + 1;
|
||||||
|
name := 'returned';
|
||||||
|
END;
|
||||||
|
$$ language plpgsql;
|
||||||
|
SELECT create_distributed_function('record_returner(int)');
|
||||||
|
create_distributed_function
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3);
|
||||||
|
-- multi-shard queries do not support row types
|
||||||
|
SELECT (x,y) FROM test ORDER BY x, y;
|
||||||
|
ERROR: input of anonymous composite types is not implemented
|
||||||
|
SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y;
|
||||||
|
ERROR: input of anonymous composite types is not implemented
|
||||||
|
select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y;
|
||||||
|
ERROR: input of anonymous composite types is not implemented
|
||||||
|
SELECT table_returner(x) FROM test ORDER BY x, y;
|
||||||
|
ERROR: input of anonymous composite types is not implemented
|
||||||
|
SELECT record_returner(x) FROM test ORDER BY x, y;
|
||||||
|
ERROR: input of anonymous composite types is not implemented
|
||||||
|
-- router queries support row types
|
||||||
|
SELECT (x,y) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT (x,y) AS foo FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
foo
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
select distinct (x,y) AS foo, x, y FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
foo | x | y
|
||||||
|
-------+---+---
|
||||||
|
(1,2) | 1 | 2
|
||||||
|
(1,3) | 1 | 3
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT table_returner(x) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
table_returner
|
||||||
|
----------------
|
||||||
|
(1,1)
|
||||||
|
(1,1)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT record_returner(x) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
record_returner
|
||||||
|
-----------------
|
||||||
|
(2,returned)
|
||||||
|
(2,returned)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- nested row expressions
|
||||||
|
SELECT (x,(x,y)) AS foo FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
foo
|
||||||
|
-------------
|
||||||
|
(1,"(1,2)")
|
||||||
|
(1,"(1,3)")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT (x,record_returner(x)) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
row
|
||||||
|
--------------------
|
||||||
|
(1,"(2,returned)")
|
||||||
|
(1,"(2,returned)")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- table functions in row expressions are not supported
|
||||||
|
SELECT (x,table_returner(x)) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
ERROR: input of anonymous composite types is not implemented
|
||||||
|
-- try prepared statements
|
||||||
|
PREPARE rec(int) AS SELECT (x,y*$1) FROM test WHERE x = $1 ORDER BY x, y;
|
||||||
|
EXECUTE rec(1);
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
EXECUTE rec(1);
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
EXECUTE rec(1);
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
EXECUTE rec(1);
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
EXECUTE rec(1);
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
EXECUTE rec(1);
|
||||||
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
DROP SCHEMA row_types CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
DETAIL: drop cascades to table test
|
||||||
|
drop cascades to function table_returner(integer)
|
||||||
|
drop cascades to function record_returner(integer)
|
|
@ -65,7 +65,7 @@ test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time
|
||||||
test: multi_explain hyperscale_tutorial
|
test: multi_explain hyperscale_tutorial
|
||||||
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||||
test: sql_procedure multi_function_in_join
|
test: sql_procedure multi_function_in_join row_types
|
||||||
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
|
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
|
||||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
|
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
-- Tests for row types on the target list
|
||||||
|
CREATE SCHEMA row_types;
|
||||||
|
SET search_path TO row_types;
|
||||||
|
|
||||||
|
CREATE TABLE test (x int, y int);
|
||||||
|
SELECT create_distributed_table('test','x');
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION table_returner(INT) RETURNS TABLE(name text, id INT)
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN QUERY SELECT $1::text, $1;
|
||||||
|
END;
|
||||||
|
$$ language plpgsql;
|
||||||
|
SELECT create_distributed_function('table_returner(int)');
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION record_returner(INOUT id int, OUT name text)
|
||||||
|
RETURNS record
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
id := id + 1;
|
||||||
|
name := 'returned';
|
||||||
|
END;
|
||||||
|
$$ language plpgsql;
|
||||||
|
SELECT create_distributed_function('record_returner(int)');
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3);
|
||||||
|
|
||||||
|
-- multi-shard queries do not support row types
|
||||||
|
SELECT (x,y) FROM test ORDER BY x, y;
|
||||||
|
SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y;
|
||||||
|
select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y;
|
||||||
|
SELECT table_returner(x) FROM test ORDER BY x, y;
|
||||||
|
SELECT record_returner(x) FROM test ORDER BY x, y;
|
||||||
|
|
||||||
|
-- router queries support row types
|
||||||
|
SELECT (x,y) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
SELECT (x,y) AS foo FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
select distinct (x,y) AS foo, x, y FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
SELECT table_returner(x) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
SELECT record_returner(x) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
|
||||||
|
-- nested row expressions
|
||||||
|
SELECT (x,(x,y)) AS foo FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
SELECT (x,record_returner(x)) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
|
||||||
|
-- table functions in row expressions are not supported
|
||||||
|
SELECT (x,table_returner(x)) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
|
|
||||||
|
-- try prepared statements
|
||||||
|
PREPARE rec(int) AS SELECT (x,y*$1) FROM test WHERE x = $1 ORDER BY x, y;
|
||||||
|
EXECUTE rec(1);
|
||||||
|
EXECUTE rec(1);
|
||||||
|
EXECUTE rec(1);
|
||||||
|
EXECUTE rec(1);
|
||||||
|
EXECUTE rec(1);
|
||||||
|
EXECUTE rec(1);
|
||||||
|
|
||||||
|
DROP SCHEMA row_types CASCADE;
|
Loading…
Reference in New Issue