mirror of https://github.com/citusdata/citus.git
First stab at addressing 9.6 API changes
Still need to deal with ruleutils.pull/858/head
parent
cf88cc1ef4
commit
026b444fd2
|
@ -200,7 +200,7 @@ CopyQueryResults(List *masterCopyStmtList)
|
||||||
|
|
||||||
/* Execute query plan. */
|
/* Execute query plan. */
|
||||||
void
|
void
|
||||||
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
|
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, tuplecount_t count)
|
||||||
{
|
{
|
||||||
int eflags = queryDesc->estate->es_top_eflags;
|
int eflags = queryDesc->estate->es_top_eflags;
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "access/nbtree.h"
|
#include "access/nbtree.h"
|
||||||
#include "access/heapam.h"
|
#include "access/heapam.h"
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
|
#include "catalog/pg_am.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
|
|
|
@ -1526,6 +1526,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
||||||
newMasterAggregate->aggdistinct = NULL;
|
newMasterAggregate->aggdistinct = NULL;
|
||||||
newMasterAggregate->aggfnoid = sumFunctionId;
|
newMasterAggregate->aggfnoid = sumFunctionId;
|
||||||
newMasterAggregate->aggtype = masterReturnType;
|
newMasterAggregate->aggtype = masterReturnType;
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
newMasterAggregate->aggtranstype = INTERNALOID;
|
||||||
|
newMasterAggregate->aggargtypes = list_make1_oid(NUMERICOID);
|
||||||
|
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
#endif
|
||||||
|
|
||||||
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
|
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
|
||||||
workerReturnTypeMod, workerCollationId, columnLevelsUp);
|
workerReturnTypeMod, workerCollationId, columnLevelsUp);
|
||||||
|
@ -1682,6 +1687,11 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
|
||||||
firstSum->aggtype = get_func_rettype(firstSum->aggfnoid);
|
firstSum->aggtype = get_func_rettype(firstSum->aggfnoid);
|
||||||
firstSum->args = list_make1(firstTargetEntry);
|
firstSum->args = list_make1(firstTargetEntry);
|
||||||
firstSum->aggkind = AGGKIND_NORMAL;
|
firstSum->aggkind = AGGKIND_NORMAL;
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
firstSum->aggtranstype = INTERNALOID;
|
||||||
|
firstSum->aggargtypes = list_make1_oid(NUMERICOID);
|
||||||
|
firstSum->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
#endif
|
||||||
|
|
||||||
/* create the second argument for sum(column2) */
|
/* create the second argument for sum(column2) */
|
||||||
secondColumn = makeVar(masterTableId, (*columnId), countAggregateType,
|
secondColumn = makeVar(masterTableId, (*columnId), countAggregateType,
|
||||||
|
@ -1694,6 +1704,11 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
|
||||||
secondSum->aggtype = get_func_rettype(secondSum->aggfnoid);
|
secondSum->aggtype = get_func_rettype(secondSum->aggfnoid);
|
||||||
secondSum->args = list_make1(secondTargetEntry);
|
secondSum->args = list_make1(secondTargetEntry);
|
||||||
secondSum->aggkind = AGGKIND_NORMAL;
|
secondSum->aggkind = AGGKIND_NORMAL;
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
secondSum->aggtranstype = INTERNALOID;
|
||||||
|
secondSum->aggargtypes = list_make1_oid(NUMERICOID);
|
||||||
|
secondSum->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Build the division operator between these two aggregates. This function
|
* Build the division operator between these two aggregates. This function
|
||||||
|
@ -2365,8 +2380,17 @@ ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode)
|
||||||
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList);
|
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList);
|
||||||
|
|
||||||
List *targetList = extendedOpNode->targetList;
|
List *targetList = extendedOpNode->targetList;
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PVC_REJECT_PLACEHOLDERS is now implicit if PVC_INCLUDE_PLACEHOLDERS
|
||||||
|
* isn't specified.
|
||||||
|
*/
|
||||||
|
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES);
|
||||||
|
#else
|
||||||
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES,
|
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES,
|
||||||
PVC_REJECT_PLACEHOLDERS);
|
PVC_REJECT_PLACEHOLDERS);
|
||||||
|
#endif
|
||||||
|
|
||||||
ListCell *expressionCell = NULL;
|
ListCell *expressionCell = NULL;
|
||||||
foreach(expressionCell, expressionList)
|
foreach(expressionCell, expressionList)
|
||||||
|
|
|
@ -1612,8 +1612,18 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)
|
||||||
List *
|
List *
|
||||||
pull_var_clause_default(Node *node)
|
pull_var_clause_default(Node *node)
|
||||||
{
|
{
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PVC_REJECT_PLACEHOLDERS is now implicit if PVC_INCLUDE_PLACEHOLDERS
|
||||||
|
* isn't specified.
|
||||||
|
*/
|
||||||
|
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES);
|
||||||
|
#else
|
||||||
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES,
|
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES,
|
||||||
PVC_REJECT_PLACEHOLDERS);
|
PVC_REJECT_PLACEHOLDERS);
|
||||||
|
#endif
|
||||||
|
|
||||||
return columnList;
|
return columnList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -140,9 +140,13 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
|
||||||
havingQual = masterQuery->havingQual;
|
havingQual = masterQuery->havingQual;
|
||||||
|
|
||||||
/* estimate aggregate execution costs */
|
/* estimate aggregate execution costs */
|
||||||
MemSet(&aggregateCosts, 0, sizeof(AggClauseCosts));
|
memset(&aggregateCosts, 0, sizeof(AggClauseCosts));
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
get_agg_clause_costs(NULL, (Node *) aggregateTargetList, AGGSPLIT_SIMPLE,
|
||||||
|
&aggregateCosts);
|
||||||
|
#else
|
||||||
count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts);
|
count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts);
|
||||||
count_agg_clauses(NULL, havingQual, &aggregateCosts);
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For upper level plans above the sequential scan, the planner expects the
|
* For upper level plans above the sequential scan, the planner expects the
|
||||||
|
@ -178,10 +182,16 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* finally create the plan */
|
/* finally create the plan */
|
||||||
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
aggregateStrategy, &aggregateCosts, groupColumnCount,
|
aggregatePlan = make_agg(aggregateTargetList, (List *) havingQual, aggregateStrategy,
|
||||||
groupColumnIdArray, groupColumnOpArray, NIL,
|
AGGSPLIT_SIMPLE, groupColumnCount, groupColumnIdArray,
|
||||||
|
groupColumnOpArray, NIL, NIL,
|
||||||
rowEstimate, subPlan);
|
rowEstimate, subPlan);
|
||||||
|
#else
|
||||||
|
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual, aggregateStrategy,
|
||||||
|
&aggregateCosts, groupColumnCount, groupColumnIdArray,
|
||||||
|
groupColumnOpArray, NIL, rowEstimate, subPlan);
|
||||||
|
#endif
|
||||||
|
|
||||||
return aggregatePlan;
|
return aggregatePlan;
|
||||||
}
|
}
|
||||||
|
@ -247,7 +257,11 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
|
||||||
if (masterQuery->sortClause)
|
if (masterQuery->sortClause)
|
||||||
{
|
{
|
||||||
List *sortClauseList = masterQuery->sortClause;
|
List *sortClauseList = masterQuery->sortClause;
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan);
|
||||||
|
#else
|
||||||
Sort *sortPlan = make_sort_from_sortclauses(NULL, sortClauseList, topLevelPlan);
|
Sort *sortPlan = make_sort_from_sortclauses(NULL, sortClauseList, topLevelPlan);
|
||||||
|
#endif
|
||||||
topLevelPlan = (Plan *) sortPlan;
|
topLevelPlan = (Plan *) sortPlan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,11 +270,15 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
|
||||||
{
|
{
|
||||||
Node *limitCount = masterQuery->limitCount;
|
Node *limitCount = masterQuery->limitCount;
|
||||||
Node *limitOffset = masterQuery->limitOffset;
|
Node *limitOffset = masterQuery->limitOffset;
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount);
|
||||||
|
#else
|
||||||
int64 offsetEstimate = 0;
|
int64 offsetEstimate = 0;
|
||||||
int64 countEstimate = 0;
|
int64 countEstimate = 0;
|
||||||
|
|
||||||
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount,
|
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount,
|
||||||
offsetEstimate, countEstimate);
|
offsetEstimate, countEstimate);
|
||||||
|
#endif
|
||||||
topLevelPlan = (Plan *) limitPlan;
|
topLevelPlan = (Plan *) limitPlan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -535,10 +535,21 @@ MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state)
|
||||||
* Once you've added them to this check, make sure you also evaluate them in the
|
* Once you've added them to this check, make sure you also evaluate them in the
|
||||||
* executor!
|
* executor!
|
||||||
*/
|
*/
|
||||||
StaticAssertStmt(PG_VERSION_NUM < 90600, "When porting to a newer PG this section"
|
StaticAssertStmt(PG_VERSION_NUM < 90700, "When porting to a newer PG this section"
|
||||||
" needs to be reviewed.");
|
" needs to be reviewed.");
|
||||||
|
if (IsA(expression, Aggref))
|
||||||
|
{
|
||||||
|
Aggref *expr = (Aggref *) expression;
|
||||||
|
|
||||||
if (IsA(expression, OpExpr))
|
volatileFlag = func_volatile(expr->aggfnoid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, WindowFunc))
|
||||||
|
{
|
||||||
|
WindowFunc *expr = (WindowFunc *) expression;
|
||||||
|
|
||||||
|
volatileFlag = func_volatile(expr->winfnoid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, OpExpr))
|
||||||
{
|
{
|
||||||
OpExpr *expr = (OpExpr *) expression;
|
OpExpr *expr = (OpExpr *) expression;
|
||||||
|
|
||||||
|
|
|
@ -89,9 +89,15 @@ FakeGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
|
||||||
Cost startup_cost = 0;
|
Cost startup_cost = 0;
|
||||||
Cost total_cost = startup_cost + baserel->rows;
|
Cost total_cost = startup_cost + baserel->rows;
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, NULL, baserel->rows,
|
||||||
|
startup_cost, total_cost, NIL,
|
||||||
|
NULL, NULL, NIL));
|
||||||
|
#else
|
||||||
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, baserel->rows,
|
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, baserel->rows,
|
||||||
startup_cost, total_cost, NIL,
|
startup_cost, total_cost, NIL,
|
||||||
NULL, NULL, NIL));
|
NULL, NULL, NIL));
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "access/sysattr.h"
|
#include "access/sysattr.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
|
#include "catalog/pg_am.h"
|
||||||
#include "catalog/pg_extension.h"
|
#include "catalog/pg_extension.h"
|
||||||
#include "catalog/pg_namespace.h"
|
#include "catalog/pg_namespace.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "access/hash.h"
|
#include "access/hash.h"
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
#include "access/nbtree.h"
|
#include "access/nbtree.h"
|
||||||
|
#include "catalog/pg_am.h"
|
||||||
#include "catalog/pg_collation.h"
|
#include "catalog/pg_collation.h"
|
||||||
#include "commands/copy.h"
|
#include "commands/copy.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
|
|
|
@ -17,9 +17,15 @@
|
||||||
#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100
|
#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100
|
||||||
#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200
|
#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
#define tuplecount_t uint64
|
||||||
|
#else
|
||||||
|
#define tuplecount_t long
|
||||||
|
#endif
|
||||||
|
|
||||||
extern void multi_ExecutorStart(QueryDesc *queryDesc, int eflags);
|
extern void multi_ExecutorStart(QueryDesc *queryDesc, int eflags);
|
||||||
extern void multi_ExecutorRun(QueryDesc *queryDesc,
|
extern void multi_ExecutorRun(QueryDesc *queryDesc,
|
||||||
ScanDirection direction, long count);
|
ScanDirection direction, tuplecount_t count);
|
||||||
extern void multi_ExecutorFinish(QueryDesc *queryDesc);
|
extern void multi_ExecutorFinish(QueryDesc *queryDesc);
|
||||||
extern void multi_ExecutorEnd(QueryDesc *queryDesc);
|
extern void multi_ExecutorEnd(QueryDesc *queryDesc);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue