Merge pull request #850 from citusdata/add_9.6_support

Support PostgreSQL 9.6

cr: @anarazel
pull/869/merge
Jason Petersen 2016-10-18 16:30:30 -06:00 committed by GitHub
commit bd9a433709
43 changed files with 10335 additions and 459 deletions
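
Nearly every hunk below gates on PG_VERSION_NUM, the numeric server version that PostgreSQL's pg_config.h defines (9.5.0 is 90500, 9.6.0 is 90600). A minimal sketch of the idiom the patch uses throughout; the macro name here is invented for illustration:

#include "postgres.h"	/* pulls in pg_config.h, which defines PG_VERSION_NUM */

#if (PG_VERSION_NUM >= 90600)
/* compiled against 9.6+: use the new API shapes */
#define COMPAT_TARGET_VERSION "9.6"
#else
/* compiled against 9.5: keep the pre-9.6 API shapes */
#define COMPAT_TARGET_VERSION "9.5"
#endif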

.gitattributes vendored

@ -29,4 +29,5 @@ src/backend/distributed/utils/citus_outfuncs.c -citus-style
src/backend/distributed/utils/citus_read.c -citus-style
src/backend/distributed/utils/citus_readfuncs_95.c -citus-style
src/backend/distributed/utils/ruleutils_95.c -citus-style
src/backend/distributed/utils/ruleutils_96.c -citus-style
src/include/distributed/citus_nodes.h -citus-style


@ -9,8 +9,9 @@ env:
secure: degV+qb2xHiea7E2dGk/WLvmYjq4ZsBn6ZPko+YhRcNm2GRXRaU3FqMBIecPtsEEFYaL5GwCQq/CgBf9aQxgDQ+t2CrmtGTtI9AGAbVBl//amNeJOoLe6QvrDpSQX5pUxwDLCng8cvoQK7ZxGlNCzDKiu4Ep4DUWgQVpauJkQ9nHjtSMZvUqCoI9h1lBy9Mxh7YFfHPW2PAXCqpV4VlNiIYF84UKdX3MXKLy9Yt0JBSNTWLZFp/fFw2qNwzFvN94rF3ZvFSD7Wp6CIhT6R5/6k6Zx8YQIrjWhgm6OVy1osUA8X7W79h2ISPqKqMNVJkjJ+N8S4xuQU0kfejnQ74Ie/uJiHCmbW5W2TjpL1aU3FQpPsGwR8h0rSeHhJAJzd8Ma+z8vvnnQHDyvetPBB0WgA/VMQCu8uEutyfYw2hDmB2+l2dDwkViaI7R95bReAGrpd5uNqklAXuR7yOeArz0ZZpHV0aZHGcNBxznMaZExSVZ5DVPW38UPn7Kgse8BnOWeLgnA1hJVp6CmBCtu+hKYt+atBPgRbM8IUINnKKZf/Sk6HeJIJZs662jD8/X93vFi0ZtyV2jEKJpouWw8j4vrGGsaDzTEUcyJgDqZj7tPJptM2L5B3BcFJmkGj2HO3N+LGDarJrVBBSiEjhTgx4NnLiKZnUbMx547mCRg2akk2w=
matrix:
- PGVERSION=9.5
- PGVERSION=9.6
before_install:
- git clone -b v0.4.1 --depth 1 https://github.com/citusdata/tools.git
- git clone -b v0.4.3 --depth 1 https://github.com/citusdata/tools.git
- sudo make -C tools install
- setup_apt
- curl https://install.citusdata.com/community/deb.sh | sudo bash

configure vendored

@ -1915,7 +1915,7 @@ if test -z "$version_num"; then
as_fn_error $? "Could not detect PostgreSQL version from pg_config." "$LINENO" 5
fi
if test "$version_num" != '9.5'; then
if test "$version_num" != '9.5' -a "$version_num" != '9.6'; then
as_fn_error $? "Citus is not compatible with the detected PostgreSQL version ${version_num}." "$LINENO" 5
else
{ $as_echo "$as_me:${as_lineno-$LINENO}: building against PostgreSQL $version_num" >&5


@ -36,7 +36,7 @@ if test -z "$version_num"; then
AC_MSG_ERROR([Could not detect PostgreSQL version from pg_config.])
fi
if test "$version_num" != '9.5'; then
if test "$version_num" != '9.5' -a "$version_num" != '9.6'; then
AC_MSG_ERROR([Citus is not compatible with the detected PostgreSQL version ${version_num}.])
else
AC_MSG_NOTICE([building against PostgreSQL $version_num])


@ -200,7 +200,7 @@ CopyQueryResults(List *masterCopyStmtList)
/* Execute query plan. */
void
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, tuplecount_t count)
{
int eflags = queryDesc->estate->es_top_eflags;


@ -18,6 +18,7 @@
#include "commands/dbcommands.h"
#include "commands/explain.h"
#include "commands/tablecmds.h"
#include "optimizer/cost.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
@ -108,9 +109,22 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
instr_time planDuration;
Query *originalQuery = NULL;
RelationRestrictionContext *restrictionContext = NULL;
/* if local query, run the standard explain and return */
bool localQuery = !NeedsDistributedPlanning(query);
int cursorOptions = 0;
#if PG_VERSION_NUM >= 90600
/*
* Allow parallel plans in 9.6+ unless selecting into a table.
* Without this, we're breaking explain for non-Citus plans.
*/
if (!into)
{
cursorOptions |= CURSOR_OPT_PARALLEL_OK;
}
#endif
/* handle local queries in the same way as ExplainOneQuery */
if (localQuery)
{
PlannedStmt *plan = NULL;
@ -118,7 +132,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
INSTR_TIME_SET_CURRENT(planStart);
/* plan the query */
plan = pg_plan_query(query, 0, params);
plan = pg_plan_query(query, cursorOptions, params);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
@ -143,7 +157,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
PG_TRY();
{
/* call standard planner to modify the query structure before multi planning */
initialPlan = standard_planner(query, 0, params);
initialPlan = standard_planner(query, cursorOptions, params);
commandType = initialPlan->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||


@ -17,6 +17,7 @@
#include "access/nbtree.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "catalog/pg_am.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_physical_planner.h"


@ -39,6 +39,7 @@
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
#include "optimizer/var.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_oper.h"
#include "utils/builtins.h"
@ -1273,6 +1274,7 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
{
Node *newNode = MasterAggregateMutator((Node *) originalExpression,
walkerContext);
newExpression = (Expr *) newNode;
}
else
@ -1379,6 +1381,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
const uint32 masterTableId = 1; /* one table on the master node */
const Index columnLevelsUp = 0; /* normal column */
const AttrNumber argumentId = 1; /* our aggregates have single arguments */
AggClauseCosts aggregateCosts;
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
@ -1467,6 +1470,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
unionAggregate->aggtype = hllType;
unionAggregate->args = list_make1(hllTargetEntry);
unionAggregate->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make1_oid(unionAggregate->aggtype);
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
cardinalityExpression = makeNode(FuncExpr);
cardinalityExpression->funcid = cardinalityFunctionId;
@ -1526,6 +1534,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate->aggdistinct = NULL;
newMasterAggregate->aggfnoid = sumFunctionId;
newMasterAggregate->aggtype = masterReturnType;
#if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(newMasterAggregate->aggtype);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
workerReturnTypeMod, workerCollationId, columnLevelsUp);
@ -1590,6 +1603,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate = copyObject(originalAggregate);
newMasterAggregate->aggfnoid = aggregateFunctionId;
newMasterAggregate->args = list_make1(arrayCatAggArgument);
#if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
newMasterExpression = (Expr *) newMasterAggregate;
}
@ -1640,6 +1658,16 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = typeConvertedExpression;
}
/* Run AggRefs through cost machinery to mark required fields sanely */
memset(&aggregateCosts, 0, sizeof(aggregateCosts));
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) newMasterExpression, AGGSPLIT_SIMPLE,
&aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) newMasterExpression, &aggregateCosts);
#endif
return newMasterExpression;
}
@ -1682,6 +1710,11 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
firstSum->aggtype = get_func_rettype(firstSum->aggfnoid);
firstSum->args = list_make1(firstTargetEntry);
firstSum->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
firstSum->aggtranstype = InvalidOid;
firstSum->aggargtypes = list_make1_oid(firstSum->aggtype);
firstSum->aggsplit = AGGSPLIT_SIMPLE;
#endif
/* create the second argument for sum(column2) */
secondColumn = makeVar(masterTableId, (*columnId), countAggregateType,
@ -1694,6 +1727,11 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
secondSum->aggtype = get_func_rettype(secondSum->aggfnoid);
secondSum->args = list_make1(secondTargetEntry);
secondSum->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
secondSum->aggtranstype = InvalidOid;
secondSum->aggargtypes = list_make1_oid(secondSum->aggtype);
secondSum->aggsplit = AGGSPLIT_SIMPLE;
#endif
/*
* Build the division operator between these two aggregates. This function
@ -1798,6 +1836,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
if (hasAggregates)
{
WorkerAggregateWalker((Node *) originalExpression, walkerContext);
newExpressionList = walkerContext->expressionList;
}
else
@ -1961,6 +2000,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
{
AggregateType aggregateType = GetAggregateType(originalAggregate->aggfnoid);
List *workerAggregateList = NIL;
AggClauseCosts aggregateCosts;
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
@ -2060,9 +2100,20 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
sumAggregate->aggfnoid = AggregateFunctionOid(sumAggregateName, argumentType);
sumAggregate->aggtype = get_func_rettype(sumAggregate->aggfnoid);
#if (PG_VERSION_NUM >= 90600)
sumAggregate->aggtranstype = InvalidOid;
sumAggregate->aggargtypes = list_make1_oid(argumentType);
sumAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
/* count has any input type */
countAggregate->aggfnoid = AggregateFunctionOid(countAggregateName, ANYOID);
countAggregate->aggtype = get_func_rettype(countAggregate->aggfnoid);
#if (PG_VERSION_NUM >= 90600)
countAggregate->aggtranstype = InvalidOid;
countAggregate->aggargtypes = list_make1_oid(argumentType);
countAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
workerAggregateList = lappend(workerAggregateList, sumAggregate);
workerAggregateList = lappend(workerAggregateList, countAggregate);
@ -2077,6 +2128,17 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
workerAggregateList = lappend(workerAggregateList, workerAggregate);
}
/* Run AggRefs through cost machinery to mark required fields sanely */
memset(&aggregateCosts, 0, sizeof(aggregateCosts));
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) workerAggregateList, AGGSPLIT_SIMPLE,
&aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) workerAggregateList, &aggregateCosts);
#endif
return workerAggregateList;
}
@ -2365,8 +2427,18 @@ ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode)
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList);
List *targetList = extendedOpNode->targetList;
#if (PG_VERSION_NUM >= 90600)
/*
* PVC_REJECT_PLACEHOLDERS is now implicit if PVC_INCLUDE_PLACEHOLDERS
* isn't specified.
*/
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES);
#else
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES,
PVC_REJECT_PLACEHOLDERS);
#endif
ListCell *expressionCell = NULL;
foreach(expressionCell, expressionList)
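
The three 9.6-only Aggref assignments above recur for every aggregate this file rewrites. A hypothetical helper that could factor them out (a sketch, not part of the patch):

/* Sketch: fill the fields that Aggref grew in PostgreSQL 9.6. */
static void
SetAggrefCompatFields(Aggref *aggregate, Oid argumentType)
{
#if (PG_VERSION_NUM >= 90600)
	/* the planner's cost machinery derives the real transition type later */
	aggregate->aggtranstype = InvalidOid;
	aggregate->aggargtypes = list_make1_oid(argumentType);
	/* rewritten aggregates always run as plain, unsplit aggregates */
	aggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
}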


@ -1612,8 +1612,18 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)
List *
pull_var_clause_default(Node *node)
{
#if (PG_VERSION_NUM >= 90600)
/*
* PVC_REJECT_PLACEHOLDERS is now implicit if PVC_INCLUDE_PLACEHOLDERS
* isn't specified.
*/
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES);
#else
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES,
PVC_REJECT_PLACEHOLDERS);
#endif
return columnList;
}
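
A hypothetical call site, to show the wrapper in use (the surrounding names are invented for illustration):

/* Sketch: collect the Vars referenced by an expression, on either version. */
List *columnList = pull_var_clause_default((Node *) targetEntry->expr);
ListCell *columnCell = NULL;

foreach(columnCell, columnList)
{
	Var *column = (Var *) lfirst(columnCell);

	/* ... inspect column->varno and column->varattno ... */
}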


@ -140,9 +140,15 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
havingQual = masterQuery->havingQual;
/* estimate aggregate execution costs */
MemSet(&aggregateCosts, 0, sizeof(AggClauseCosts));
memset(&aggregateCosts, 0, sizeof(AggClauseCosts));
#if (PG_VERSION_NUM >= 90600)
get_agg_clause_costs(NULL, (Node *) aggregateTargetList, AGGSPLIT_SIMPLE,
&aggregateCosts);
get_agg_clause_costs(NULL, (Node *) havingQual, AGGSPLIT_SIMPLE, &aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts);
count_agg_clauses(NULL, havingQual, &aggregateCosts);
count_agg_clauses(NULL, (Node *) havingQual, &aggregateCosts);
#endif
/*
* For upper level plans above the sequential scan, the planner expects the
@ -178,10 +184,17 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
}
/* finally create the plan */
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
aggregateStrategy, &aggregateCosts, groupColumnCount,
groupColumnIdArray, groupColumnOpArray, NIL,
#if (PG_VERSION_NUM >= 90600)
aggregatePlan = make_agg(aggregateTargetList, (List *) havingQual, aggregateStrategy,
AGGSPLIT_SIMPLE, groupColumnCount, groupColumnIdArray,
groupColumnOpArray, NIL, NIL,
rowEstimate, subPlan);
#else
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
aggregateStrategy,
&aggregateCosts, groupColumnCount, groupColumnIdArray,
groupColumnOpArray, NIL, rowEstimate, subPlan);
#endif
return aggregatePlan;
}
@ -247,7 +260,11 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
if (masterQuery->sortClause)
{
List *sortClauseList = masterQuery->sortClause;
#if (PG_VERSION_NUM >= 90600)
Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan);
#else
Sort *sortPlan = make_sort_from_sortclauses(NULL, sortClauseList, topLevelPlan);
#endif
topLevelPlan = (Plan *) sortPlan;
}
@ -256,11 +273,15 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
{
Node *limitCount = masterQuery->limitCount;
Node *limitOffset = masterQuery->limitOffset;
#if (PG_VERSION_NUM >= 90600)
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount);
#else
int64 offsetEstimate = 0;
int64 countEstimate = 0;
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount,
offsetEstimate, countEstimate);
#endif
topLevelPlan = (Plan *) limitPlan;
}


@ -535,10 +535,21 @@ MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state)
* Once you've added them to this check, make sure you also evaluate them in the
* executor!
*/
StaticAssertStmt(PG_VERSION_NUM < 90600, "When porting to a newer PG this section"
StaticAssertStmt(PG_VERSION_NUM < 90700, "When porting to a newer PG this section"
" needs to be reviewed.");
if (IsA(expression, OpExpr))
if (IsA(expression, Aggref))
{
Aggref *expr = (Aggref *) expression;
volatileFlag = func_volatile(expr->aggfnoid);
}
else if (IsA(expression, WindowFunc))
{
WindowFunc *expr = (WindowFunc *) expression;
volatileFlag = func_volatile(expr->winfnoid);
}
else if (IsA(expression, OpExpr))
{
OpExpr *expr = (OpExpr *) expression;


@ -18,6 +18,7 @@
#include "commands/explain.h"
#include "executor/executor.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/commit_protocol.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_copy.h"
@ -129,6 +130,9 @@ _PG_init(void)
*/
RegisterCitusConfigVariables();
/* make our additional node types known */
RegisterNodes();
/* intercept planner */
planner_hook = multi_planner;


@ -89,9 +89,15 @@ FakeGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Cost startup_cost = 0;
Cost total_cost = startup_cost + baserel->rows;
#if (PG_VERSION_NUM >= 90600)
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, NULL, baserel->rows,
startup_cost, total_cost, NIL,
NULL, NULL, NIL));
#else
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, baserel->rows,
startup_cost, total_cost, NIL,
NULL, NULL, NIL));
#endif
}


@ -11,9 +11,31 @@
#include "postgres.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/metadata_cache.h"
static const char *CitusNodeTagNamesD[] = {
"MultiNode",
"MultiTreeRoot",
"MultiProject",
"MultiCollect",
"MultiSelect",
"MultiTable",
"MultiJoin",
"MultiPartition",
"MultiCartesianProduct",
"MultiExtendedOp",
"Job",
"MapMergeJob",
"MultiPlan",
"Task",
"ShardInterval",
"ShardPlacement"
};
const char **CitusNodeTagNames = CitusNodeTagNamesD;
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_extradata_container);
@ -307,3 +329,84 @@ citus_extradata_container(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
#if (PG_VERSION_NUM >= 90600)
static void
CopyUnsupportedCitusNode(struct ExtensibleNode *newnode,
const struct ExtensibleNode *oldnode)
{
ereport(ERROR, (errmsg("not implemented")));
}
static bool
EqualUnsupportedCitusNode(const struct ExtensibleNode *a,
const struct ExtensibleNode *b)
{
ereport(ERROR, (errmsg("not implemented")));
}
/* *INDENT-OFF* */
#define DEFINE_NODE_METHODS(type) \
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
EqualUnsupportedCitusNode, \
Out##type, \
Read##type \
}
#define DEFINE_NODE_METHODS_NO_READ(type) \
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
EqualUnsupportedCitusNode, \
Out##type, \
ReadUnsupportedCitusNode \
}
/* *INDENT-ON* */
const ExtensibleNodeMethods nodeMethods[] =
{
DEFINE_NODE_METHODS(MultiPlan),
DEFINE_NODE_METHODS(Job),
DEFINE_NODE_METHODS(ShardInterval),
DEFINE_NODE_METHODS(MapMergeJob),
DEFINE_NODE_METHODS(ShardPlacement),
DEFINE_NODE_METHODS(Task),
/* nodes with only output support */
DEFINE_NODE_METHODS_NO_READ(MultiNode),
DEFINE_NODE_METHODS_NO_READ(MultiTreeRoot),
DEFINE_NODE_METHODS_NO_READ(MultiProject),
DEFINE_NODE_METHODS_NO_READ(MultiCollect),
DEFINE_NODE_METHODS_NO_READ(MultiSelect),
DEFINE_NODE_METHODS_NO_READ(MultiTable),
DEFINE_NODE_METHODS_NO_READ(MultiJoin),
DEFINE_NODE_METHODS_NO_READ(MultiPartition),
DEFINE_NODE_METHODS_NO_READ(MultiCartesianProduct),
DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp)
};
#endif
void
RegisterNodes(void)
{
#if (PG_VERSION_NUM >= 90600)
int off;
StaticAssertExpr(lengthof(nodeMethods) == lengthof(CitusNodeTagNamesD),
"number of node methods and names do not match");
for (off = 0; off < lengthof(nodeMethods); off++)
{
RegisterExtensibleNodeMethods(&nodeMethods[off]);
}
#endif
}
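
For reference, DEFINE_NODE_METHODS(Job) in the array above expands to an ExtensibleNodeMethods initializer along these lines (a mechanical preprocessor expansion, shown for illustration):

{
	"Job",                      /* extnodename, from stringifying the type */
	sizeof(Job),                /* node_size */
	CopyUnsupportedCitusNode,   /* nodeCopy: errors out, copying unsupported */
	EqualUnsupportedCitusNode,  /* nodeEqual: errors out, comparison unsupported */
	OutJob,                     /* nodeOut: serializer from citus_outfuncs.c */
	ReadJob                     /* nodeRead: deserializer from citus_readfuncs.c */
}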


@ -38,10 +38,21 @@
* routine.
*/
/* Store const reference to raw input node in local named 'node' */
#define WRITE_LOCALS(nodeTypeName) \
const nodeTypeName *node = (const nodeTypeName *) raw_node
/* Write the label for the node type */
#if (PG_VERSION_NUM >= 90600)
#define WRITE_NODE_TYPE(nodelabel) \
(void) 0
#else
#define WRITE_NODE_TYPE(nodelabel) \
appendStringInfoString(str, nodelabel)
#endif
/* Write an integer field (anything written as ":fldname %d") */
#define WRITE_INT_FIELD(fldname) \
appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)
@ -83,7 +94,7 @@
/* Write a character-string (possibly NULL) field */
#define WRITE_STRING_FIELD(fldname) \
(appendStringInfo(str, " :" CppAsString(fldname) " "), \
_outToken(str, node->fldname))
outToken(str, node->fldname))
/* Write a parse location field (actually same as INT case) */
#define WRITE_LOCATION_FIELD(fldname) \
@ -92,7 +103,7 @@
/* Write a Node field */
#define WRITE_NODE_FIELD(fldname) \
(appendStringInfo(str, " :" CppAsString(fldname) " "), \
_outNode(str, node->fldname))
outNode(str, node->fldname))
/* Write a bitmapset field */
#define WRITE_BITMAPSET_FIELD(fldname) \
@ -102,18 +113,18 @@
#define booltostr(x) ((x) ? "true" : "false")
static void _outNode(StringInfo str, const void *obj);
#if (PG_VERSION_NUM < 90600)
static void outNode(StringInfo str, const void *obj);
/*
* _outToken
* outToken
* Convert an ordinary string (eg, an identifier) into a form that
* will be decoded back to a plain token by read.c's functions.
*
* If a null or empty string is given, it is encoded as "<>".
*/
static void
_outToken(StringInfo str, const char *s)
outToken(StringInfo str, const char *s)
{
if (s == NULL || *s == '\0')
{
@ -166,7 +177,7 @@ _outList(StringInfo str, const List *node)
*/
if (IsA(node, List))
{
_outNode(str, lfirst(lc));
outNode(str, lfirst(lc));
if (lnext(lc))
appendStringInfoChar(str, ' ');
}
@ -187,7 +198,7 @@ _outList(StringInfo str, const List *node)
* Print the value of a Datum given its type.
*/
static void
_outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
{
Size length,
i;
@ -218,38 +229,49 @@ _outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
}
}
#endif
/*****************************************************************************
* Output routines for Citus node types
*****************************************************************************/
static void
_outMultiUnaryNode(StringInfo str, const MultiUnaryNode *node)
OutMultiUnaryNodeFields(StringInfo str, const MultiUnaryNode *node)
{
WRITE_NODE_FIELD(childNode);
}
static void
_outMultiBinaryNode(StringInfo str, const MultiBinaryNode *node)
OutMultiBinaryNodeFields(StringInfo str, const MultiBinaryNode *node)
{
WRITE_NODE_FIELD(leftChildNode);
WRITE_NODE_FIELD(rightChildNode);
}
static void
_outMultiTreeRoot(StringInfo str, const MultiTreeRoot *node)
void
OutMultiNode(OUTFUNC_ARGS)
{
WRITE_NODE_TYPE("MULTITREEROOT");
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
WRITE_NODE_TYPE("MULTINODE");
}
static void
_outMultiPlan(StringInfo str, const MultiPlan *node)
void
OutMultiTreeRoot(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiTreeRoot);
WRITE_NODE_TYPE("MULTITREEROOT");
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
void
OutMultiPlan(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiPlan);
WRITE_NODE_TYPE("MULTIPLAN");
WRITE_NODE_FIELD(workerJob);
@ -258,87 +280,95 @@ _outMultiPlan(StringInfo str, const MultiPlan *node)
}
static void
_outMultiProject(StringInfo str, const MultiProject *node)
void
OutMultiProject(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiProject);
WRITE_NODE_TYPE("MULTIPROJECT");
WRITE_NODE_FIELD(columnList);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiCollect(StringInfo str, const MultiCollect *node)
void
OutMultiCollect(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiCollect);
WRITE_NODE_TYPE("MULTICOLLECT");
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiSelect(StringInfo str, const MultiSelect *node)
void
OutMultiSelect(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiSelect);
WRITE_NODE_TYPE("MULTISELECT");
WRITE_NODE_FIELD(selectClauseList);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiTable(StringInfo str, const MultiTable *node)
void
OutMultiTable(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiTable);
WRITE_NODE_TYPE("MULTITABLE");
WRITE_OID_FIELD(relationId);
WRITE_INT_FIELD(rangeTableId);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiJoin(StringInfo str, const MultiJoin *node)
void
OutMultiJoin(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiJoin);
WRITE_NODE_TYPE("MULTIJOIN");
WRITE_NODE_FIELD(joinClauseList);
WRITE_ENUM_FIELD(joinRuleType, JoinRuleType);
WRITE_ENUM_FIELD(joinType, JoinType);
_outMultiBinaryNode(str, (const MultiBinaryNode *) node);
OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
}
static void
_outMultiPartition(StringInfo str, const MultiPartition *node)
void
OutMultiPartition(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiPartition);
WRITE_NODE_TYPE("MULTIPARTITION");
WRITE_NODE_FIELD(partitionColumn);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiCartesianProduct(StringInfo str, const MultiCartesianProduct *node)
void
OutMultiCartesianProduct(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiCartesianProduct);
WRITE_NODE_TYPE("MULTICARTESIANPRODUCT");
_outMultiBinaryNode(str, (const MultiBinaryNode *) node);
OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
}
static void
_outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
void
OutMultiExtendedOp(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiExtendedOp);
WRITE_NODE_TYPE("MULTIEXTENDEDOP");
WRITE_NODE_FIELD(targetList);
@ -348,11 +378,11 @@ _outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
WRITE_NODE_FIELD(limitOffset);
WRITE_NODE_FIELD(havingQual);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outJobInfo(StringInfo str, const Job *node)
OutJobFields(StringInfo str, const Job *node)
{
WRITE_UINT64_FIELD(jobId);
WRITE_NODE_FIELD(jobQuery);
@ -362,18 +392,20 @@ _outJobInfo(StringInfo str, const Job *node)
}
static void
_outJob(StringInfo str, const Job *node)
void
OutJob(OUTFUNC_ARGS)
{
WRITE_LOCALS(Job);
WRITE_NODE_TYPE("JOB");
_outJobInfo(str, node);
OutJobFields(str, node);
}
static void
_outShardInterval(StringInfo str, const ShardInterval *node)
void
OutShardInterval(OUTFUNC_ARGS)
{
WRITE_LOCALS(ShardInterval);
WRITE_NODE_TYPE("SHARDINTERVAL");
WRITE_OID_FIELD(relationId);
@ -388,27 +420,28 @@ _outShardInterval(StringInfo str, const ShardInterval *node)
if (!node->minValueExists)
appendStringInfoString(str, "<>");
else
_outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
appendStringInfoString(str, " :maxValue ");
if (!node->maxValueExists)
appendStringInfoString(str, "<>");
else
_outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
WRITE_UINT64_FIELD(shardId);
}
static void
_outMapMergeJob(StringInfo str, const MapMergeJob *node)
void
OutMapMergeJob(OUTFUNC_ARGS)
{
WRITE_LOCALS(MapMergeJob);
int arrayLength = node->sortedShardIntervalArrayLength;
int i;
WRITE_NODE_TYPE("MAPMERGEJOB");
_outJobInfo(str, (Job *) node);
OutJobFields(str, (Job *) node);
WRITE_NODE_FIELD(reduceQuery);
WRITE_ENUM_FIELD(partitionType, PartitionType);
WRITE_NODE_FIELD(partitionColumn);
@ -417,9 +450,7 @@ _outMapMergeJob(StringInfo str, const MapMergeJob *node)
for (i = 0; i < arrayLength; ++i)
{
ShardInterval *writeElement = node->sortedShardIntervalArray[i];
_outShardInterval(str, writeElement);
outNode(str, node->sortedShardIntervalArray[i]);
}
WRITE_NODE_FIELD(mapTaskList);
@ -427,9 +458,10 @@ _outMapMergeJob(StringInfo str, const MapMergeJob *node)
}
static void
_outShardPlacement(StringInfo str, const ShardPlacement *node)
void
OutShardPlacement(OUTFUNC_ARGS)
{
WRITE_LOCALS(ShardPlacement);
WRITE_NODE_TYPE("SHARDPLACEMENT");
WRITE_UINT64_FIELD(placementId);
@ -441,9 +473,10 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node)
}
static void
_outTask(StringInfo str, const Task *node)
void
OutTask(OUTFUNC_ARGS)
{
WRITE_LOCALS(Task);
WRITE_NODE_TYPE("TASK");
WRITE_ENUM_FIELD(taskType, TaskType);
@ -462,13 +495,14 @@ _outTask(StringInfo str, const Task *node)
WRITE_BOOL_FIELD(requiresMasterEvaluation);
}
#if (PG_VERSION_NUM < 90600)
/*
* _outNode -
* outNode -
* converts a Node into an ASCII string and appends it to 'str'
*/
static void
_outNode(StringInfo str, const void *obj)
outNode(StringInfo str, const void *obj)
{
if (obj == NULL)
{
@ -487,91 +521,91 @@ _outNode(StringInfo str, const void *obj)
case T_MultiTreeRoot:
appendStringInfoChar(str, '{');
_outMultiTreeRoot(str, obj);
OutMultiTreeRoot(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiProject:
appendStringInfoChar(str, '{');
_outMultiProject(str, obj);
OutMultiProject(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiCollect:
appendStringInfoChar(str, '{');
_outMultiCollect(str, obj);
OutMultiCollect(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiSelect:
appendStringInfoChar(str, '{');
_outMultiSelect(str, obj);
OutMultiSelect(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiTable:
appendStringInfoChar(str, '{');
_outMultiTable(str, obj);
OutMultiTable(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiJoin:
appendStringInfoChar(str, '{');
_outMultiJoin(str, obj);
OutMultiJoin(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiPartition:
appendStringInfoChar(str, '{');
_outMultiPartition(str, obj);
OutMultiPartition(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiCartesianProduct:
appendStringInfoChar(str, '{');
_outMultiCartesianProduct(str, obj);
OutMultiCartesianProduct(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiExtendedOp:
appendStringInfoChar(str, '{');
_outMultiExtendedOp(str, obj);
OutMultiExtendedOp(str, obj);
appendStringInfoChar(str, '}');
break;
case T_Job:
appendStringInfoChar(str, '{');
_outJob(str, obj);
OutJob(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MapMergeJob:
appendStringInfoChar(str, '{');
_outMapMergeJob(str, obj);
OutMapMergeJob(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiPlan:
appendStringInfoChar(str, '{');
_outMultiPlan(str, obj);
OutMultiPlan(str, obj);
appendStringInfoChar(str, '}');
break;
case T_Task:
appendStringInfoChar(str, '{');
_outTask(str, obj);
OutTask(str, obj);
appendStringInfoChar(str, '}');
break;
case T_ShardInterval:
appendStringInfoChar(str, '{');
_outShardInterval(str, obj);
OutShardInterval(str, obj);
appendStringInfoChar(str, '}');
break;
case T_ShardPlacement:
appendStringInfoChar(str, '{');
_outShardPlacement(str, obj);
OutShardPlacement(str, obj);
appendStringInfoChar(str, '}');
break;
@ -581,6 +615,7 @@ _outNode(StringInfo str, const void *obj)
}
}
#endif
/*
* CitusNodeToString -
@ -589,9 +624,13 @@ _outNode(StringInfo str, const void *obj)
char *
CitusNodeToString(const void *obj)
{
#if (PG_VERSION_NUM >= 90600)
return nodeToString(obj);
#else
StringInfoData str;
initStringInfo(&str);
_outNode(&str, obj);
outNode(&str, obj);
return str.data;
#endif
}


@ -29,6 +29,35 @@
#include "nodes/value.h"
/*
* From 9.6 onwards, we use 9.6's extensible node system, so there's no need
* to copy various routines anymore. In that case, these functions become
* plain wrappers.
*/
#if (PG_VERSION_NUM >= 90600)
void *
CitusStringToNode(char *str)
{
return stringToNode(str);
}
char *
citus_pg_strtok(int *length)
{
return pg_strtok(length);
}
void *
CitusNodeRead(char *token, int tok_len)
{
return nodeRead(token, tok_len);
}
#else
/* Static state for citus_pg_strtok */
static char *citus_pg_strtok_ptr = NULL;
@ -346,3 +375,5 @@ CitusNodeRead(char *token, int tok_len)
return (void *) result;
}
#endif /* (PG_VERSION_NUM < 90600) */
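
Either way, callers get the same serialization round trip. A short usage sketch (it assumes RegisterNodes() already ran from _PG_init(); the variable names are invented):

/* Sketch: serialize a Citus node and rebuild it on any supported version. */
Task *task = CitusMakeNode(Task);
char *serialized = CitusNodeToString(task);
Task *restored = (Task *) CitusStringToNode(serialized);

Assert(CitusIsA(restored, Task));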


@ -0,0 +1,383 @@
/*-------------------------------------------------------------------------
*
* citus_readfuncs.c
* Citus specific node functions
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2012-2015, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "distributed/citus_nodefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/readfuncs.h"
/*
* Macros to simplify reading of different kinds of fields. Use these
* wherever possible to reduce the chance for silly typos. Note that these
* hard-wire conventions about the names of the local variables in a Read
* routine.
*/
/* Macros for declaring appropriate local variables */
/* A few guys need only local_node */
#if (PG_VERSION_NUM >= 90600)
static inline Node *
CitusSetTag(Node *node, int tag)
{
CitusNode *citus_node = (CitusNode *) node;
citus_node->citus_tag = tag;
return node;
}
/* *INDENT-OFF* */
#define READ_LOCALS_NO_FIELDS(nodeTypeName) \
nodeTypeName *local_node = (nodeTypeName *) CitusSetTag((Node *) node, T_##nodeTypeName)
#else
#define READ_LOCALS_NO_FIELDS(nodeTypeName) \
nodeTypeName *local_node = CitusMakeNode(nodeTypeName)
#endif
/* And a few guys need only the citus_pg_strtok support fields */
#define READ_TEMP_LOCALS() \
char *token; \
int length
/* ... but most need both */
#define READ_LOCALS(nodeTypeName) \
READ_LOCALS_NO_FIELDS(nodeTypeName); \
READ_TEMP_LOCALS()
/* Read an integer field (anything written as ":fldname %d") */
#define READ_INT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoi(token)
/* Read an unsigned integer field (anything written as ":fldname %u") */
#define READ_UINT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoui(token)
/* XXX: CITUS Read a uint64 field (anything written as ":fldname %u") */
#define READ_UINT64_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoull(token)
/* Read an OID field (don't hard-wire assumption that OID is same as uint) */
#define READ_OID_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atooid(token)
/* Read a char field (ie, one ascii character) */
#define READ_CHAR_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = token[0]
/* Read an enumerated-type field that was written as an integer code */
#define READ_ENUM_FIELD(fldname, enumtype) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = (enumtype) atoi(token)
/* Read a float field */
#define READ_FLOAT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atof(token)
/* Read a boolean field */
#define READ_BOOL_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = strtobool(token)
/* Read a character-string field */
#define READ_STRING_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = nullable_string(token, length)
/* Read a parse location field (and throw away the value, per notes above) */
#define READ_LOCATION_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
(void) token; /* in case not used elsewhere */ \
local_node->fldname = -1 /* set field to "unknown" */
/* Read a Node field XXX: Citus: replaced call to nodeRead with CitusNodeRead */
#define READ_NODE_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
(void) token; /* in case not used elsewhere */ \
local_node->fldname = CitusNodeRead(NULL, 0)
/* Routine exit */
#if (PG_VERSION_NUM >= 90600)
#define READ_DONE() \
return;
#else
#define READ_DONE() \
return (Node *) local_node
#endif
/*
* NOTE: use atoi() to read values written with %d, or atoui() to read
* values written with %u in outfuncs.c. An exception is OID values,
* for which use atooid(). (As of 7.1, outfuncs.c writes OIDs as %u,
* but this will probably change in the future.)
*/
#define atoui(x) ((unsigned int) strtoul((x), NULL, 10))
#define atooid(x) ((Oid) strtoul((x), NULL, 10))
/* XXX: Citus */
#define atoull(x) ((uint64) strtoull((x), NULL, 10))
#define strtobool(x) ((*(x) == 't') ? true : false)
#define nullable_string(token,length) \
((length) == 0 ? NULL : debackslash(token, length))
static void
readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
}
READFUNC_RET
ReadJob(READFUNC_ARGS)
{
READ_LOCALS_NO_FIELDS(Job);
readJobInfo(local_node);
READ_DONE();
}
READFUNC_RET
ReadMultiPlan(READFUNC_ARGS)
{
READ_LOCALS(MultiPlan);
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_DONE();
}
READFUNC_RET
ReadShardInterval(READFUNC_ARGS)
{
READ_LOCALS(ShardInterval);
READ_OID_FIELD(relationId);
READ_CHAR_FIELD(storageType);
READ_OID_FIELD(valueTypeId);
READ_INT_FIELD(valueTypeLen);
READ_BOOL_FIELD(valueByVal);
READ_BOOL_FIELD(minValueExists);
READ_BOOL_FIELD(maxValueExists);
token = citus_pg_strtok(&length); /* skip :minValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->minValue = readDatum(local_node->valueByVal);
token = citus_pg_strtok(&length); /* skip :maxValue */
if (!local_node->maxValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId);
READ_DONE();
}
READFUNC_RET
ReadMapMergeJob(READFUNC_ARGS)
{
int arrayLength;
int i;
READ_LOCALS(MapMergeJob);
readJobInfo(&local_node->job);
READ_NODE_FIELD(reduceQuery);
READ_ENUM_FIELD(partitionType, PartitionType);
READ_NODE_FIELD(partitionColumn);
READ_UINT_FIELD(partitionCount);
READ_INT_FIELD(sortedShardIntervalArrayLength);
arrayLength = local_node->sortedShardIntervalArrayLength;
/* now build & read sortedShardIntervalArray */
local_node->sortedShardIntervalArray =
(ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *));
for (i = 0; i < arrayLength; ++i)
{
/* can't use READ_NODE_FIELD, no field names */
local_node->sortedShardIntervalArray[i] = CitusNodeRead(NULL, 0);
}
READ_NODE_FIELD(mapTaskList);
READ_NODE_FIELD(mergeTaskList);
READ_DONE();
}
READFUNC_RET
ReadShardPlacement(READFUNC_ARGS)
{
READ_LOCALS(ShardPlacement);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
READ_DONE();
}
READFUNC_RET
ReadTask(READFUNC_ARGS)
{
READ_LOCALS(Task);
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependedTaskList);
READ_UINT_FIELD(partitionId);
READ_UINT_FIELD(upstreamTaskId);
READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}
READFUNC_RET
ReadUnsupportedCitusNode(READFUNC_ARGS)
{
ereport(ERROR, (errmsg("not implemented")));
}
#if (PG_VERSION_NUM < 90600)
/*
* readDatum
*
* Given a string representation of a constant, recreate the appropriate
* Datum. The string representation embeds length info, but not byValue,
* so we must be told that.
*/
Datum
readDatum(bool typbyval)
{
Size length,
i;
int tokenLength;
char *token;
Datum res;
char *s;
/*
* read the actual length of the value
*/
token = citus_pg_strtok(&tokenLength);
length = atoui(token);
token = citus_pg_strtok(&tokenLength); /* read the '[' */
if (token == NULL || token[0] != '[')
elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
if (typbyval)
{
if (length > (Size) sizeof(Datum))
elog(ERROR, "byval datum but length = %zu", length);
res = (Datum) 0;
s = (char *) (&res);
for (i = 0; i < (Size) sizeof(Datum); i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
}
else if (length <= 0)
res = (Datum) NULL;
else
{
s = (char *) palloc(length);
for (i = 0; i < length; i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
res = PointerGetDatum(s);
}
token = citus_pg_strtok(&tokenLength); /* read the ']' */
if (token == NULL || token[0] != ']')
elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
return res;
}
#endif
#if (PG_VERSION_NUM >= 90600)
/* *INDENT-ON* */
/*
* For 9.6+ we can just use the now-extensible parseNodeString(). Before
* that, citus_readfuncs_$ver.c has a version-specific implementation.
*/
Node *
CitusParseNodeString(void)
{
return parseNodeString();
}
#endif


@ -144,8 +144,6 @@
((length) == 0 ? NULL : debackslash(token, length))
static Datum readDatum(bool typbyval);
/*
* _readBitmapset
*/
@ -1368,216 +1366,6 @@ _readTableSampleClause(void)
}
/* XXX: BEGIN Citus Nodes */
static void
_readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
}
static Job *
_readJob(void)
{
READ_LOCALS_NO_FIELDS(Job);
_readJobInfo(local_node);
READ_DONE();
}
static MultiPlan *
_readMultiPlan(void)
{
READ_LOCALS(MultiPlan);
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_DONE();
}
static ShardInterval *
_readShardInterval(void)
{
READ_LOCALS(ShardInterval);
READ_OID_FIELD(relationId);
READ_CHAR_FIELD(storageType);
READ_OID_FIELD(valueTypeId);
READ_INT_FIELD(valueTypeLen);
READ_BOOL_FIELD(valueByVal);
READ_BOOL_FIELD(minValueExists);
READ_BOOL_FIELD(maxValueExists);
token = citus_pg_strtok(&length); /* skip :minValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->minValue = readDatum(local_node->valueByVal);
token = citus_pg_strtok(&length); /* skip :maxValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId);
READ_DONE();
}
static MapMergeJob *
_readMapMergeJob(void)
{
int arrayLength;
int i;
READ_LOCALS(MapMergeJob);
_readJobInfo(&local_node->job);
READ_NODE_FIELD(reduceQuery);
READ_ENUM_FIELD(partitionType, PartitionType);
READ_NODE_FIELD(partitionColumn);
READ_UINT_FIELD(partitionCount);
READ_INT_FIELD(sortedShardIntervalArrayLength);
arrayLength = local_node->sortedShardIntervalArrayLength;
/* now build & read sortedShardIntervalArray */
local_node->sortedShardIntervalArray =
(ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *));
for (i = 0; i < arrayLength; ++i)
{
local_node->sortedShardIntervalArray[i] = _readShardInterval();
}
READ_NODE_FIELD(mapTaskList);
READ_NODE_FIELD(mergeTaskList);
READ_DONE();
}
static ShardPlacement *
_readShardPlacement(void)
{
READ_LOCALS(ShardPlacement);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
READ_DONE();
}
static Task *
_readTask(void)
{
READ_LOCALS(Task);
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependedTaskList);
READ_UINT_FIELD(partitionId);
READ_UINT_FIELD(upstreamTaskId);
READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}
/* XXX: END Citus Nodes */
/*
* readDatum
*
* Given a string representation of a constant, recreate the appropriate
* Datum. The string representation embeds length info, but not byValue,
* so we must be told that.
*/
static Datum
readDatum(bool typbyval)
{
Size length,
i;
int tokenLength;
char *token;
Datum res;
char *s;
/*
* read the actual length of the value
*/
token = citus_pg_strtok(&tokenLength);
length = atoui(token);
token = citus_pg_strtok(&tokenLength); /* read the '[' */
if (token == NULL || token[0] != '[')
elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
if (typbyval)
{
if (length > (Size) sizeof(Datum))
elog(ERROR, "byval datum but length = %zu", length);
res = (Datum) 0;
s = (char *) (&res);
for (i = 0; i < (Size) sizeof(Datum); i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
}
else if (length <= 0)
res = (Datum) NULL;
else
{
s = (char *) palloc(length);
for (i = 0; i < length; i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
res = PointerGetDatum(s);
}
token = citus_pg_strtok(&tokenLength); /* read the ']' */
if (token == NULL || token[0] != ']')
elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
return res;
}
/*
* parseNodeString
*
@ -1718,17 +1506,17 @@ CitusParseNodeString(void)
return_value = _readDeclareCursorStmt();
/* XXX: BEGIN Citus Nodes */
else if (MATCH("MULTIPLAN", 9))
return_value = _readMultiPlan();
return_value = ReadMultiPlan();
else if (MATCH("JOB", 3))
return_value = _readJob();
return_value = ReadJob();
else if (MATCH("SHARDINTERVAL", 13))
return_value = _readShardInterval();
return_value = ReadShardInterval();
else if (MATCH("MAPMERGEJOB", 11))
return_value = _readMapMergeJob();
return_value = ReadMapMergeJob();
else if (MATCH("SHARDPLACEMENT", 14))
return_value = _readShardPlacement();
return_value = ReadShardPlacement();
else if (MATCH("TASK", 4))
return_value = _readTask();
return_value = ReadTask();
/* XXX: END Citus Nodes */
else
{


@ -18,6 +18,7 @@
#include "access/xact.h"
#include "access/sysattr.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"

File diff suppressed because it is too large


@ -25,6 +25,7 @@
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "commands/copy.h"
#include "commands/defrem.h"


@ -37,6 +37,55 @@ extern void * CitusNodeRead(char *token, int tok_len);
/* citus_readfuncs.c */
extern Node * CitusParseNodeString(void);
extern Datum readDatum(bool typbyval);
extern void RegisterNodes(void);
/*
* Define read functions for citus nodes in a way they're usable across
* several major versions. That requires some macro-ugliness, as 9.6+ is quite
* different from before.
*/
#if (PG_VERSION_NUM >= 90600)
#define READFUNC_ARGS struct ExtensibleNode *node
#define READFUNC_RET void
#else
#define READFUNC_ARGS void
#define READFUNC_RET Node *
#endif
#if (PG_VERSION_NUM >= 90600)
#define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node
#else
#define OUTFUNC_ARGS StringInfo str, const Node *raw_node
#endif
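
Concretely, a declaration such as READFUNC_RET ReadJob(READFUNC_ARGS) preprocesses to one of the following, depending on the target version (expansions shown for illustration):

/* PostgreSQL 9.6+: fills a node preallocated by the extensible-node machinery */
void ReadJob(struct ExtensibleNode *node);

/* PostgreSQL 9.5: allocates and returns the node itself */
Node * ReadJob(void);
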
extern READFUNC_RET ReadJob(READFUNC_ARGS);
extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS);
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
extern void OutJob(OUTFUNC_ARGS);
extern void OutMultiPlan(OUTFUNC_ARGS);
extern void OutShardInterval(OUTFUNC_ARGS);
extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutMultiNode(OUTFUNC_ARGS);
extern void OutMultiTreeRoot(OUTFUNC_ARGS);
extern void OutMultiProject(OUTFUNC_ARGS);
extern void OutMultiCollect(OUTFUNC_ARGS);
extern void OutMultiSelect(OUTFUNC_ARGS);
extern void OutMultiTable(OUTFUNC_ARGS);
extern void OutMultiJoin(OUTFUNC_ARGS);
extern void OutMultiPartition(OUTFUNC_ARGS);
extern void OutMultiCartesianProduct(OUTFUNC_ARGS);
extern void OutMultiExtendedOp(OUTFUNC_ARGS);
#endif /* CITUS_NODEFUNCS_H */


@ -3,6 +3,26 @@
* citus_nodes.h
* Additional node types, and related infrastructure, for Citus.
*
* To add a new node type to Citus, perform the following:
*
* * Add a new CitusNodeTag value to use as a tag for the node. Add
* the node's name at a corresponding offset within the array named
* CitusNodeTagNamesD at the top of citus_nodefuncs.c
*
* * Describe the node in a struct, which must have a CitusNode as
* its first element
*
* * Implement an 'outfunc' for the node in citus_outfuncs.c, using
* the macros defined within that file. This function will handle
* converting the node to a string
*
* * Implement a 'readfunc' for the node in citus_readfuncs.c, using
* the macros defined within that file. This function will handle
* converting strings into instances of the node
*
* * Use DEFINE_NODE_METHODS within the nodeMethods array (near the
* bottom of citus_nodefuncs.c) to register the node in PostgreSQL
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
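
As a worked example of the checklist above, a hypothetical MultiLimit node could be added as follows (every name below is invented for illustration; none of this is in the patch):

/* citus_nodes.h: new tag at the end of CitusNodeTag, plus a matching
 * "MultiLimit" entry in CitusNodeTagNamesD in citus_nodefuncs.c */
T_MultiLimit,

/* the describing struct; CitusNode must be the first member */
typedef struct MultiLimit
{
	CitusNode type;
	int rowLimit;
} MultiLimit;

/* citus_outfuncs.c: string serialization via that file's macros */
void
OutMultiLimit(OUTFUNC_ARGS)
{
	WRITE_LOCALS(MultiLimit);
	WRITE_NODE_TYPE("MULTILIMIT");
	WRITE_INT_FIELD(rowLimit);
}

/* citus_readfuncs.c: deserialization via that file's macros */
READFUNC_RET
ReadMultiLimit(READFUNC_ARGS)
{
	READ_LOCALS(MultiLimit);
	READ_INT_FIELD(rowLimit);
	READ_DONE();
}

/* citus_nodefuncs.c: register within the nodeMethods array */
DEFINE_NODE_METHODS(MultiLimit),
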
@ -17,9 +37,10 @@
*
* These have to be distinct from the ideas used in postgres' nodes.h
*/
#define CITUS_NODE_TAG_START 1200
typedef enum CitusNodeTag
{
T_MultiNode = 1200, /* FIXME: perhaps use something less predictable? */
T_MultiNode = CITUS_NODE_TAG_START, /* FIXME: perhaps use something less predictable? */
T_MultiTreeRoot,
T_MultiProject,
T_MultiCollect,
@ -38,6 +59,46 @@ typedef enum CitusNodeTag
} CitusNodeTag;
extern const char **CitusNodeTagNames;
#if (PG_VERSION_NUM >= 90600)
#include "nodes/extensible.h"
typedef struct CitusNode
{
ExtensibleNode extensible;
CitusNodeTag citus_tag; /* for quick type determination */
} CitusNode;
#define CitusNodeTag(nodeptr) CitusNodeTagI((Node*) nodeptr)
static inline int
CitusNodeTagI(Node *node)
{
if (!IsA(node, ExtensibleNode))
{
return nodeTag(node);
}
return ((CitusNode*)(node))->citus_tag;
}
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ CitusNode *_result; \
AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \
_result = (CitusNode *) palloc0fast(size); \
_result->extensible.type = T_ExtensibleNode; \
_result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \
_result->citus_tag =(int) (tag); \
_result; \
})
#else
typedef CitusNodeTag CitusNode;
/*
* nodeTag equivalent that returns the node tag for both citus and postgres
* node tag types. Needs to return int as there's no type that covers both
@ -46,12 +107,6 @@ typedef enum CitusNodeTag
#define CitusNodeTag(nodeptr) (*((const int*)(nodeptr)))
/*
* IsA equivalent that compares node tags as integers, not as enum values.
*/
#define CitusIsA(nodeptr,_type_) (CitusNodeTag(nodeptr) == T_##_type_)
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ Node *_result; \
@ -61,6 +116,13 @@ typedef enum CitusNodeTag
_result; \
})
#endif
/*
* IsA equivalent that compares node tags, including Citus-specific nodes.
*/
#define CitusIsA(nodeptr,_type_) (CitusNodeTag(nodeptr) == T_##_type_)
/*
* CitusMakeNode is Citus variant of makeNode(). Use it to create nodes of
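
A short usage sketch of these helpers (hypothetical example):

/* Sketch: allocate a Citus node and test its tag, on any supported version. */
MultiProject *projectNode = CitusMakeNode(MultiProject);

if (CitusIsA(projectNode, MultiProject))
{
	/* on 9.6+, plain nodeTag() reports T_ExtensibleNode here; CitusIsA()
	 * consults the citus_tag that CitusNewNode() stored instead */
}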


@ -29,7 +29,7 @@
/* In-memory representation of a typed tuple in pg_dist_shard. */
typedef struct ShardInterval
{
CitusNodeTag type;
CitusNode type;
Oid relationId;
char storageType;
Oid valueTypeId; /* min/max value datum's typeId */
@ -46,7 +46,7 @@ typedef struct ShardInterval
/* In-memory representation of a tuple in pg_dist_shard_placement. */
typedef struct ShardPlacement
{
CitusNodeTag type;
CitusNode type;
uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId;
uint64 shardLength;


@ -17,9 +17,15 @@
#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100
#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200
#if (PG_VERSION_NUM >= 90600)
#define tuplecount_t uint64
#else
#define tuplecount_t long
#endif
extern void multi_ExecutorStart(QueryDesc *queryDesc, int eflags);
extern void multi_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, long count);
ScanDirection direction, tuplecount_t count);
extern void multi_ExecutorFinish(QueryDesc *queryDesc);
extern void multi_ExecutorEnd(QueryDesc *queryDesc);
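
The widened count parameter tracks the executor change in 9.6, where ExecutorRun's tuple count became uint64 (previously long). A sketch of a matching hook using the compat typedef (illustrative, not the patch's code):

#include "executor/executor.h"

static void
sketch_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
				   tuplecount_t count)
{
	/* decide whether Citus should take over; otherwise fall through */
	standard_ExecutorRun(queryDesc, direction, count);
}

/* in _PG_init(): ExecutorRun_hook = sketch_ExecutorRun; */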


@ -35,7 +35,7 @@
*/
typedef struct MultiNode
{
CitusNodeTag type;
CitusNode type;
struct MultiNode *parentNode;


@ -114,7 +114,7 @@ typedef enum
*/
typedef struct Job
{
CitusNodeTag type;
CitusNode type;
uint64 jobId;
Query *jobQuery;
List *taskList;
@ -152,7 +152,7 @@ typedef struct TaskExecution TaskExecution;
typedef struct Task
{
CitusNodeTag type;
CitusNode type;
TaskType taskType;
uint64 jobId;
uint32 taskId;
@ -201,7 +201,7 @@ typedef struct JoinSequenceNode
*/
typedef struct MultiPlan
{
CitusNodeTag type;
CitusNode type;
Job *workerJob;
Query *masterQuery;
char *masterTableName;


@ -3,6 +3,11 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000;
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
-- Check multi_cat_agg() aggregate which is used to implement array_agg()
SELECT array_cat_agg(i) FROM (VALUES (ARRAY[1,2]), (NULL), (ARRAY[3,4])) AS t(i);
array_cat_agg
@ -18,68 +23,68 @@ ERROR: array_agg with order by is unsupported
SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem;
ERROR: array_agg with order by is unsupported
-- Check array_agg() for different data types and LIMIT clauses
SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
--------------------------------------------------
{155190,67310,63700,2132,24027,15635}
{2132,15635,24027,63700,67310,155190}
{106170}
{4297,19036,128449,29380,183095,62143}
{4297,19036,29380,62143,128449,183095}
{88035}
{108570,123927,37531}
{37531,108570,123927}
{139636}
{182052,145243,94780,163073,151894,79251,157238}
{82704,197921,44161,2743,85811,11615}
{61336,60519,137469,33918}
{79251,94780,145243,151894,157238,163073,182052}
{2743,11615,44161,82704,85811,197921}
{33918,60519,61336,137469}
{88362,89414,169544}
(10 rows)
SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
-----------------------------------------------------------------
{21168.23,45983.16,13309.60,28955.64,22824.48,49620.16}
{13309.60,21168.23,22824.48,28955.64,45983.16,49620.16}
{44694.46}
{54058.05,46796.47,39890.88,2618.76,32986.52,28733.64}
{2618.76,28733.64,32986.52,39890.88,46796.47,54058.05}
{30690.90}
{23678.55,50723.92,73426.50}
{61998.31}
{13608.60,11594.16,81639.88,31809.96,73943.82,43058.75,6476.15}
{47227.60,64605.44,2210.32,6582.96,79059.64,9159.66}
{40217.23,47344.32,7532.30,75928.31}
{17554.68,30875.02,9681.24}
{6476.15,11594.16,13608.60,31809.96,43058.75,73943.82,81639.88}
{2210.32,6582.96,9159.66,47227.60,64605.44,79059.64}
{7532.30,40217.23,47344.32,75928.31}
{9681.24,17554.68,30875.02}
(10 rows)
SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
--------------------------------------------------------------------------------
{03-13-1996,04-12-1996,01-29-1996,04-21-1996,03-30-1996,01-30-1996}
{01-29-1996,01-30-1996,03-13-1996,03-30-1996,04-12-1996,04-21-1996}
{01-28-1997}
{02-02-1994,11-09-1993,01-16-1994,12-04-1993,12-14-1993,10-29-1993}
{10-29-1993,11-09-1993,12-04-1993,12-14-1993,01-16-1994,02-02-1994}
{01-10-1996}
{10-31-1994,10-16-1994,08-08-1994}
{08-08-1994,10-16-1994,10-31-1994}
{04-27-1992}
{05-07-1996,02-01-1996,01-15-1996,03-21-1996,02-11-1996,01-16-1996,02-10-1996}
{10-23-1995,08-14-1995,08-07-1995,08-04-1995,08-28-1995,07-21-1995}
{10-29-1993,12-09-1993,12-09-1993,11-09-1993}
{10-23-1998,10-09-1998,10-30-1998}
{01-15-1996,01-16-1996,02-01-1996,02-10-1996,02-11-1996,03-21-1996,05-07-1996}
{07-21-1995,08-04-1995,08-07-1995,08-14-1995,08-28-1995,10-23-1995}
{10-29-1993,11-09-1993,12-09-1993,12-09-1993}
{10-09-1998,10-23-1998,10-30-1998}
(10 rows)
SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
----------------------------------------------------------------------------------------------
{"TRUCK ","MAIL ","REG AIR ","AIR ","FOB ","MAIL "}
{"AIR ","FOB ","MAIL ","MAIL ","REG AIR ","TRUCK "}
{"RAIL "}
{"AIR ","RAIL ","SHIP ","TRUCK ","FOB ","RAIL "}
{"AIR ","FOB ","RAIL ","RAIL ","SHIP ","TRUCK "}
{"REG AIR "}
{"AIR ","FOB ","AIR "}
{"AIR ","AIR ","FOB "}
{"TRUCK "}
{"FOB ","SHIP ","MAIL ","FOB ","TRUCK ","FOB ","FOB "}
{"TRUCK ","AIR ","AIR ","REG AIR ","AIR ","RAIL "}
{"TRUCK ","MAIL ","AIR ","MAIL "}
{"REG AIR ","FOB ","FOB "}
{"FOB ","FOB ","FOB ","FOB ","MAIL ","SHIP ","TRUCK "}
{"AIR ","AIR ","AIR ","RAIL ","REG AIR ","TRUCK "}
{"AIR ","MAIL ","MAIL ","TRUCK "}
{"FOB ","FOB ","REG AIR "}
(10 rows)
-- Check that we can execute array_agg() within other functions
@ -93,10 +98,10 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem;
-- shards and contain different aggregates, filter clauses and other complex
-- expressions. Note that the l_orderkey ranges are such that the matching rows
-- lie in different shards.
SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem
SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem
WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500
GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | count | avg | array_agg
l_quantity | count | avg | array_sort
------------+-------+-----------------------+--------------------------------------------------------------------------------------------------
1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476}
2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476}
@ -104,21 +109,21 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li
4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473}
(4 rows)
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month
FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | my_month
------------+------------------------------------------------
1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5}
2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5}
3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3}
4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10}
1.00 | {2,3,4,4,4,5,5,5,6,7,7,7,7,9,9,11,11}
2.00 | {1,3,5,5,5,5,6,6,6,7,7,8,10,10,11,11,11,12,12}
3.00 | {3,4,5,6,7,7,8,8,8,9,9,10,11,11}
4.00 | {1,1,1,2,2,2,5,5,6,6,6,6,8,9,10,10,11,11,12}
(4 rows)
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5
AND octet_length(l_comment) + octet_length('randomtext'::text) > 40
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | array_agg
l_quantity | array_sort
------------+---------------------------------------------
1.00 | {11269,11397,11713,11715,11973,18317,18445}
2.00 | {11847,18061,18247,18953}
@ -3,6 +3,13 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+\.\d+') AS major_version;
major_version
---------------
9.6
(1 row)
\a\t
SET citus.task_executor_type TO 'real-time';
SET citus.explain_distributed_queries TO on;
@ -43,7 +50,7 @@ Distributed Query into pg_merge_job_570000
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
-> HashAggregate
Group Key: intermediate_column_570000_0
-> Seq Scan on pg_merge_job_570000
@ -66,11 +73,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
"Parallel Aware": false,
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
@ -87,17 +97,21 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["COALESCE((sum((COALESCE((sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
"Parallel Aware": false,
"Sort Key": ["COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Group Key": ["intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "pg_merge_job_570001",
"Alias": "pg_merge_job_570001"
}
@ -133,6 +147,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parallel-Aware>false</Parallel-Aware>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
@ -140,6 +156,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
@ -155,15 +172,18 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Parallel-Aware>false</Parallel-Aware>
<Sort-Key>
<Item>COALESCE((sum((COALESCE((sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>intermediate_column_570003_0</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Group-Key>
<Item>intermediate_column_570003_0</Item>
</Group-Key>
@ -171,6 +191,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>pg_merge_job_570003</Relation-Name>
<Alias>pg_merge_job_570003</Alias>
</Plan>
@ -201,29 +222,36 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
Parallel Aware: false
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "lineitem_290001"
Alias: "lineitem"
Master Query:
- Plan:
Node Type: "Sort"
Parallel Aware: false
Sort Key:
- "COALESCE((sum((COALESCE((sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "intermediate_column_570005_0"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
Parent Relationship: "Outer"
Parallel Aware: false
Group Key:
- "intermediate_column_570005_0"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "pg_merge_job_570005"
Alias: "pg_merge_job_570005"
-- Test Text format
@ -241,7 +269,7 @@ Distributed Query into pg_merge_job_570006
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
-> HashAggregate
Group Key: intermediate_column_570006_0
-> Seq Scan on pg_merge_job_570006
@ -260,7 +288,7 @@ Distributed Query into pg_merge_job_570007
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / sum(intermediate_column_570007_2)))
Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / pg_catalog.sum(intermediate_column_570007_2)))
-> Seq Scan on pg_temp_2.pg_merge_job_570007
Output: intermediate_column_570007_0, intermediate_column_570007_1, intermediate_column_570007_2
-- Test join
@ -380,7 +408,7 @@ Distributed Query into pg_merge_job_570013
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / sum(intermediate_column_570013_2)))
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / pg_catalog.sum(intermediate_column_570013_2)))
Filter: (sum(pg_merge_job_570013.intermediate_column_570013_3) > '100'::numeric)
-> Seq Scan on pg_temp_2.pg_merge_job_570013
Output: intermediate_column_570013_0, intermediate_column_570013_1, intermediate_column_570013_2, intermediate_column_570013_3
@ -512,10 +540,13 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Plain",
"Partial Mode": "Simple",
"Parallel Aware": false,
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "pg_merge_job_570024",
"Alias": "pg_merge_job_570024"
}
@ -562,10 +593,13 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Plain</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parallel-Aware>false</Parallel-Aware>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>pg_merge_job_570030</Relation-Name>
<Alias>pg_merge_job_570030</Alias>
</Plan>
@ -602,8 +636,37 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Plan:
Node Type: "Aggregate"
Strategy: "Plain"
Partial Mode: "Simple"
Parallel Aware: false
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"
-- test parallel aggregates
SET parallel_setup_cost=0;
SET parallel_tuple_cost=0;
SET min_parallel_relation_size=0;
SET max_parallel_workers_per_gather=4;
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Finalize Aggregate
-> Gather
Workers Planned: 3
-> Partial Aggregate
-> Parallel Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Distributed Query into pg_merge_job_570037
Executor: Task-Tracker
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570037
@ -0,0 +1,643 @@
--
-- MULTI_EXPLAIN
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+\.\d+') AS major_version;
major_version
---------------
9.5
(1 row)
\a\t
SET citus.task_executor_type TO 'real-time';
SET citus.explain_distributed_queries TO on;
-- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text)
RETURNS jsonb
AS $BODY$
DECLARE
result jsonb;
BEGIN
EXECUTE format('EXPLAIN (FORMAT JSON) %s', query) INTO result;
RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML
CREATE FUNCTION explain_xml(query text)
RETURNS xml
AS $BODY$
DECLARE
result xml;
BEGIN
EXECUTE format('EXPLAIN (FORMAT XML) %s', query) INTO result;
RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_570000
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
-> HashAggregate
Group Key: intermediate_column_570000_0
-> Seq Scan on pg_merge_job_570000
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
[
{
"Executor": "Real-Time",
"Job": {
"Task Count": 8,
"Tasks Shown": "One of 8",
"Tasks": [
{
"Node": "host=localhost port=57637 dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
]
}
}
]
]
}
]
},
"Master Query": [
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["COALESCE((sum((COALESCE((sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Parent Relationship": "Outer",
"Group Key": ["intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_570001",
"Alias": "pg_merge_job_570001"
}
]
}
]
}
}
]
}
]
-- Validate JSON format
SELECT true AS valid FROM explain_json($$
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Distributed-Query>
<Executor>Real-Time</Executor>
<Job>
<Task-Count>8</Task-Count>
<Tasks-Shown>One of 8</Tasks-Shown>
<Tasks>
<Task>
<Node>host=localhost port=57637 dbname=regression</Node>
<Remote-Plan>
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
</Remote-Plan>
</Task>
</Tasks>
</Job>
<Master-Query>
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Sort-Key>
<Item>COALESCE((sum((COALESCE((sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>intermediate_column_570003_0</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Parent-Relationship>Outer</Parent-Relationship>
<Group-Key>
<Item>intermediate_column_570003_0</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>pg_merge_job_570003</Relation-Name>
<Alias>pg_merge_job_570003</Alias>
</Plan>
</Plans>
</Plan>
</Plans>
</Plan>
</Query>
</Master-Query>
</Distributed-Query>
</explain>
-- Validate XML format
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
- Executor: "Real-Time"
Job:
Task Count: 8
Tasks Shown: "One of 8"
Tasks:
- Node: "host=localhost port=57637 dbname=regression"
Remote Plan:
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "lineitem_290001"
Alias: "lineitem"
Master Query:
- Plan:
Node Type: "Sort"
Sort Key:
- "COALESCE((sum((COALESCE((sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "intermediate_column_570005_0"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Parent Relationship: "Outer"
Group Key:
- "intermediate_column_570005_0"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_570005"
Alias: "pg_merge_job_570005"
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_570006
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
-> HashAggregate
Group Key: intermediate_column_570006_0
-> Seq Scan on pg_merge_job_570006
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Distributed Query into pg_merge_job_570007
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / sum(intermediate_column_570007_2)))
-> Seq Scan on pg_temp_2.pg_merge_job_570007
Output: intermediate_column_570007_0, intermediate_column_570007_1, intermediate_column_570007_2
-- Test join
EXPLAIN (COSTS FALSE)
SELECT * FROM lineitem
JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5.0
ORDER BY l_quantity LIMIT 10;
Distributed Query into pg_merge_job_570008
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: lineitem.l_quantity
-> Hash Join
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Seq Scan on lineitem_290001 lineitem
Filter: (l_quantity < 5.0)
-> Hash
-> Seq Scan on orders_290008 orders
Master Query
-> Limit
-> Sort
Sort Key: intermediate_column_570008_4
-> Seq Scan on pg_merge_job_570008
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_290000
-> Result
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_290000
-> Bitmap Heap Scan on lineitem_290000
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 1)
-- Test delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_290000
-> Bitmap Heap Scan on lineitem_290000
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 1)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Distributed Query into pg_merge_job_570009
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_290000 lineitem
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 5)
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
SELECT true AS valid FROM explain_json($$
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
-- Test CREATE TABLE ... AS
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Distributed Query into pg_merge_job_570012
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Seq Scan on pg_merge_job_570012
-- Test having
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;
Distributed Query into pg_merge_job_570013
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / sum(intermediate_column_570013_2)))
Filter: (sum(pg_merge_job_570013.intermediate_column_570013_3) > '100'::numeric)
-> Seq Scan on pg_temp_2.pg_merge_job_570013
Output: intermediate_column_570013_0, intermediate_column_570013_1, intermediate_column_570013_2, intermediate_column_570013_3
-- Test having without aggregate
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT l_quantity FROM lineitem
GROUP BY l_quantity
HAVING l_quantity > (100 * random());
Distributed Query into pg_merge_job_570014
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_quantity, l_quantity
Group Key: lineitem.l_quantity
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> HashAggregate
Output: intermediate_column_570014_0
Group Key: pg_merge_job_570014.intermediate_column_570014_0
Filter: ((pg_merge_job_570014.intermediate_column_570014_1)::double precision > ('100'::double precision * random()))
-> Seq Scan on pg_temp_2.pg_merge_job_570014
Output: intermediate_column_570014_0, intermediate_column_570014_1
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_570015
Executor: Real-Time
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290004 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290007 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290006 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570015
SELECT true AS valid FROM explain_xml($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
SELECT true AS valid FROM explain_json($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
-- Test task-tracker executor
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_570018
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570018
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Distributed Query into pg_merge_job_570021
Executor: Task-Tracker
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 1
Merge Task Count: 1
-> MapMergeJob
Map Task Count: 8
Merge Task Count: 1
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570021
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
[
{
"Executor": "Task-Tracker",
"Job": {
"Task Count": 1,
"Tasks Shown": "None, not supported for re-partition queries",
"Depended Jobs": [
{
"Map Task Count": 1,
"Merge Task Count": 1,
"Depended Jobs": [
{
"Map Task Count": 8,
"Merge Task Count": 1
}
]
}
]
},
"Master Query": [
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Plain",
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_570024",
"Alias": "pg_merge_job_570024"
}
]
}
}
]
}
]
SELECT true AS valid FROM explain_json($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Distributed-Query>
<Executor>Task-Tracker</Executor>
<Job>
<Task-Count>1</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
<Depended-Jobs>
<MapMergeJob>
<Map-Task-Count>1</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count>
<Depended-Jobs>
<MapMergeJob>
<Map-Task-Count>8</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count>
</MapMergeJob>
</Depended-Jobs>
</MapMergeJob>
</Depended-Jobs>
</Job>
<Master-Query>
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Plain</Strategy>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>pg_merge_job_570030</Relation-Name>
<Alias>pg_merge_job_570030</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</Master-Query>
</Distributed-Query>
</explain>
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
- Executor: "Task-Tracker"
Job:
Task Count: 1
Tasks Shown: "None, not supported for re-partition queries"
Depended Jobs:
- Map Task Count: 1
Merge Task Count: 1
Depended Jobs:
- Map Task Count: 8
Merge Task Count: 1
Master Query:
- Plan:
Node Type: "Aggregate"
Strategy: "Plain"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"
-- test parallel aggregates
SET parallel_setup_cost=0;
ERROR: unrecognized configuration parameter "parallel_setup_cost"
SET parallel_tuple_cost=0;
ERROR: unrecognized configuration parameter "parallel_tuple_cost"
SET min_parallel_relation_size=0;
ERROR: unrecognized configuration parameter "min_parallel_relation_size"
SET max_parallel_workers_per_gather=4;
ERROR: unrecognized configuration parameter "max_parallel_workers_per_gather"
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Aggregate
-> Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Distributed Query into pg_merge_job_570037
Executor: Task-Tracker
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570037
@ -86,6 +86,7 @@ SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
(1 row)
-- even if created by PL/pgSQL...
\set VERBOSITY terse
BEGIN;
DO $$
BEGIN
@ -120,6 +121,7 @@ END $$;
NOTICE: caught not_null_violation
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
\set VERBOSITY default
-- should be valid to edit labs after researchers...
BEGIN;
INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart');
@ -318,11 +320,11 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- test partial failure; worker_1 succeeds, 2 fails
\set VERBOSITY terse
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
INSERT INTO labs VALUES (7, 'E Corp');
COMMIT;
-- data should be persisted
@ -378,11 +380,9 @@ BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
ERROR: could not modify any active placements
COMMIT;
-- data should NOT be persisted
@ -424,7 +424,6 @@ INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
-- data should be persisted
SELECT * FROM objects WHERE id = 2;
id | name
@ -473,9 +472,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
ERROR: could not commit transaction on any active nodes
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
@ -511,7 +508,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
\set VERBOSITY default
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;
id | name

View File

@ -757,33 +757,35 @@ DEBUG: Plan is router executable
SELECT id
FROM articles_hash
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
DEBUG: predicate pruning for shardId 840001
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
----
41
11
31
1
11
21
31
41
(5 rows)
-- single shard select with distinct is router plannable
SELECT distinct id
SELECT DISTINCT id
FROM articles_hash
WHERE author_id = 1;
WHERE author_id = 1
ORDER BY id;
DEBUG: predicate pruning for shardId 840001
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
----
41
11
31
1
11
21
31
41
(5 rows)
-- single shard aggregate is router plannable
@ -494,17 +494,18 @@ DEBUG: Plan is router executable
SELECT id
FROM articles
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
----
41
11
31
1
11
21
31
41
(5 rows)
-- copying from a single shard table does not require the master query
@ -6,6 +6,10 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 270000;
-- print major version to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+\.\d+') AS major_version;
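-- (\gset stores each column of the previous result row in a psql variable of
-- the same name; :'server_version' above interpolates that variable as a
-- quoted literal)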
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
@ -3,6 +3,14 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 270000;
-- print major version to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+\.\d+') AS major_version;
major_version
---------------
9.6
(1 row)
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
@ -765,10 +773,10 @@ FROM
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate (cost=40.01..40.02 rows=1 width=32)
-> GroupAggregate (cost=39.89..39.99 rows=1 width=556)
-> Aggregate (cost=40.01..40.02 rows=1 width=16)
-> GroupAggregate (cost=39.89..39.99 rows=1 width=48)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Merge Join (cost=39.89..39.97 rows=1 width=556)
-> Merge Join (cost=39.89..39.97 rows=1 width=540)
Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id)))
-> Sort (cost=28.08..28.09 rows=6 width=32)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
@ -779,7 +787,7 @@ FROM
-> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556)
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
Master Query
-> Aggregate (cost=0.01..0.02 rows=1 width=0)
-> Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0)
(22 rows)
@ -847,48 +855,51 @@ FROM
GROUP BY
hasdone;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270015
Executor: Real-Time
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate (cost=91.94..91.96 rows=2 width=64)
Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text)
-> GroupAggregate (cost=91.85..91.90 rows=2 width=88)
-> GroupAggregate (cost=91.93..91.98 rows=2 width=48)
Group Key: subquery_top.hasdone
-> Sort (cost=91.93..91.93 rows=2 width=64)
Sort Key: subquery_top.hasdone
-> Subquery Scan on subquery_top (cost=91.85..91.92 rows=2 width=64)
-> GroupAggregate (cost=91.85..91.90 rows=2 width=112)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
-> Sort (cost=91.85..91.85 rows=2 width=88)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
-> Merge Left Join (cost=91.75..91.84 rows=2 width=88)
Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id)))
-> Unique (cost=79.46..79.48 rows=2 width=40)
-> Sort (cost=79.46..79.47 rows=2 width=40)
-> Unique (cost=79.46..79.48 rows=2 width=56)
-> Sort (cost=79.46..79.47 rows=2 width=56)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time
-> Append (cost=0.00..79.45 rows=2 width=40)
-> Nested Loop (cost=0.00..39.72 rows=1 width=40)
-> Append (cost=0.00..79.45 rows=2 width=56)
-> Nested Loop (cost=0.00..39.72 rows=1 width=56)
Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id))
-> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40)
Filter: ((event_type)::text = 'click'::text)
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Nested Loop (cost=0.00..39.72 rows=1 width=40)
-> Nested Loop (cost=0.00..39.72 rows=1 width=56)
Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id))
-> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40)
Filter: ((event_type)::text = 'submit'::text)
-> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Materialize (cost=12.29..12.31 rows=1 width=48)
-> Unique (cost=12.29..12.30 rows=1 width=32)
-> Sort (cost=12.29..12.29 rows=1 width=32)
-> Unique (cost=12.29..12.30 rows=1 width=80)
-> Sort (cost=12.29..12.29 rows=1 width=80)
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=80)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
Master Query
-> HashAggregate (cost=0.00..0.18 rows=10 width=0)
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: intermediate_column_270015_2
-> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0)
(40 rows)
(43 rows)
-- Union, left join and having subquery pushdown
EXPLAIN SELECT
@ -1023,15 +1034,15 @@ LIMIT
-> Limit (cost=100.43..100.44 rows=6 width=56)
-> Sort (cost=100.43..100.44 rows=6 width=56)
Sort Key: (max(users.lastseen)) DESC
-> GroupAggregate (cost=100.14..100.29 rows=6 width=548)
-> GroupAggregate (cost=100.14..100.29 rows=6 width=56)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort (cost=100.14..100.16 rows=6 width=548)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548)
-> Limit (cost=28.08..28.09 rows=6 width=40)
-> Sort (cost=28.08..28.09 rows=6 width=40)
-> Limit (cost=28.08..28.09 rows=6 width=24)
-> Sort (cost=28.08..28.09 rows=6 width=24)
Sort Key: users.lastseen DESC
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40)
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=24)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Limit (cost=11.96..11.96 rows=1 width=524)
-> Sort (cost=11.96..11.96 rows=1 width=524)
@ -1039,8 +1050,8 @@ LIMIT
-> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524)
Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id)))
Master Query
-> Limit (cost=0.01..0.02 rows=0 width=0)
-> Sort (cost=0.01..0.02 rows=0 width=0)
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: intermediate_column_270017_2 DESC
-> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0)
(29 rows)
File diff suppressed because it is too large
@ -143,6 +143,7 @@ sysopen my $fh, "tmp_check/tmp-bin/psql", O_CREAT|O_TRUNC|O_RDWR, 0700
print $fh "#!/bin/bash\n";
print $fh "exec $bindir/psql ";
print $fh "--variable=master_port=$masterPort ";
print $fh "--variable=SHOW_CONTEXT=always ";
for my $workeroff (0 .. $#workerPorts)
{
my $port = $workerPorts[$workeroff];
@ -226,14 +227,14 @@ for my $port (@workerPorts)
###
for my $port (@workerPorts)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "postgres",
'-c', "CREATE DATABASE regression;")) == 0
or die "Could not create regression database on worker";
for my $extension (@extensions)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE EXTENSION IF NOT EXISTS \"$extension\";")) == 0
or die "Could not create extension on worker";
@ -241,7 +242,7 @@ for my $port (@workerPorts)
foreach my $dataType (keys %dataTypes)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE TYPE $dataType AS $dataTypes{$dataType};")) == 0
or die "Could not create TYPE $dataType on worker";
@ -249,7 +250,7 @@ for my $port (@workerPorts)
foreach my $function (keys %functions)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE FUNCTION $function RETURNS $functions{$function};")) == 0
or die "Could not create FUNCTION $function on worker";
@ -257,7 +258,7 @@ for my $port (@workerPorts)
foreach my $operator (keys %operators)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE OPERATOR $operator $operators{$operator};")) == 0
or die "Could not create OPERATOR $operator on worker";
@ -265,7 +266,7 @@ for my $port (@workerPorts)
foreach my $fdw (keys %fdws)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE FOREIGN DATA WRAPPER $fdw HANDLER $fdws{$fdw};")) == 0
or die "Could not create foreign data wrapper $fdw on worker";
@ -273,7 +274,7 @@ for my $port (@workerPorts)
foreach my $fdwServer (keys %fdwServers)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE SERVER $fdwServer FOREIGN DATA WRAPPER $fdwServers{$fdwServer};")) == 0
or die "Could not create server $fdwServer on worker";
@ -287,7 +288,7 @@ my @arguments = (
'--user', $user
);
if ($majorversion eq '9.5')
if ($majorversion eq '9.5' || $majorversion eq '9.6')
{
push(@arguments, '--bindir', "tmp_check/tmp-bin");
}
@ -11,5 +11,6 @@
/multi_load_large_records.sql
/multi_load_more_data.sql
/multi_subquery.sql
/multi_subquery_0.sql
/worker_copy.sql
/multi_complex_count_distinct.sql
@ -6,6 +6,11 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000;
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
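-- A minimal usage sketch for array_sort (array_sort_demo is a hypothetical
-- scratch table, not part of this test): shard tasks can return rows in any
-- order, so a bare array_agg() over a distributed table is nondeterministic,
-- and the tests below sort the aggregated arrays to stabilize their output.
CREATE TEMP TABLE array_sort_demo (k int, v int);
INSERT INTO array_sort_demo VALUES (1, 2), (1, 1), (1, 3);
SELECT k, array_sort(array_agg(v)) FROM array_sort_demo GROUP BY k;
--  k | array_sort
-- ---+------------
--  1 | {1,2,3}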
-- Check multi_cat_agg() aggregate which is used to implement array_agg()
@ -21,16 +26,16 @@ SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem;
-- Check array_agg() for different data types and LIMIT clauses
SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
-- Check that we can execute array_agg() within other functions
@ -42,15 +47,15 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem;
-- expressions. Note that the l_orderkey ranges are such that the matching rows
-- lie in different shards.
SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem
SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem
WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500
GROUP BY l_quantity ORDER BY l_quantity;
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month
FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5
AND octet_length(l_comment) + octet_length('randomtext'::text) > 40
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
@ -6,6 +6,8 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+\.\d+') AS major_version;
\a\t
@ -185,3 +187,16 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
-- test parallel aggregates
SET parallel_setup_cost=0;
SET parallel_tuple_cost=0;
SET min_parallel_relation_size=0;
SET max_parallel_workers_per_gather=4;
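-- (all four settings above were introduced in PostgreSQL 9.6; zeroing the
-- parallel cost parameters and the relation-size threshold lets the planner
-- choose a parallel plan even for a tiny table, while on 9.5 each SET fails
-- with "unrecognized configuration parameter", as the alternative expected
-- output shows)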
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
@ -63,6 +63,7 @@ COMMIT;
SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
-- even if created by PL/pgSQL...
\set VERBOSITY terse
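-- terse verbosity trims errors and warnings to their primary message,
-- dropping the DETAIL/HINT/CONTEXT lines that would otherwise appear here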
BEGIN;
DO $$
BEGIN
@ -93,6 +94,7 @@ EXCEPTION
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
\set VERBOSITY default
-- should be valid to edit labs after researchers...
@ -247,6 +249,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- test partial failure; worker_1 succeeds, 2 fails
\set VERBOSITY terse
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
@ -399,6 +402,7 @@ INSERT INTO objects VALUES (1, 'apple');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
\set VERBOSITY default
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;
@ -334,12 +334,14 @@ SELECT *
SELECT id
FROM articles_hash
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
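-- (the added ORDER BY pins the row order, which otherwise comes from the
-- hash aggregate and is not guaranteed to be stable across versions)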
-- single shard select with distinct is router plannable
SELECT distinct id
SELECT DISTINCT id
FROM articles_hash
WHERE author_id = 1;
WHERE author_id = 1
ORDER BY id;
-- single shard aggregate is router plannable
SELECT avg(word_count)
@ -260,7 +260,8 @@ SELECT *
SELECT id
FROM articles
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
-- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout;