mirror of https://github.com/citusdata/citus.git
Improve RECORD support
parent
eb35743c3f
commit
edc7a2ee38
|
@ -84,7 +84,6 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
CustomScan *customScan);
|
CustomScan *customScan);
|
||||||
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
|
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
|
||||||
static int32 BlessRecordExpression(Expr *expr);
|
|
||||||
static void CheckNodeIsDumpable(Node *node);
|
static void CheckNodeIsDumpable(Node *node);
|
||||||
static Node * CheckNodeCopyAndSerialization(Node *node);
|
static Node * CheckNodeCopyAndSerialization(Node *node);
|
||||||
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
|
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
|
||||||
|
@ -648,7 +647,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
/*
|
/*
|
||||||
* For select queries we, if router executor is enabled, first try to
|
* For select queries we, if router executor is enabled, first try to
|
||||||
* plan the query as a router query. If not supported, otherwise try
|
* plan the query as a router query. If not supported, otherwise try
|
||||||
* the full blown plan/optimize/physical planing process needed to
|
* the full blown plan/optimize/physical planning process needed to
|
||||||
* produce distributed query plans.
|
* produce distributed query plans.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -1151,7 +1150,7 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
|
||||||
* "bless" the tuple descriptor, which adds a transient type to the type cache
|
* "bless" the tuple descriptor, which adds a transient type to the type cache
|
||||||
* and assigns it a type mod value, which is the key in the type cache.
|
* and assigns it a type mod value, which is the key in the type cache.
|
||||||
*/
|
*/
|
||||||
static int32
|
int32
|
||||||
BlessRecordExpression(Expr *expr)
|
BlessRecordExpression(Expr *expr)
|
||||||
{
|
{
|
||||||
int32 typeMod = -1;
|
int32 typeMod = -1;
|
||||||
|
|
|
@ -1382,7 +1382,7 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
foreach(targetEntryCell, targetEntryList)
|
foreach(targetEntryCell, targetEntryList)
|
||||||
{
|
{
|
||||||
TargetEntry *originalTargetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
TargetEntry *originalTargetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
TargetEntry *newTargetEntry = copyObject(originalTargetEntry);
|
TargetEntry *newTargetEntry = flatCopyTargetEntry(originalTargetEntry);
|
||||||
Expr *originalExpression = originalTargetEntry->expr;
|
Expr *originalExpression = originalTargetEntry->expr;
|
||||||
Expr *newExpression = NULL;
|
Expr *newExpression = NULL;
|
||||||
|
|
||||||
|
@ -1412,6 +1412,11 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
column->varoattno = walkerContext->columnId;
|
column->varoattno = walkerContext->columnId;
|
||||||
walkerContext->columnId++;
|
walkerContext->columnId++;
|
||||||
|
|
||||||
|
if (column->vartype == RECORDOID)
|
||||||
|
{
|
||||||
|
column->vartypmod = BlessRecordExpression(originalTargetEntry->expr);
|
||||||
|
}
|
||||||
|
|
||||||
newExpression = (Expr *) column;
|
newExpression = (Expr *) column;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -777,7 +777,7 @@ MultiNodeTree(Query *queryTree)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains
|
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains
|
||||||
* a call to the read_intermediate_results function.
|
* a call to the read_intermediate_result function.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
ContainsReadIntermediateResultFunction(Node *node)
|
ContainsReadIntermediateResultFunction(Node *node)
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "catalog/pg_type.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/function_utils.h"
|
#include "distributed/function_utils.h"
|
||||||
|
@ -83,7 +84,7 @@ MasterNodeSelectPlan(DistributedPlan *distributedPlan, CustomScan *remoteScan)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MasterTargetList uses the given worker target list's expressions, and creates
|
* MasterTargetList uses the given worker target list's expressions, and creates
|
||||||
* a target target list for the master node. This master target list keeps the
|
* a target list for the master node. This master target list keeps the
|
||||||
* temporary table's columns on the master node.
|
* temporary table's columns on the master node.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
|
@ -105,13 +106,16 @@ MasterTargetList(List *workerTargetList)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
masterTargetEntry = copyObject(workerTargetEntry);
|
|
||||||
|
|
||||||
masterColumn = makeVarFromTargetEntry(tableId, workerTargetEntry);
|
masterColumn = makeVarFromTargetEntry(tableId, workerTargetEntry);
|
||||||
masterColumn->varattno = columnId;
|
masterColumn->varattno = columnId;
|
||||||
masterColumn->varoattno = columnId;
|
masterColumn->varoattno = columnId;
|
||||||
columnId++;
|
columnId++;
|
||||||
|
|
||||||
|
if (masterColumn->vartype == RECORDOID)
|
||||||
|
{
|
||||||
|
masterColumn->vartypmod = BlessRecordExpression(workerTargetEntry->expr);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The master target entry has two pieces to it. The first piece is the
|
* The master target entry has two pieces to it. The first piece is the
|
||||||
* target entry's expression, which we set to the newly created column.
|
* target entry's expression, which we set to the newly created column.
|
||||||
|
@ -119,6 +123,7 @@ MasterTargetList(List *workerTargetList)
|
||||||
* from the worker target entry. Note that any changes to worker target
|
* from the worker target entry. Note that any changes to worker target
|
||||||
* entry's sort and group clauses will *break* us here.
|
* entry's sort and group clauses will *break* us here.
|
||||||
*/
|
*/
|
||||||
|
masterTargetEntry = flatCopyTargetEntry(workerTargetEntry);
|
||||||
masterTargetEntry->expr = (Expr *) masterColumn;
|
masterTargetEntry->expr = (Expr *) masterColumn;
|
||||||
masterTargetList = lappend(masterTargetList, masterTargetEntry);
|
masterTargetList = lappend(masterTargetList, masterTargetEntry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1542,7 +1542,7 @@ ShouldTransformRTE(RangeTblEntry *rangeTableEntry)
|
||||||
/*
|
/*
|
||||||
* We should wrap only function rtes that are not LATERAL and
|
* We should wrap only function rtes that are not LATERAL and
|
||||||
* without WITH ORDINALITY clause
|
* without WITH ORDINALITY clause
|
||||||
* */
|
*/
|
||||||
if (rangeTableEntry->rtekind != RTE_FUNCTION ||
|
if (rangeTableEntry->rtekind != RTE_FUNCTION ||
|
||||||
rangeTableEntry->lateral ||
|
rangeTableEntry->lateral ||
|
||||||
rangeTableEntry->funcordinality)
|
rangeTableEntry->funcordinality)
|
||||||
|
|
|
@ -122,5 +122,6 @@ extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
|
||||||
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
|
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
|
||||||
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
|
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
|
||||||
extern int GetRTEIdentity(RangeTblEntry *rte);
|
extern int GetRTEIdentity(RangeTblEntry *rte);
|
||||||
|
extern int32 BlessRecordExpression(Expr *expr);
|
||||||
|
|
||||||
#endif /* DISTRIBUTED_PLANNER_H */
|
#endif /* DISTRIBUTED_PLANNER_H */
|
||||||
|
|
|
@ -77,6 +77,17 @@ SELECT DISTINCT l_orderkey, strpos(l_shipmode, 'I')
|
||||||
33 | 2
|
33 | 2
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
|
-- row types are supported
|
||||||
|
SELECT DISTINCT (l_orderkey, l_partkey) AS pair FROM lineitem_hash_part ORDER BY 1 LIMIT 5;
|
||||||
|
pair
|
||||||
|
-----------
|
||||||
|
(1,2132)
|
||||||
|
(1,15635)
|
||||||
|
(1,24027)
|
||||||
|
(1,63700)
|
||||||
|
(1,67310)
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
-- distinct on partition column
|
-- distinct on partition column
|
||||||
-- verify counts match with respect to count(distinct)
|
-- verify counts match with respect to count(distinct)
|
||||||
CREATE TEMP TABLE temp_orderkeys AS SELECT DISTINCT l_orderkey FROM lineitem_hash_part;
|
CREATE TEMP TABLE temp_orderkeys AS SELECT DISTINCT l_orderkey FROM lineitem_hash_part;
|
||||||
|
|
|
@ -35,17 +35,52 @@ SELECT create_distributed_function('record_returner(int)');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3);
|
INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3);
|
||||||
-- multi-shard queries do not support row types
|
-- multi-shard queries support row types
|
||||||
SELECT (x,y) FROM test ORDER BY x, y;
|
SELECT (x,y) FROM test ORDER BY x, y;
|
||||||
ERROR: input of anonymous composite types is not implemented
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2,2)
|
||||||
|
(2,3)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y;
|
SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y;
|
||||||
ERROR: input of anonymous composite types is not implemented
|
row
|
||||||
|
-------
|
||||||
|
(1,2)
|
||||||
|
(1,3)
|
||||||
|
(2,2)
|
||||||
|
(2,3)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y;
|
select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y;
|
||||||
ERROR: input of anonymous composite types is not implemented
|
foo | x | y
|
||||||
|
-------+---+---
|
||||||
|
(1,2) | 1 | 2
|
||||||
|
(1,3) | 1 | 3
|
||||||
|
(2,2) | 2 | 2
|
||||||
|
(2,3) | 2 | 3
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
SELECT table_returner(x) FROM test ORDER BY x, y;
|
SELECT table_returner(x) FROM test ORDER BY x, y;
|
||||||
ERROR: input of anonymous composite types is not implemented
|
table_returner
|
||||||
|
----------------
|
||||||
|
(1,1)
|
||||||
|
(1,1)
|
||||||
|
(2,2)
|
||||||
|
(2,2)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
SELECT record_returner(x) FROM test ORDER BY x, y;
|
SELECT record_returner(x) FROM test ORDER BY x, y;
|
||||||
ERROR: input of anonymous composite types is not implemented
|
record_returner
|
||||||
|
-----------------
|
||||||
|
(2,returned)
|
||||||
|
(2,returned)
|
||||||
|
(3,returned)
|
||||||
|
(3,returned)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
-- router queries support row types
|
-- router queries support row types
|
||||||
SELECT (x,y) FROM test WHERE x = 1 ORDER BY x, y;
|
SELECT (x,y) FROM test WHERE x = 1 ORDER BY x, y;
|
||||||
row
|
row
|
||||||
|
|
|
@ -30,6 +30,9 @@ SELECT DISTINCT l_orderkey, strpos(l_shipmode, 'I')
|
||||||
ORDER BY 2, 1
|
ORDER BY 2, 1
|
||||||
LIMIT 5;
|
LIMIT 5;
|
||||||
|
|
||||||
|
-- row types are supported
|
||||||
|
SELECT DISTINCT (l_orderkey, l_partkey) AS pair FROM lineitem_hash_part ORDER BY 1 LIMIT 5;
|
||||||
|
|
||||||
-- distinct on partition column
|
-- distinct on partition column
|
||||||
-- verify counts match with respect to count(distinct)
|
-- verify counts match with respect to count(distinct)
|
||||||
CREATE TEMP TABLE temp_orderkeys AS SELECT DISTINCT l_orderkey FROM lineitem_hash_part;
|
CREATE TEMP TABLE temp_orderkeys AS SELECT DISTINCT l_orderkey FROM lineitem_hash_part;
|
||||||
|
|
|
@ -26,7 +26,7 @@ SELECT create_distributed_function('record_returner(int)');
|
||||||
|
|
||||||
INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3);
|
INSERT INTO test VALUES (1,2), (1,3), (2,2), (2,3);
|
||||||
|
|
||||||
-- multi-shard queries do not support row types
|
-- multi-shard queries support row types
|
||||||
SELECT (x,y) FROM test ORDER BY x, y;
|
SELECT (x,y) FROM test ORDER BY x, y;
|
||||||
SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y;
|
SELECT (x,y) FROM test GROUP BY x, y ORDER BY x, y;
|
||||||
select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y;
|
select distinct (x,y) AS foo, x, y FROM test ORDER BY x, y;
|
||||||
|
|
Loading…
Reference in New Issue