Support PostgreSQL 9.6

Adds support for PostgreSQL 9.6 by copying in the requisite ruleutils
file and refactoring the out/readfuncs code to flexibly support either
the old-style copy/pasted out/readfuncs (prior to 9.6) or the extensible
node APIs (9.6 and higher).

Most version-specific code within this change is only needed to set new
fields in the Aggref nodes we build for aggregations. Version-specific
test output files were added in certain cases, though in most cases they
were not necessary. Each such file begins by printing the major version,
for example, in order to clarify its purpose.

The comment atop citus_nodes.h details how to add support for new nodes,
should that become necessary.
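In brief, the flexibility comes from a pair of signature macros (visible in the citus_nodefuncs.h hunk further down) that let a single set of Read*/Out* functions compile against either node API:

#if (PG_VERSION_NUM >= 90600)
#define READFUNC_ARGS struct ExtensibleNode *node
#define READFUNC_RET void
#else
#define READFUNC_ARGS void
#define READFUNC_RET Node *
#endif

extern READFUNC_RET ReadJob(READFUNC_ARGS);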
Andres Freund, 2016-03-12 18:38:57 -08:00, committed by Jason Petersen
parent a2f6b29a6d
commit ac14b2edbc
43 changed files with 10335 additions and 459 deletions

.gitattributes

@@ -29,4 +29,5 @@ src/backend/distributed/utils/citus_outfuncs.c -citus-style
src/backend/distributed/utils/citus_read.c -citus-style
src/backend/distributed/utils/citus_readfuncs_95.c -citus-style
src/backend/distributed/utils/ruleutils_95.c -citus-style
src/backend/distributed/utils/ruleutils_96.c -citus-style
src/include/distributed/citus_nodes.h -citus-style


@@ -9,8 +9,9 @@ env:
secure: degV+qb2xHiea7E2dGk/WLvmYjq4ZsBn6ZPko+YhRcNm2GRXRaU3FqMBIecPtsEEFYaL5GwCQq/CgBf9aQxgDQ+t2CrmtGTtI9AGAbVBl//amNeJOoLe6QvrDpSQX5pUxwDLCng8cvoQK7ZxGlNCzDKiu4Ep4DUWgQVpauJkQ9nHjtSMZvUqCoI9h1lBy9Mxh7YFfHPW2PAXCqpV4VlNiIYF84UKdX3MXKLy9Yt0JBSNTWLZFp/fFw2qNwzFvN94rF3ZvFSD7Wp6CIhT6R5/6k6Zx8YQIrjWhgm6OVy1osUA8X7W79h2ISPqKqMNVJkjJ+N8S4xuQU0kfejnQ74Ie/uJiHCmbW5W2TjpL1aU3FQpPsGwR8h0rSeHhJAJzd8Ma+z8vvnnQHDyvetPBB0WgA/VMQCu8uEutyfYw2hDmB2+l2dDwkViaI7R95bReAGrpd5uNqklAXuR7yOeArz0ZZpHV0aZHGcNBxznMaZExSVZ5DVPW38UPn7Kgse8BnOWeLgnA1hJVp6CmBCtu+hKYt+atBPgRbM8IUINnKKZf/Sk6HeJIJZs662jD8/X93vFi0ZtyV2jEKJpouWw8j4vrGGsaDzTEUcyJgDqZj7tPJptM2L5B3BcFJmkGj2HO3N+LGDarJrVBBSiEjhTgx4NnLiKZnUbMx547mCRg2akk2w=
matrix:
- PGVERSION=9.5
- PGVERSION=9.6
before_install:
- git clone -b v0.4.1 --depth 1 https://github.com/citusdata/tools.git
- git clone -b v0.4.3 --depth 1 https://github.com/citusdata/tools.git
- sudo make -C tools install
- setup_apt
- curl https://install.citusdata.com/community/deb.sh | sudo bash

configure

@@ -1915,7 +1915,7 @@ if test -z "$version_num"; then
as_fn_error $? "Could not detect PostgreSQL version from pg_config." "$LINENO" 5
fi
if test "$version_num" != '9.5'; then
if test "$version_num" != '9.5' -a "$version_num" != '9.6'; then
as_fn_error $? "Citus is not compatible with the detected PostgreSQL version ${version_num}." "$LINENO" 5
else
{ $as_echo "$as_me:${as_lineno-$LINENO}: building against PostgreSQL $version_num" >&5


@@ -36,7 +36,7 @@ if test -z "$version_num"; then
AC_MSG_ERROR([Could not detect PostgreSQL version from pg_config.])
fi
if test "$version_num" != '9.5'; then
if test "$version_num" != '9.5' -a "$version_num" != '9.6'; then
AC_MSG_ERROR([Citus is not compatible with the detected PostgreSQL version ${version_num}.])
else
AC_MSG_NOTICE([building against PostgreSQL $version_num])


@@ -200,7 +200,7 @@ CopyQueryResults(List *masterCopyStmtList)
/* Execute query plan. */
void
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, tuplecount_t count)
{
int eflags = queryDesc->estate->es_top_eflags;


@@ -18,6 +18,7 @@
#include "commands/dbcommands.h"
#include "commands/explain.h"
#include "commands/tablecmds.h"
#include "optimizer/cost.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
@@ -108,9 +109,22 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
instr_time planDuration;
Query *originalQuery = NULL;
RelationRestrictionContext *restrictionContext = NULL;
/* if local query, run the standard explain and return */
bool localQuery = !NeedsDistributedPlanning(query);
int cursorOptions = 0;
#if PG_VERSION_NUM >= 90600
/*
* Allow parallel plans in 9.6+ unless selecting into a table.
* Without this, we're breaking explain for non-Citus plans.
*/
if (!into)
{
cursorOptions |= CURSOR_OPT_PARALLEL_OK;
}
#endif
/* handle local queries in the same way as ExplainOneQuery */
if (localQuery)
{
PlannedStmt *plan = NULL;
@@ -118,7 +132,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
INSTR_TIME_SET_CURRENT(planStart);
/* plan the query */
plan = pg_plan_query(query, 0, params);
plan = pg_plan_query(query, cursorOptions, params);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
@@ -143,7 +157,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
PG_TRY();
{
/* call standard planner to modify the query structure before multi planning */
initialPlan = standard_planner(query, 0, params);
initialPlan = standard_planner(query, cursorOptions, params);
commandType = initialPlan->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||


@@ -17,6 +17,7 @@
#include "access/nbtree.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "catalog/pg_am.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_physical_planner.h"


@@ -39,6 +39,7 @@
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
#include "optimizer/var.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_oper.h"
#include "utils/builtins.h"
@@ -1273,6 +1274,7 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
{
Node *newNode = MasterAggregateMutator((Node *) originalExpression,
walkerContext);
newExpression = (Expr *) newNode;
}
else
@@ -1379,6 +1381,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
const uint32 masterTableId = 1; /* one table on the master node */
const Index columnLevelsUp = 0; /* normal column */
const AttrNumber argumentId = 1; /* our aggregates have single arguments */
AggClauseCosts aggregateCosts;
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
@@ -1467,6 +1470,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
unionAggregate->aggtype = hllType;
unionAggregate->args = list_make1(hllTargetEntry);
unionAggregate->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make1_oid(unionAggregate->aggtype);
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
cardinalityExpression = makeNode(FuncExpr);
cardinalityExpression->funcid = cardinalityFunctionId;
@@ -1526,6 +1534,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate->aggdistinct = NULL;
newMasterAggregate->aggfnoid = sumFunctionId;
newMasterAggregate->aggtype = masterReturnType;
#if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(newMasterAggregate->aggtype);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
workerReturnTypeMod, workerCollationId, columnLevelsUp);
@@ -1590,6 +1603,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate = copyObject(originalAggregate);
newMasterAggregate->aggfnoid = aggregateFunctionId;
newMasterAggregate->args = list_make1(arrayCatAggArgument);
#if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
newMasterExpression = (Expr *) newMasterAggregate;
}
@@ -1640,6 +1658,16 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = typeConvertedExpression;
}
/* Run AggRefs through cost machinery to mark required fields sanely */
memset(&aggregateCosts, 0, sizeof(aggregateCosts));
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) newMasterExpression, AGGSPLIT_SIMPLE,
&aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) newMasterExpression, &aggregateCosts);
#endif
return newMasterExpression;
}
@@ -1682,6 +1710,11 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
firstSum->aggtype = get_func_rettype(firstSum->aggfnoid);
firstSum->args = list_make1(firstTargetEntry);
firstSum->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
firstSum->aggtranstype = InvalidOid;
firstSum->aggargtypes = list_make1_oid(firstSum->aggtype);
firstSum->aggsplit = AGGSPLIT_SIMPLE;
#endif
/* create the second argument for sum(column2) */
secondColumn = makeVar(masterTableId, (*columnId), countAggregateType,
@@ -1694,6 +1727,11 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
secondSum->aggtype = get_func_rettype(secondSum->aggfnoid);
secondSum->args = list_make1(secondTargetEntry);
secondSum->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
secondSum->aggtranstype = InvalidOid;
secondSum->aggargtypes = list_make1_oid(firstSum->aggtype);
secondSum->aggsplit = AGGSPLIT_SIMPLE;
#endif
/*
* Build the division operator between these two aggregates. This function
@@ -1798,6 +1836,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
if (hasAggregates)
{
WorkerAggregateWalker((Node *) originalExpression, walkerContext);
newExpressionList = walkerContext->expressionList;
}
else
@@ -1961,6 +2000,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
{
AggregateType aggregateType = GetAggregateType(originalAggregate->aggfnoid);
List *workerAggregateList = NIL;
AggClauseCosts aggregateCosts;
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
@@ -2060,9 +2100,20 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
sumAggregate->aggfnoid = AggregateFunctionOid(sumAggregateName, argumentType);
sumAggregate->aggtype = get_func_rettype(sumAggregate->aggfnoid);
#if (PG_VERSION_NUM >= 90600)
sumAggregate->aggtranstype = InvalidOid;
sumAggregate->aggargtypes = list_make1_oid(argumentType);
sumAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
/* count has any input type */
countAggregate->aggfnoid = AggregateFunctionOid(countAggregateName, ANYOID);
countAggregate->aggtype = get_func_rettype(countAggregate->aggfnoid);
#if (PG_VERSION_NUM >= 90600)
countAggregate->aggtranstype = InvalidOid;
countAggregate->aggargtypes = list_make1_oid(argumentType);
countAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
workerAggregateList = lappend(workerAggregateList, sumAggregate);
workerAggregateList = lappend(workerAggregateList, countAggregate);
@@ -2077,6 +2128,17 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
workerAggregateList = lappend(workerAggregateList, workerAggregate);
}
/* Run AggRefs through cost machinery to mark required fields sanely */
memset(&aggregateCosts, 0, sizeof(aggregateCosts));
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) workerAggregateList, AGGSPLIT_SIMPLE,
&aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) workerAggregateList, &aggregateCosts);
#endif
return workerAggregateList;
}
@@ -2365,8 +2427,18 @@ ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode)
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList);
List *targetList = extendedOpNode->targetList;
#if (PG_VERSION_NUM >= 90600)
/*
* PVC_REJECT_PLACEHOLDERS is now implicit if PVC_INCLUDE_PLACEHOLDERS
* isn't specified.
*/
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES);
#else
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES,
PVC_REJECT_PLACEHOLDERS);
#endif
ListCell *expressionCell = NULL;
foreach(expressionCell, expressionList)

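The 9.6-only Aggref fields set at each call site above follow a single pattern; as a sketch, a hypothetical helper (SetAggrefCompatFields is not part of this change) would read:

static void
SetAggrefCompatFields(Aggref *aggref, Oid argumentType)
{
#if (PG_VERSION_NUM >= 90600)
	/* aggtranstype is filled in later by get_agg_clause_costs() */
	aggref->aggtranstype = InvalidOid;
	aggref->aggargtypes = list_make1_oid(argumentType);
	aggref->aggsplit = AGGSPLIT_SIMPLE;	/* no partial aggregation */
#endif
}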

@@ -1612,8 +1612,18 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)
List *
pull_var_clause_default(Node *node)
{
#if (PG_VERSION_NUM >= 90600)
/*
* PVC_REJECT_PLACEHOLDERS is now implicit if PVC_INCLUDE_PLACEHOLDERS
* isn't specified.
*/
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES);
#else
List *columnList = pull_var_clause(node, PVC_RECURSE_AGGREGATES,
PVC_REJECT_PLACEHOLDERS);
#endif
return columnList;
}


@@ -140,9 +140,15 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
havingQual = masterQuery->havingQual;
/* estimate aggregate execution costs */
MemSet(&aggregateCosts, 0, sizeof(AggClauseCosts));
memset(&aggregateCosts, 0, sizeof(AggClauseCosts));
#if (PG_VERSION_NUM >= 90600)
get_agg_clause_costs(NULL, (Node *) aggregateTargetList, AGGSPLIT_SIMPLE,
&aggregateCosts);
get_agg_clause_costs(NULL, (Node *) havingQual, AGGSPLIT_SIMPLE, &aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts);
count_agg_clauses(NULL, havingQual, &aggregateCosts);
count_agg_clauses(NULL, (Node *) havingQual, &aggregateCosts);
#endif
/*
* For upper level plans above the sequential scan, the planner expects the
@@ -178,10 +184,17 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
}
/* finally create the plan */
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
aggregateStrategy, &aggregateCosts, groupColumnCount,
groupColumnIdArray, groupColumnOpArray, NIL,
#if (PG_VERSION_NUM >= 90600)
aggregatePlan = make_agg(aggregateTargetList, (List *) havingQual, aggregateStrategy,
AGGSPLIT_SIMPLE, groupColumnCount, groupColumnIdArray,
groupColumnOpArray, NIL, NIL,
rowEstimate, subPlan);
#else
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
aggregateStrategy,
&aggregateCosts, groupColumnCount, groupColumnIdArray,
groupColumnOpArray, NIL, rowEstimate, subPlan);
#endif
return aggregatePlan;
}
@@ -247,7 +260,11 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
if (masterQuery->sortClause)
{
List *sortClauseList = masterQuery->sortClause;
#if (PG_VERSION_NUM >= 90600)
Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan);
#else
Sort *sortPlan = make_sort_from_sortclauses(NULL, sortClauseList, topLevelPlan);
#endif
topLevelPlan = (Plan *) sortPlan;
}
@@ -256,11 +273,15 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
{
Node *limitCount = masterQuery->limitCount;
Node *limitOffset = masterQuery->limitOffset;
#if (PG_VERSION_NUM >= 90600)
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount);
#else
int64 offsetEstimate = 0;
int64 countEstimate = 0;
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount,
offsetEstimate, countEstimate);
#endif
topLevelPlan = (Plan *) limitPlan;
}

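Another way to contain the make_agg()/make_sort_from_sortclauses()/make_limit() signature churn above would be thin compatibility wrappers; a sketch (make_limit_compat is hypothetical, not part of this change):

static Plan *
make_limit_compat(Plan *subPlan, Node *limitOffset, Node *limitCount)
{
#if (PG_VERSION_NUM >= 90600)
	/* 9.6 dropped the offset/count estimate arguments */
	return (Plan *) make_limit(subPlan, limitOffset, limitCount);
#else
	return (Plan *) make_limit(subPlan, limitOffset, limitCount, 0, 0);
#endif
}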

@@ -535,10 +535,21 @@ MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state)
* Once you've added them to this check, make sure you also evaluate them in the
* executor!
*/
StaticAssertStmt(PG_VERSION_NUM < 90600, "When porting to a newer PG this section"
StaticAssertStmt(PG_VERSION_NUM < 90700, "When porting to a newer PG this section"
" needs to be reviewed.");
if (IsA(expression, Aggref))
{
Aggref *expr = (Aggref *) expression;
if (IsA(expression, OpExpr))
volatileFlag = func_volatile(expr->aggfnoid);
}
else if (IsA(expression, WindowFunc))
{
WindowFunc *expr = (WindowFunc *) expression;
volatileFlag = func_volatile(expr->winfnoid);
}
else if (IsA(expression, OpExpr))
{
OpExpr *expr = (OpExpr *) expression;


@@ -18,6 +18,7 @@
#include "commands/explain.h"
#include "executor/executor.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/commit_protocol.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_copy.h"
@@ -129,6 +130,9 @@ _PG_init(void)
*/
RegisterCitusConfigVariables();
/* make our additional node types known */
RegisterNodes();
/* intercept planner */
planner_hook = multi_planner;


@@ -89,9 +89,15 @@ FakeGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Cost startup_cost = 0;
Cost total_cost = startup_cost + baserel->rows;
#if (PG_VERSION_NUM >= 90600)
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, NULL, baserel->rows,
startup_cost, total_cost, NIL,
NULL, NULL, NIL));
#else
add_path(baserel, (Path *) create_foreignscan_path(root, baserel, baserel->rows,
startup_cost, total_cost, NIL,
NULL, NULL, NIL));
#endif
}


@@ -11,9 +11,31 @@
#include "postgres.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/metadata_cache.h"
static const char *CitusNodeTagNamesD[] = {
"MultiNode",
"MultiTreeRoot",
"MultiProject",
"MultiCollect",
"MultiSelect",
"MultiTable",
"MultiJoin",
"MultiPartition",
"MultiCartesianProduct",
"MultiExtendedOp",
"Job",
"MapMergeJob",
"MultiPlan",
"Task",
"ShardInterval",
"ShardPlacement"
};
const char **CitusNodeTagNames = CitusNodeTagNamesD;
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_extradata_container);
@@ -307,3 +329,84 @@ citus_extradata_container(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
#if (PG_VERSION_NUM >= 90600)
static void
CopyUnsupportedCitusNode(struct ExtensibleNode *newnode,
const struct ExtensibleNode *oldnode)
{
ereport(ERROR, (errmsg("not implemented")));
}
static bool
EqualUnsupportedCitusNode(const struct ExtensibleNode *a,
const struct ExtensibleNode *b)
{
ereport(ERROR, (errmsg("not implemented")));
}
/* *INDENT-OFF* */
#define DEFINE_NODE_METHODS(type) \
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
EqualUnsupportedCitusNode, \
Out##type, \
Read##type \
}
#define DEFINE_NODE_METHODS_NO_READ(type) \
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
EqualUnsupportedCitusNode, \
Out##type, \
ReadUnsupportedCitusNode \
}
/* *INDENT-ON* */
const ExtensibleNodeMethods nodeMethods[] =
{
DEFINE_NODE_METHODS(MultiPlan),
DEFINE_NODE_METHODS(Job),
DEFINE_NODE_METHODS(ShardInterval),
DEFINE_NODE_METHODS(MapMergeJob),
DEFINE_NODE_METHODS(ShardPlacement),
DEFINE_NODE_METHODS(Task),
/* nodes with only output support */
DEFINE_NODE_METHODS_NO_READ(MultiNode),
DEFINE_NODE_METHODS_NO_READ(MultiTreeRoot),
DEFINE_NODE_METHODS_NO_READ(MultiProject),
DEFINE_NODE_METHODS_NO_READ(MultiCollect),
DEFINE_NODE_METHODS_NO_READ(MultiSelect),
DEFINE_NODE_METHODS_NO_READ(MultiTable),
DEFINE_NODE_METHODS_NO_READ(MultiJoin),
DEFINE_NODE_METHODS_NO_READ(MultiPartition),
DEFINE_NODE_METHODS_NO_READ(MultiCartesianProduct),
DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp)
};
#endif
void
RegisterNodes(void)
{
#if (PG_VERSION_NUM >= 90600)
int off;
StaticAssertExpr(lengthof(nodeMethods) == lengthof(CitusNodeTagNamesD),
"number of node methods and names do not match");
for (off = 0; off < lengthof(nodeMethods); off++)
{
RegisterExtensibleNodeMethods(&nodeMethods[off]);
}
#endif
}

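Once RegisterNodes() has run, 9.6's stock serialization routines dispatch through the registered methods; a short usage sketch built from functions in this change:

ShardInterval *interval = CitusMakeNode(ShardInterval);
char *serialized = CitusNodeToString(interval);	/* plain nodeToString() on 9.6+ */
ShardInterval *restored = (ShardInterval *) CitusStringToNode(serialized);
Assert(CitusIsA(restored, ShardInterval));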

@@ -38,10 +38,21 @@
* routine.
*/
/* Store const reference to raw input node in local named 'node' */
#define WRITE_LOCALS(nodeTypeName) \
const nodeTypeName *node = (const nodeTypeName *) raw_node
/* Write the label for the node type */
#if (PG_VERSION_NUM >= 90600)
#define WRITE_NODE_TYPE(nodelabel) \
(void) 0
#else
#define WRITE_NODE_TYPE(nodelabel) \
appendStringInfoString(str, nodelabel)
#endif
/* Write an integer field (anything written as ":fldname %d") */
#define WRITE_INT_FIELD(fldname) \
appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)
@@ -83,7 +94,7 @@
/* Write a character-string (possibly NULL) field */
#define WRITE_STRING_FIELD(fldname) \
(appendStringInfo(str, " :" CppAsString(fldname) " "), \
_outToken(str, node->fldname))
outToken(str, node->fldname))
/* Write a parse location field (actually same as INT case) */
#define WRITE_LOCATION_FIELD(fldname) \
@@ -92,7 +103,7 @@
/* Write a Node field */
#define WRITE_NODE_FIELD(fldname) \
(appendStringInfo(str, " :" CppAsString(fldname) " "), \
_outNode(str, node->fldname))
outNode(str, node->fldname))
/* Write a bitmapset field */
#define WRITE_BITMAPSET_FIELD(fldname) \
@@ -102,18 +113,18 @@
#define booltostr(x) ((x) ? "true" : "false")
static void _outNode(StringInfo str, const void *obj);
#if (PG_VERSION_NUM < 90600)
static void outNode(StringInfo str, const void *obj);
/*
* _outToken
* outToken
* Convert an ordinary string (eg, an identifier) into a form that
* will be decoded back to a plain token by read.c's functions.
*
* If a null or empty string is given, it is encoded as "<>".
*/
static void
_outToken(StringInfo str, const char *s)
outToken(StringInfo str, const char *s)
{
if (s == NULL || *s == '\0')
{
@@ -166,7 +177,7 @@ _outList(StringInfo str, const List *node)
*/
if (IsA(node, List))
{
_outNode(str, lfirst(lc));
outNode(str, lfirst(lc));
if (lnext(lc))
appendStringInfoChar(str, ' ');
}
@@ -187,7 +198,7 @@ _outList(StringInfo str, const List *node)
* Print the value of a Datum given its type.
*/
static void
_outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
{
Size length,
i;
@@ -218,38 +229,49 @@ _outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
}
}
#endif
/*****************************************************************************
* Output routines for Citus node types
*****************************************************************************/
static void
_outMultiUnaryNode(StringInfo str, const MultiUnaryNode *node)
OutMultiUnaryNodeFields(StringInfo str, const MultiUnaryNode *node)
{
WRITE_NODE_FIELD(childNode);
}
static void
_outMultiBinaryNode(StringInfo str, const MultiBinaryNode *node)
OutMultiBinaryNodeFields(StringInfo str, const MultiBinaryNode *node)
{
WRITE_NODE_FIELD(leftChildNode);
WRITE_NODE_FIELD(rightChildNode);
}
static void
_outMultiTreeRoot(StringInfo str, const MultiTreeRoot *node)
void
OutMultiNode(OUTFUNC_ARGS)
{
WRITE_NODE_TYPE("MULTITREEROOT");
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
WRITE_NODE_TYPE("MULTINODE");
}
static void
_outMultiPlan(StringInfo str, const MultiPlan *node)
void
OutMultiTreeRoot(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiTreeRoot);
WRITE_NODE_TYPE("MULTITREEROOT");
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
void
OutMultiPlan(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiPlan);
WRITE_NODE_TYPE("MULTIPLAN");
WRITE_NODE_FIELD(workerJob);
@@ -258,87 +280,95 @@ _outMultiPlan(StringInfo str, const MultiPlan *node)
}
static void
_outMultiProject(StringInfo str, const MultiProject *node)
void
OutMultiProject(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiProject);
WRITE_NODE_TYPE("MULTIPROJECT");
WRITE_NODE_FIELD(columnList);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiCollect(StringInfo str, const MultiCollect *node)
void
OutMultiCollect(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiCollect);
WRITE_NODE_TYPE("MULTICOLLECT");
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiSelect(StringInfo str, const MultiSelect *node)
void
OutMultiSelect(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiSelect);
WRITE_NODE_TYPE("MULTISELECT");
WRITE_NODE_FIELD(selectClauseList);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiTable(StringInfo str, const MultiTable *node)
void
OutMultiTable(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiTable);
WRITE_NODE_TYPE("MULTITABLE");
WRITE_OID_FIELD(relationId);
WRITE_INT_FIELD(rangeTableId);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiJoin(StringInfo str, const MultiJoin *node)
void
OutMultiJoin(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiJoin);
WRITE_NODE_TYPE("MULTIJOIN");
WRITE_NODE_FIELD(joinClauseList);
WRITE_ENUM_FIELD(joinRuleType, JoinRuleType);
WRITE_ENUM_FIELD(joinType, JoinType);
_outMultiBinaryNode(str, (const MultiBinaryNode *) node);
OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
}
static void
_outMultiPartition(StringInfo str, const MultiPartition *node)
void
OutMultiPartition(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiPartition);
WRITE_NODE_TYPE("MULTIPARTITION");
WRITE_NODE_FIELD(partitionColumn);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiCartesianProduct(StringInfo str, const MultiCartesianProduct *node)
void
OutMultiCartesianProduct(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiCartesianProduct);
WRITE_NODE_TYPE("MULTICARTESIANPRODUCT");
_outMultiBinaryNode(str, (const MultiBinaryNode *) node);
OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
}
static void
_outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
void
OutMultiExtendedOp(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiExtendedOp);
WRITE_NODE_TYPE("MULTIEXTENDEDOP");
WRITE_NODE_FIELD(targetList);
@@ -348,11 +378,11 @@ _outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
WRITE_NODE_FIELD(limitOffset);
WRITE_NODE_FIELD(havingQual);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outJobInfo(StringInfo str, const Job *node)
OutJobFields(StringInfo str, const Job *node)
{
WRITE_UINT64_FIELD(jobId);
WRITE_NODE_FIELD(jobQuery);
@@ -362,18 +392,20 @@ _outJobInfo(StringInfo str, const Job *node)
}
static void
_outJob(StringInfo str, const Job *node)
void
OutJob(OUTFUNC_ARGS)
{
WRITE_LOCALS(Job);
WRITE_NODE_TYPE("JOB");
_outJobInfo(str, node);
OutJobFields(str, node);
}
static void
_outShardInterval(StringInfo str, const ShardInterval *node)
void
OutShardInterval(OUTFUNC_ARGS)
{
WRITE_LOCALS(ShardInterval);
WRITE_NODE_TYPE("SHARDINTERVAL");
WRITE_OID_FIELD(relationId);
@@ -388,27 +420,28 @@ _outShardInterval(StringInfo str, const ShardInterval *node)
if (!node->minValueExists)
appendStringInfoString(str, "<>");
else
_outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
appendStringInfoString(str, " :maxValue ");
if (!node->maxValueExists)
appendStringInfoString(str, "<>");
else
_outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
WRITE_UINT64_FIELD(shardId);
}
static void
_outMapMergeJob(StringInfo str, const MapMergeJob *node)
void
OutMapMergeJob(OUTFUNC_ARGS)
{
WRITE_LOCALS(MapMergeJob);
int arrayLength = node->sortedShardIntervalArrayLength;
int i;
WRITE_NODE_TYPE("MAPMERGEJOB");
_outJobInfo(str, (Job *) node);
OutJobFields(str, (Job *) node);
WRITE_NODE_FIELD(reduceQuery);
WRITE_ENUM_FIELD(partitionType, PartitionType);
WRITE_NODE_FIELD(partitionColumn);
@@ -417,9 +450,7 @@ _outMapMergeJob(StringInfo str, const MapMergeJob *node)
for (i = 0; i < arrayLength; ++i)
{
ShardInterval *writeElement = node->sortedShardIntervalArray[i];
_outShardInterval(str, writeElement);
outNode(str, node->sortedShardIntervalArray[i]);
}
WRITE_NODE_FIELD(mapTaskList);
@@ -427,9 +458,10 @@ _outMapMergeJob(StringInfo str, const MapMergeJob *node)
}
static void
_outShardPlacement(StringInfo str, const ShardPlacement *node)
void
OutShardPlacement(OUTFUNC_ARGS)
{
WRITE_LOCALS(ShardPlacement);
WRITE_NODE_TYPE("SHARDPLACEMENT");
WRITE_UINT64_FIELD(placementId);
@@ -441,9 +473,10 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node)
}
static void
_outTask(StringInfo str, const Task *node)
void
OutTask(OUTFUNC_ARGS)
{
WRITE_LOCALS(Task);
WRITE_NODE_TYPE("TASK");
WRITE_ENUM_FIELD(taskType, TaskType);
@@ -462,13 +495,14 @@ _outTask(StringInfo str, const Task *node)
WRITE_BOOL_FIELD(requiresMasterEvaluation);
}
#if (PG_VERSION_NUM < 90600)
/*
* _outNode -
* outNode -
* converts a Node into ascii string and append it to 'str'
*/
static void
_outNode(StringInfo str, const void *obj)
outNode(StringInfo str, const void *obj)
{
if (obj == NULL)
{
@@ -487,91 +521,91 @@ _outNode(StringInfo str, const void *obj)
case T_MultiTreeRoot:
appendStringInfoChar(str, '{');
_outMultiTreeRoot(str, obj);
OutMultiTreeRoot(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiProject:
appendStringInfoChar(str, '{');
_outMultiProject(str, obj);
OutMultiProject(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiCollect:
appendStringInfoChar(str, '{');
_outMultiCollect(str, obj);
OutMultiCollect(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiSelect:
appendStringInfoChar(str, '{');
_outMultiSelect(str, obj);
OutMultiSelect(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiTable:
appendStringInfoChar(str, '{');
_outMultiTable(str, obj);
OutMultiTable(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiJoin:
appendStringInfoChar(str, '{');
_outMultiJoin(str, obj);
OutMultiJoin(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiPartition:
appendStringInfoChar(str, '{');
_outMultiPartition(str, obj);
OutMultiPartition(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiCartesianProduct:
appendStringInfoChar(str, '{');
_outMultiCartesianProduct(str, obj);
OutMultiCartesianProduct(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiExtendedOp:
appendStringInfoChar(str, '{');
_outMultiExtendedOp(str, obj);
OutMultiExtendedOp(str, obj);
appendStringInfoChar(str, '}');
break;
case T_Job:
appendStringInfoChar(str, '{');
_outJob(str, obj);
OutJob(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MapMergeJob:
appendStringInfoChar(str, '{');
_outMapMergeJob(str, obj);
OutMapMergeJob(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiPlan:
appendStringInfoChar(str, '{');
_outMultiPlan(str, obj);
OutMultiPlan(str, obj);
appendStringInfoChar(str, '}');
break;
case T_Task:
appendStringInfoChar(str, '{');
_outTask(str, obj);
OutTask(str, obj);
appendStringInfoChar(str, '}');
break;
case T_ShardInterval:
appendStringInfoChar(str, '{');
_outShardInterval(str, obj);
OutShardInterval(str, obj);
appendStringInfoChar(str, '}');
break;
case T_ShardPlacement:
appendStringInfoChar(str, '{');
_outShardPlacement(str, obj);
OutShardPlacement(str, obj);
appendStringInfoChar(str, '}');
break;
@@ -581,6 +615,7 @@ _outNode(StringInfo str, const void *obj)
}
}
#endif
/*
* CitusNodeToString -
@@ -589,9 +624,13 @@ _outNode(StringInfo str, const void *obj)
char *
CitusNodeToString(const void *obj)
{
#if (PG_VERSION_NUM >= 90600)
return nodeToString(obj);
#else
StringInfoData str;
initStringInfo(&str);
_outNode(&str, obj);
outNode(&str, obj);
return str.data;
#endif
}


@@ -29,6 +29,35 @@
#include "nodes/value.h"
/*
* From 9.6 onwards, we use 9.6's extensible node system, so there's no need
* to copy various routines anymore. Instead, these functions become plain
* wrappers.
*/
#if (PG_VERSION_NUM >= 90600)
void *
CitusStringToNode(char *str)
{
return stringToNode(str);
}
char *
citus_pg_strtok(int *length)
{
return pg_strtok(length);
}
void *
CitusNodeRead(char *token, int tok_len)
{
return nodeRead(token, tok_len);
}
#else
/* Static state for citus_pg_strtok */
static char *citus_pg_strtok_ptr = NULL;
@@ -63,7 +92,7 @@ CitusStringToNode(char *str)
/*
* citus_pg_strtok is a copy of postgres' pg_strtok routine, referencing
* citus_pg_strtok_ptr instead of pg_strtok_ptr as state.
*/
*/
char *
citus_pg_strtok(int *length)
{
@@ -346,3 +375,5 @@ CitusNodeRead(char *token, int tok_len)
return (void *) result;
}
#endif /* (PG_VERSION_NUM < 90600) */


@@ -0,0 +1,383 @@
/*-------------------------------------------------------------------------
*
* citus_readfuncs.c
* Citus specific node functions
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2012-2015, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "distributed/citus_nodefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/readfuncs.h"
/*
* Macros to simplify reading of different kinds of fields. Use these
* wherever possible to reduce the chance for silly typos. Note that these
* hard-wire conventions about the names of the local variables in a Read
* routine.
*/
/* Macros for declaring appropriate local variables */
/* A few guys need only local_node */
#if (PG_VERSION_NUM >= 90600)
static inline Node *
CitusSetTag(Node *node, int tag)
{
CitusNode *citus_node = (CitusNode *) node;
citus_node->citus_tag = tag;
return node;
}
/* *INDENT-OFF* */
#define READ_LOCALS_NO_FIELDS(nodeTypeName) \
nodeTypeName *local_node = (nodeTypeName *) CitusSetTag((Node *) node, T_##nodeTypeName)
#else
#define READ_LOCALS_NO_FIELDS(nodeTypeName) \
nodeTypeName *local_node = CitusMakeNode(nodeTypeName)
#endif
/* And a few guys need only the citus_pg_strtok support fields */
#define READ_TEMP_LOCALS() \
char *token; \
int length
/* ... but most need both */
#define READ_LOCALS(nodeTypeName) \
READ_LOCALS_NO_FIELDS(nodeTypeName); \
READ_TEMP_LOCALS()
/* Read an integer field (anything written as ":fldname %d") */
#define READ_INT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoi(token)
/* Read an unsigned integer field (anything written as ":fldname %u") */
#define READ_UINT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoui(token)
/* XXX: CITUS Read a uint64 field (anything written as ":fldname %u") */
#define READ_UINT64_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoull(token)
/* Read an OID field (don't hard-wire assumption that OID is same as uint) */
#define READ_OID_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atooid(token)
/* Read a char field (ie, one ascii character) */
#define READ_CHAR_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = token[0]
/* Read an enumerated-type field that was written as an integer code */
#define READ_ENUM_FIELD(fldname, enumtype) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = (enumtype) atoi(token)
/* Read a float field */
#define READ_FLOAT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atof(token)
/* Read a boolean field */
#define READ_BOOL_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = strtobool(token)
/* Read a character-string field */
#define READ_STRING_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = nullable_string(token, length)
/* Read a parse location field (and throw away the value, per notes above) */
#define READ_LOCATION_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
(void) token; /* in case not used elsewhere */ \
local_node->fldname = -1 /* set field to "unknown" */
/* Read a Node field XXX: Citus: replaced call to nodeRead with CitusNodeRead */
#define READ_NODE_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
(void) token; /* in case not used elsewhere */ \
local_node->fldname = CitusNodeRead(NULL, 0)
/* Routine exit */
#if (PG_VERSION_NUM >= 90600)
#define READ_DONE() \
return;
#else
#define READ_DONE() \
return (Node *) local_node
#endif
/*
* NOTE: use atoi() to read values written with %d, or atoui() to read
* values written with %u in outfuncs.c. An exception is OID values,
* for which use atooid(). (As of 7.1, outfuncs.c writes OIDs as %u,
* but this will probably change in the future.)
*/
#define atoui(x) ((unsigned int) strtoul((x), NULL, 10))
#define atooid(x) ((Oid) strtoul((x), NULL, 10))
/* XXX: Citus */
#define atoull(x) ((uint64) strtoull((x), NULL, 10))
#define strtobool(x) ((*(x) == 't') ? true : false)
#define nullable_string(token,length) \
((length) == 0 ? NULL : debackslash(token, length))
static void
readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
}
READFUNC_RET
ReadJob(READFUNC_ARGS)
{
READ_LOCALS_NO_FIELDS(Job);
readJobInfo(local_node);
READ_DONE();
}
READFUNC_RET
ReadMultiPlan(READFUNC_ARGS)
{
READ_LOCALS(MultiPlan);
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_DONE();
}
READFUNC_RET
ReadShardInterval(READFUNC_ARGS)
{
READ_LOCALS(ShardInterval);
READ_OID_FIELD(relationId);
READ_CHAR_FIELD(storageType);
READ_OID_FIELD(valueTypeId);
READ_INT_FIELD(valueTypeLen);
READ_BOOL_FIELD(valueByVal);
READ_BOOL_FIELD(minValueExists);
READ_BOOL_FIELD(maxValueExists);
token = citus_pg_strtok(&length); /* skip :minValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->minValue = readDatum(local_node->valueByVal);
token = citus_pg_strtok(&length); /* skip :maxValue */
if (!local_node->maxValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId);
READ_DONE();
}
READFUNC_RET
ReadMapMergeJob(READFUNC_ARGS)
{
int arrayLength;
int i;
READ_LOCALS(MapMergeJob);
readJobInfo(&local_node->job);
READ_NODE_FIELD(reduceQuery);
READ_ENUM_FIELD(partitionType, PartitionType);
READ_NODE_FIELD(partitionColumn);
READ_UINT_FIELD(partitionCount);
READ_INT_FIELD(sortedShardIntervalArrayLength);
arrayLength = local_node->sortedShardIntervalArrayLength;
/* now build & read sortedShardIntervalArray */
local_node->sortedShardIntervalArray =
(ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *));
for (i = 0; i < arrayLength; ++i)
{
/* can't use READ_NODE_FIELD, no field names */
local_node->sortedShardIntervalArray[i] = CitusNodeRead(NULL, 0);
}
READ_NODE_FIELD(mapTaskList);
READ_NODE_FIELD(mergeTaskList);
READ_DONE();
}
READFUNC_RET
ReadShardPlacement(READFUNC_ARGS)
{
READ_LOCALS(ShardPlacement);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
READ_DONE();
}
READFUNC_RET
ReadTask(READFUNC_ARGS)
{
READ_LOCALS(Task);
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependedTaskList);
READ_UINT_FIELD(partitionId);
READ_UINT_FIELD(upstreamTaskId);
READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}
READFUNC_RET
ReadUnsupportedCitusNode(READFUNC_ARGS)
{
ereport(ERROR, (errmsg("not implemented")));
}
#if (PG_VERSION_NUM < 90600)
/*
* readDatum
*
* Given a string representation of a constant, recreate the appropriate
* Datum. The string representation embeds length info, but not byValue,
* so we must be told that.
*/
Datum
readDatum(bool typbyval)
{
Size length,
i;
int tokenLength;
char *token;
Datum res;
char *s;
/*
* read the actual length of the value
*/
token = citus_pg_strtok(&tokenLength);
length = atoui(token);
token = citus_pg_strtok(&tokenLength); /* read the '[' */
if (token == NULL || token[0] != '[')
elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
if (typbyval)
{
if (length > (Size) sizeof(Datum))
elog(ERROR, "byval datum but length = %zu", length);
res = (Datum) 0;
s = (char *) (&res);
for (i = 0; i < (Size) sizeof(Datum); i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
}
else if (length <= 0)
res = (Datum) NULL;
else
{
s = (char *) palloc(length);
for (i = 0; i < length; i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
res = PointerGetDatum(s);
}
token = citus_pg_strtok(&tokenLength); /* read the ']' */
if (token == NULL || token[0] != ']')
elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
return res;
}
#endif
#if (PG_VERSION_NUM >= 90600)
/* *INDENT-ON* */
/*
* For 9.6+ we can just use the now-extensible parseNodeString(). Before
* that, citus_readfuncs_$ver.c has a version-specific implementation.
*/
Node *
CitusParseNodeString(void)
{
return parseNodeString();
}
#endif

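For illustration only, a readfunc for a hypothetical future node (FakeCitusNode and its fields are invented here, not part of this change) would use the macros above like so:

READFUNC_RET
ReadFakeCitusNode(READFUNC_ARGS)
{
	READ_LOCALS(FakeCitusNode);

	READ_UINT64_FIELD(fakeId);
	READ_NODE_FIELD(childList);
	READ_BOOL_FIELD(isFinal);

	READ_DONE();
}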

@@ -144,8 +144,6 @@
((length) == 0 ? NULL : debackslash(token, length))
static Datum readDatum(bool typbyval);
/*
* _readBitmapset
*/
@@ -1368,216 +1366,6 @@ _readTableSampleClause(void)
}
/* XXX: BEGIN Citus Nodes */
static void
_readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
}
static Job *
_readJob(void)
{
READ_LOCALS_NO_FIELDS(Job);
_readJobInfo(local_node);
READ_DONE();
}
static MultiPlan *
_readMultiPlan(void)
{
READ_LOCALS(MultiPlan);
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_DONE();
}
static ShardInterval *
_readShardInterval(void)
{
READ_LOCALS(ShardInterval);
READ_OID_FIELD(relationId);
READ_CHAR_FIELD(storageType);
READ_OID_FIELD(valueTypeId);
READ_INT_FIELD(valueTypeLen);
READ_BOOL_FIELD(valueByVal);
READ_BOOL_FIELD(minValueExists);
READ_BOOL_FIELD(maxValueExists);
token = citus_pg_strtok(&length); /* skip :minValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->minValue = readDatum(local_node->valueByVal);
token = citus_pg_strtok(&length); /* skip :maxValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId);
READ_DONE();
}
static MapMergeJob *
_readMapMergeJob(void)
{
int arrayLength;
int i;
READ_LOCALS(MapMergeJob);
_readJobInfo(&local_node->job);
READ_NODE_FIELD(reduceQuery);
READ_ENUM_FIELD(partitionType, PartitionType);
READ_NODE_FIELD(partitionColumn);
READ_UINT_FIELD(partitionCount);
READ_INT_FIELD(sortedShardIntervalArrayLength);
arrayLength = local_node->sortedShardIntervalArrayLength;
/* now build & read sortedShardIntervalArray */
local_node->sortedShardIntervalArray =
(ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *));
for (i = 0; i < arrayLength; ++i)
{
local_node->sortedShardIntervalArray[i] = _readShardInterval();
}
READ_NODE_FIELD(mapTaskList);
READ_NODE_FIELD(mergeTaskList);
READ_DONE();
}
static ShardPlacement *
_readShardPlacement(void)
{
READ_LOCALS(ShardPlacement);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
READ_DONE();
}
static Task *
_readTask(void)
{
READ_LOCALS(Task);
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependedTaskList);
READ_UINT_FIELD(partitionId);
READ_UINT_FIELD(upstreamTaskId);
READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}
/* XXX: END Citus Nodes */
/*
* readDatum
*
* Given a string representation of a constant, recreate the appropriate
* Datum. The string representation embeds length info, but not byValue,
* so we must be told that.
*/
static Datum
readDatum(bool typbyval)
{
Size length,
i;
int tokenLength;
char *token;
Datum res;
char *s;
/*
* read the actual length of the value
*/
token = citus_pg_strtok(&tokenLength);
length = atoui(token);
token = citus_pg_strtok(&tokenLength); /* read the '[' */
if (token == NULL || token[0] != '[')
elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
if (typbyval)
{
if (length > (Size) sizeof(Datum))
elog(ERROR, "byval datum but length = %zu", length);
res = (Datum) 0;
s = (char *) (&res);
for (i = 0; i < (Size) sizeof(Datum); i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
}
else if (length <= 0)
res = (Datum) NULL;
else
{
s = (char *) palloc(length);
for (i = 0; i < length; i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
res = PointerGetDatum(s);
}
token = citus_pg_strtok(&tokenLength); /* read the ']' */
if (token == NULL || token[0] != ']')
elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
return res;
}
/*
* parseNodeString
*
@@ -1718,17 +1506,17 @@ CitusParseNodeString(void)
return_value = _readDeclareCursorStmt();
/* XXX: BEGIN Citus Nodes */
else if (MATCH("MULTIPLAN", 9))
return_value = _readMultiPlan();
return_value = ReadMultiPlan();
else if (MATCH("JOB", 3))
return_value = _readJob();
return_value = ReadJob();
else if (MATCH("SHARDINTERVAL", 13))
return_value = _readShardInterval();
return_value = ReadShardInterval();
else if (MATCH("MAPMERGEJOB", 11))
return_value = _readMapMergeJob();
return_value = ReadMapMergeJob();
else if (MATCH("SHARDPLACEMENT", 14))
return_value = _readShardPlacement();
return_value = ReadShardPlacement();
else if (MATCH("TASK", 4))
return_value = _readTask();
return_value = ReadTask();
/* XXX: END Citus Nodes */
else
{


@@ -18,6 +18,7 @@
#include "access/xact.h"
#include "access/sysattr.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"

File diff suppressed because it is too large.


@@ -25,6 +25,7 @@
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "commands/copy.h"
#include "commands/defrem.h"


@@ -37,6 +37,55 @@ extern void * CitusNodeRead(char *token, int tok_len);
/* citus_readfuncs.c */
extern Node * CitusParseNodeString(void);
extern Datum readDatum(bool typbyval);
extern void RegisterNodes(void);
/*
* Define read functions for citus nodes in a way they're usable across
* several major versions. That requires some macro-ugliness as 9.6+ is quite
* different from before.
*/
#if (PG_VERSION_NUM >= 90600)
#define READFUNC_ARGS struct ExtensibleNode *node
#define READFUNC_RET void
#else
#define READFUNC_ARGS void
#define READFUNC_RET Node *
#endif
#if (PG_VERSION_NUM >= 90600)
#define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node
#else
#define OUTFUNC_ARGS StringInfo str, const Node *raw_node
#endif
extern READFUNC_RET ReadJob(READFUNC_ARGS);
extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS);
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
extern void OutJob(OUTFUNC_ARGS);
extern void OutMultiPlan(OUTFUNC_ARGS);
extern void OutShardInterval(OUTFUNC_ARGS);
extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutMultiNode(OUTFUNC_ARGS);
extern void OutMultiTreeRoot(OUTFUNC_ARGS);
extern void OutMultiProject(OUTFUNC_ARGS);
extern void OutMultiCollect(OUTFUNC_ARGS);
extern void OutMultiSelect(OUTFUNC_ARGS);
extern void OutMultiTable(OUTFUNC_ARGS);
extern void OutMultiJoin(OUTFUNC_ARGS);
extern void OutMultiPartition(OUTFUNC_ARGS);
extern void OutMultiCartesianProduct(OUTFUNC_ARGS);
extern void OutMultiExtendedOp(OUTFUNC_ARGS);
#endif /* CITUS_NODEFUNCS_H */


@@ -3,6 +3,26 @@
* citus_nodes.h
* Additional node types, and related infrastructure, for Citus.
*
* To add a new node type to Citus, perform the following:
*
* * Add a new CitusNodeTag value to use as a tag for the node. Add
* the node's name at a corresponding offset within the array named
* CitusNodeTagNamesD at the top of citus_nodefuncs.c
*
* * Describe the node in a struct, which must have a CitusNode as
* its first element
*
* * Implement an 'outfunc' for the node in citus_outfuncs.c, using
* the macros defined within that file. This function will handle
* converting the node to a string
*
* * Implement a 'readfunc' for the node in citus_readfuncs.c, using
* the macros defined within that file. This function will handle
* converting strings into instances of the node
*
* * Use DEFINE_NODE_METHODS within the nodeMethods array (near the
* bottom of citus_nodefuncs.c) to register the node in PostgreSQL
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
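As a purely hypothetical walk-through of those steps (MultiFilter is invented for illustration and is not part of this change):

/* 1. add T_MultiFilter to CitusNodeTag, "MultiFilter" to CitusNodeTagNamesD */
typedef struct MultiFilter
{
	CitusNode node;			/* 2. a CitusNode must be the first element */
	List *filterClauseList;
} MultiFilter;
/* 3. implement OutMultiFilter() in citus_outfuncs.c (WRITE_ macros) */
/* 4. implement ReadMultiFilter() in citus_readfuncs.c (READ_ macros) */
/* 5. register it via DEFINE_NODE_METHODS(MultiFilter) in nodeMethods */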
@@ -17,9 +37,10 @@
*
* These have to be distinct from the ideas used in postgres' nodes.h
*/
#define CITUS_NODE_TAG_START 1200
typedef enum CitusNodeTag
{
T_MultiNode = 1200, /* FIXME: perhaps use something less predictable? */
T_MultiNode = CITUS_NODE_TAG_START, /* FIXME: perhaps use something less predictable? */
T_MultiTreeRoot,
T_MultiProject,
T_MultiCollect,
@@ -38,6 +59,46 @@ typedef enum CitusNodeTag
} CitusNodeTag;
extern const char **CitusNodeTagNames;
#if (PG_VERSION_NUM >= 90600)
#include "nodes/extensible.h"
typedef struct CitusNode
{
ExtensibleNode extensible;
CitusNodeTag citus_tag; /* for quick type determination */
} CitusNode;
#define CitusNodeTag(nodeptr) CitusNodeTagI((Node*) nodeptr)
static inline int
CitusNodeTagI(Node *node)
{
if (!IsA(node, ExtensibleNode))
{
return nodeTag(node);
}
return ((CitusNode*)(node))->citus_tag;
}
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ CitusNode *_result; \
AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \
_result = (CitusNode *) palloc0fast(size); \
_result->extensible.type = T_ExtensibleNode; \
_result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \
_result->citus_tag =(int) (tag); \
_result; \
})
#else
typedef CitusNodeTag CitusNode;
/*
* nodeTag equivalent that returns the node tag for both citus and postgres
* node tag types. Needs to return int as there's no type that covers both
@@ -46,12 +107,6 @@ typedef enum CitusNodeTag
#define CitusNodeTag(nodeptr) (*((const int*)(nodeptr)))
/*
* IsA equivalent that compares node tags as integers, not as enum values.
*/
#define CitusIsA(nodeptr,_type_) (CitusNodeTag(nodeptr) == T_##_type_)
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ Node *_result; \
@@ -61,6 +116,13 @@ typedef enum CitusNodeTag
_result; \
})
#endif
/*
* IsA equivalent that compares node tags, including Citus-specific nodes.
*/
#define CitusIsA(nodeptr,_type_) (CitusNodeTag(nodeptr) == T_##_type_)
/*
* CitusMakeNode is Citus variant of makeNode(). Use it to create nodes of

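Allocation and tag checks then look the same on both release lines; a short usage sketch with the macros from this header:

MultiProject *projectNode = CitusMakeNode(MultiProject);
Assert(CitusIsA(projectNode, MultiProject));
Assert(CitusNodeTag(projectNode) == T_MultiProject);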

@@ -29,7 +29,7 @@
/* In-memory representation of a typed tuple in pg_dist_shard. */
typedef struct ShardInterval
{
CitusNodeTag type;
CitusNode type;
Oid relationId;
char storageType;
Oid valueTypeId; /* min/max value datum's typeId */
@@ -46,7 +46,7 @@ typedef struct ShardInterval
/* In-memory representation of a tuple in pg_dist_shard_placement. */
typedef struct ShardPlacement
{
CitusNodeTag type;
CitusNode type;
uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId;
uint64 shardLength;


@@ -17,9 +17,15 @@
#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100
#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200
#if (PG_VERSION_NUM >= 90600)
#define tuplecount_t uint64
#else
#define tuplecount_t long
#endif
extern void multi_ExecutorStart(QueryDesc *queryDesc, int eflags);
extern void multi_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, long count);
ScanDirection direction, tuplecount_t count);
extern void multi_ExecutorFinish(QueryDesc *queryDesc);
extern void multi_ExecutorEnd(QueryDesc *queryDesc);


@@ -35,7 +35,7 @@
*/
typedef struct MultiNode
{
CitusNodeTag type;
CitusNode type;
struct MultiNode *parentNode;


@@ -114,7 +114,7 @@ typedef enum
*/
typedef struct Job
{
CitusNodeTag type;
CitusNode type;
uint64 jobId;
Query *jobQuery;
List *taskList;
@@ -152,7 +152,7 @@ typedef struct TaskExecution TaskExecution;
typedef struct Task
{
CitusNodeTag type;
CitusNode type;
TaskType taskType;
uint64 jobId;
uint32 taskId;
@@ -201,7 +201,7 @@ typedef struct JoinSequenceNode
*/
typedef struct MultiPlan
{
CitusNodeTag type;
CitusNode type;
Job *workerJob;
Query *masterQuery;
char *masterTableName;


@@ -3,6 +3,11 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000;
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
-- Check multi_cat_agg() aggregate which is used to implement array_agg()
SELECT array_cat_agg(i) FROM (VALUES (ARRAY[1,2]), (NULL), (ARRAY[3,4])) AS t(i);
array_cat_agg
@@ -18,68 +23,68 @@ ERROR: array_agg with order by is unsupported
SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem;
ERROR: array_agg with order by is unsupported
-- Check array_agg() for different data types and LIMIT clauses
SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
--------------------------------------------------
{155190,67310,63700,2132,24027,15635}
{2132,15635,24027,63700,67310,155190}
{106170}
{4297,19036,128449,29380,183095,62143}
{4297,19036,29380,62143,128449,183095}
{88035}
{108570,123927,37531}
{37531,108570,123927}
{139636}
{182052,145243,94780,163073,151894,79251,157238}
{82704,197921,44161,2743,85811,11615}
{61336,60519,137469,33918}
{79251,94780,145243,151894,157238,163073,182052}
{2743,11615,44161,82704,85811,197921}
{33918,60519,61336,137469}
{88362,89414,169544}
(10 rows)
SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
-----------------------------------------------------------------
{21168.23,45983.16,13309.60,28955.64,22824.48,49620.16}
{13309.60,21168.23,22824.48,28955.64,45983.16,49620.16}
{44694.46}
{54058.05,46796.47,39890.88,2618.76,32986.52,28733.64}
{2618.76,28733.64,32986.52,39890.88,46796.47,54058.05}
{30690.90}
{23678.55,50723.92,73426.50}
{61998.31}
{13608.60,11594.16,81639.88,31809.96,73943.82,43058.75,6476.15}
{47227.60,64605.44,2210.32,6582.96,79059.64,9159.66}
{40217.23,47344.32,7532.30,75928.31}
{17554.68,30875.02,9681.24}
{6476.15,11594.16,13608.60,31809.96,43058.75,73943.82,81639.88}
{2210.32,6582.96,9159.66,47227.60,64605.44,79059.64}
{7532.30,40217.23,47344.32,75928.31}
{9681.24,17554.68,30875.02}
(10 rows)
SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
--------------------------------------------------------------------------------
{03-13-1996,04-12-1996,01-29-1996,04-21-1996,03-30-1996,01-30-1996}
{01-29-1996,01-30-1996,03-13-1996,03-30-1996,04-12-1996,04-21-1996}
{01-28-1997}
{02-02-1994,11-09-1993,01-16-1994,12-04-1993,12-14-1993,10-29-1993}
{10-29-1993,11-09-1993,12-04-1993,12-14-1993,01-16-1994,02-02-1994}
{01-10-1996}
{10-31-1994,10-16-1994,08-08-1994}
{08-08-1994,10-16-1994,10-31-1994}
{04-27-1992}
{05-07-1996,02-01-1996,01-15-1996,03-21-1996,02-11-1996,01-16-1996,02-10-1996}
{10-23-1995,08-14-1995,08-07-1995,08-04-1995,08-28-1995,07-21-1995}
{10-29-1993,12-09-1993,12-09-1993,11-09-1993}
{10-23-1998,10-09-1998,10-30-1998}
{01-15-1996,01-16-1996,02-01-1996,02-10-1996,02-11-1996,03-21-1996,05-07-1996}
{07-21-1995,08-04-1995,08-07-1995,08-14-1995,08-28-1995,10-23-1995}
{10-29-1993,11-09-1993,12-09-1993,12-09-1993}
{10-09-1998,10-23-1998,10-30-1998}
(10 rows)
SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
----------------------------------------------------------------------------------------------
{"TRUCK ","MAIL ","REG AIR ","AIR ","FOB ","MAIL "}
{"AIR ","FOB ","MAIL ","MAIL ","REG AIR ","TRUCK "}
{"RAIL "}
{"AIR ","RAIL ","SHIP ","TRUCK ","FOB ","RAIL "}
{"AIR ","FOB ","RAIL ","RAIL ","SHIP ","TRUCK "}
{"REG AIR "}
{"AIR ","FOB ","AIR "}
{"AIR ","AIR ","FOB "}
{"TRUCK "}
{"FOB ","SHIP ","MAIL ","FOB ","TRUCK ","FOB ","FOB "}
{"TRUCK ","AIR ","AIR ","REG AIR ","AIR ","RAIL "}
{"TRUCK ","MAIL ","AIR ","MAIL "}
{"REG AIR ","FOB ","FOB "}
{"FOB ","FOB ","FOB ","FOB ","MAIL ","SHIP ","TRUCK "}
{"AIR ","AIR ","AIR ","RAIL ","REG AIR ","TRUCK "}
{"AIR ","MAIL ","MAIL ","TRUCK "}
{"FOB ","FOB ","REG AIR "}
(10 rows)
-- Check that we can execute array_agg() within other functions
@ -93,10 +98,10 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem;
-- shards and contain different aggregates, filter clauses and other complex
-- expressions. Note that the l_orderkey ranges are such that the matching rows
-- lie in different shards.
SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem
SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem
WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500
GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | count | avg | array_agg
l_quantity | count | avg | array_sort
------------+-------+-----------------------+--------------------------------------------------------------------------------------------------
1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476}
2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476}
@ -104,21 +109,21 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li
4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473}
(4 rows)
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month
FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | my_month
------------+------------------------------------------------
1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5}
2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5}
3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3}
4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10}
1.00 | {2,3,4,4,4,5,5,5,6,7,7,7,7,9,9,11,11}
2.00 | {1,3,5,5,5,5,6,6,6,7,7,8,10,10,11,11,11,12,12}
3.00 | {3,4,5,6,7,7,8,8,8,9,9,10,11,11}
4.00 | {1,1,1,2,2,2,5,5,6,6,6,6,8,9,10,10,11,11,12}
(4 rows)
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5
AND octet_length(l_comment) + octet_length('randomtext'::text) > 40
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | array_agg
l_quantity | array_sort
------------+---------------------------------------------
1.00 | {11269,11397,11713,11715,11973,18317,18445}
2.00 | {11847,18061,18247,18953}
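The hunks above wrap every array_agg() under test in array_sort(), because the order in which grouped rows reach an aggregate is unspecified and shifts between 9.5 and 9.6 plans; sorting the aggregated array lets one expected file serve both versions. A minimal sketch of the effect (the VALUES list here is hypothetical):
SELECT array_sort(array_agg(x)) FROM (VALUES (3), (1), (2)) AS t(x);
 array_sort
------------
 {1,2,3}
(1 row)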

View File

@ -3,6 +3,13 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+\.\d+') AS major_version;
major_version
---------------
9.6
(1 row)
\a\t
SET citus.task_executor_type TO 'real-time';
SET citus.explain_distributed_queries TO on;
@ -43,7 +50,7 @@ Distributed Query into pg_merge_job_570000
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
-> HashAggregate
Group Key: intermediate_column_570000_0
-> Seq Scan on pg_merge_job_570000
@ -66,11 +73,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
"Parallel Aware": false,
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
@ -87,17 +97,21 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["COALESCE((sum((COALESCE((sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
"Parallel Aware": false,
"Sort Key": ["COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Group Key": ["intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "pg_merge_job_570001",
"Alias": "pg_merge_job_570001"
}
@ -133,6 +147,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parallel-Aware>false</Parallel-Aware>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
@ -140,6 +156,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
@ -155,15 +172,18 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Parallel-Aware>false</Parallel-Aware>
<Sort-Key>
<Item>COALESCE((sum((COALESCE((sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>intermediate_column_570003_0</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Group-Key>
<Item>intermediate_column_570003_0</Item>
</Group-Key>
@ -171,6 +191,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>pg_merge_job_570003</Relation-Name>
<Alias>pg_merge_job_570003</Alias>
</Plan>
@ -201,29 +222,36 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
Parallel Aware: false
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "lineitem_290001"
Alias: "lineitem"
Master Query:
- Plan:
Node Type: "Sort"
Parallel Aware: false
Sort Key:
- "COALESCE((sum((COALESCE((sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "intermediate_column_570005_0"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
Parent Relationship: "Outer"
Parallel Aware: false
Group Key:
- "intermediate_column_570005_0"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "pg_merge_job_570005"
Alias: "pg_merge_job_570005"
-- Test Text format
@ -241,7 +269,7 @@ Distributed Query into pg_merge_job_570006
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
-> HashAggregate
Group Key: intermediate_column_570006_0
-> Seq Scan on pg_merge_job_570006
@ -260,7 +288,7 @@ Distributed Query into pg_merge_job_570007
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / sum(intermediate_column_570007_2)))
Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / pg_catalog.sum(intermediate_column_570007_2)))
-> Seq Scan on pg_temp_2.pg_merge_job_570007
Output: intermediate_column_570007_0, intermediate_column_570007_1, intermediate_column_570007_2
-- Test join
@ -380,7 +408,7 @@ Distributed Query into pg_merge_job_570013
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / sum(intermediate_column_570013_2)))
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / pg_catalog.sum(intermediate_column_570013_2)))
Filter: (sum(pg_merge_job_570013.intermediate_column_570013_3) > '100'::numeric)
-> Seq Scan on pg_temp_2.pg_merge_job_570013
Output: intermediate_column_570013_0, intermediate_column_570013_1, intermediate_column_570013_2, intermediate_column_570013_3
@ -512,10 +540,13 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Plain",
"Partial Mode": "Simple",
"Parallel Aware": false,
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "pg_merge_job_570024",
"Alias": "pg_merge_job_570024"
}
@ -562,10 +593,13 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Plain</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parallel-Aware>false</Parallel-Aware>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>pg_merge_job_570030</Relation-Name>
<Alias>pg_merge_job_570030</Alias>
</Plan>
@ -602,8 +636,37 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Plan:
Node Type: "Aggregate"
Strategy: "Plain"
Partial Mode: "Simple"
Parallel Aware: false
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"
-- test parallel aggregates
SET parallel_setup_cost=0;
SET parallel_tuple_cost=0;
SET min_parallel_relation_size=0;
SET max_parallel_workers_per_gather=4;
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Finalize Aggregate
-> Gather
Workers Planned: 3
-> Partial Aggregate
-> Parallel Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Distributed Query into pg_merge_job_570037
Executor: Task-Tracker
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570037
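Two of the added keys recur in every structured-format hunk above: 9.6 emits Parallel Aware for every plan node and Partial Mode for every aggregate node, so each JSON, XML, and YAML expectation grows by the same lines. A sketch of the 9.6 shape for a plain aggregate (the table name t is hypothetical):
EXPLAIN (COSTS FALSE, FORMAT JSON) SELECT count(*) FROM t;
        "Node Type": "Aggregate",
        "Strategy": "Plain",
        "Partial Mode": "Simple",
        "Parallel Aware": false,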

View File

@ -0,0 +1,643 @@
--
-- MULTI_EXPLAIN
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+\.\d+') AS major_version;
major_version
---------------
9.5
(1 row)
\a\t
SET citus.task_executor_type TO 'real-time';
SET citus.explain_distributed_queries TO on;
-- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text)
RETURNS jsonb
AS $BODY$
DECLARE
result jsonb;
BEGIN
EXECUTE format('EXPLAIN (FORMAT JSON) %s', query) INTO result;
RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML
CREATE FUNCTION explain_xml(query text)
RETURNS xml
AS $BODY$
DECLARE
result xml;
BEGIN
EXECUTE format('EXPLAIN (FORMAT XML) %s', query) INTO result;
RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
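These two helpers run EXPLAIN in a structured format and cast the captured output, so a test can assert that distributed EXPLAIN produces well-formed JSON or XML without pinning the exact plan text. A sketch of their use, mirroring the validations later in this file:
SELECT true AS valid FROM explain_json($$
  SELECT count(*) FROM lineitem$$);
t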
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_570000
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
-> HashAggregate
Group Key: intermediate_column_570000_0
-> Seq Scan on pg_merge_job_570000
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
[
{
"Executor": "Real-Time",
"Job": {
"Task Count": 8,
"Tasks Shown": "One of 8",
"Tasks": [
{
"Node": "host=localhost port=57637 dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
]
}
}
]
]
}
]
},
"Master Query": [
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["COALESCE((sum((COALESCE((sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Parent Relationship": "Outer",
"Group Key": ["intermediate_column_570001_0"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_570001",
"Alias": "pg_merge_job_570001"
}
]
}
]
}
}
]
}
]
-- Validate JSON format
SELECT true AS valid FROM explain_json($$
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Distributed-Query>
<Executor>Real-Time</Executor>
<Job>
<Task-Count>8</Task-Count>
<Tasks-Shown>One of 8</Tasks-Shown>
<Tasks>
<Task>
<Node>host=localhost port=57637 dbname=regression</Node>
<Remote-Plan>
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
</Remote-Plan>
</Task>
</Tasks>
</Job>
<Master-Query>
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Sort-Key>
<Item>COALESCE((sum((COALESCE((sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>intermediate_column_570003_0</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Parent-Relationship>Outer</Parent-Relationship>
<Group-Key>
<Item>intermediate_column_570003_0</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>pg_merge_job_570003</Relation-Name>
<Alias>pg_merge_job_570003</Alias>
</Plan>
</Plans>
</Plan>
</Plans>
</Plan>
</Query>
</Master-Query>
</Distributed-Query>
</explain>
-- Validate XML format
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
- Executor: "Real-Time"
Job:
Task Count: 8
Tasks Shown: "One of 8"
Tasks:
- Node: "host=localhost port=57637 dbname=regression"
Remote Plan:
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "lineitem_290001"
Alias: "lineitem"
Master Query:
- Plan:
Node Type: "Sort"
Sort Key:
- "COALESCE((sum((COALESCE((sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "intermediate_column_570005_0"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Parent Relationship: "Outer"
Group Key:
- "intermediate_column_570005_0"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_570005"
Alias: "pg_merge_job_570005"
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_570006
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
-> HashAggregate
Group Key: intermediate_column_570006_0
-> Seq Scan on pg_merge_job_570006
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Distributed Query into pg_merge_job_570007
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / sum(intermediate_column_570007_2)))
-> Seq Scan on pg_temp_2.pg_merge_job_570007
Output: intermediate_column_570007_0, intermediate_column_570007_1, intermediate_column_570007_2
-- Test join
EXPLAIN (COSTS FALSE)
SELECT * FROM lineitem
JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5.0
ORDER BY l_quantity LIMIT 10;
Distributed Query into pg_merge_job_570008
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: lineitem.l_quantity
-> Hash Join
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Seq Scan on lineitem_290001 lineitem
Filter: (l_quantity < 5.0)
-> Hash
-> Seq Scan on orders_290008 orders
Master Query
-> Limit
-> Sort
Sort Key: intermediate_column_570008_4
-> Seq Scan on pg_merge_job_570008
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_290000
-> Result
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_290000
-> Bitmap Heap Scan on lineitem_290000
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 1)
-- Test delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_290000
-> Bitmap Heap Scan on lineitem_290000
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 1)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Distributed Query into pg_merge_job_570009
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_290000 lineitem
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 5)
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
SELECT true AS valid FROM explain_json($$
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
-- Test CREATE TABLE ... AS
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Distributed Query into pg_merge_job_570012
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Seq Scan on pg_merge_job_570012
-- Test having
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;
Distributed Query into pg_merge_job_570013
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / sum(intermediate_column_570013_2)))
Filter: (sum(pg_merge_job_570013.intermediate_column_570013_3) > '100'::numeric)
-> Seq Scan on pg_temp_2.pg_merge_job_570013
Output: intermediate_column_570013_0, intermediate_column_570013_1, intermediate_column_570013_2, intermediate_column_570013_3
-- Test having without aggregate
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT l_quantity FROM lineitem
GROUP BY l_quantity
HAVING l_quantity > (100 * random());
Distributed Query into pg_merge_job_570014
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_quantity, l_quantity
Group Key: lineitem.l_quantity
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> HashAggregate
Output: intermediate_column_570014_0
Group Key: pg_merge_job_570014.intermediate_column_570014_0
Filter: ((pg_merge_job_570014.intermediate_column_570014_1)::double precision > ('100'::double precision * random()))
-> Seq Scan on pg_temp_2.pg_merge_job_570014
Output: intermediate_column_570014_0, intermediate_column_570014_1
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_570015
Executor: Real-Time
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290004 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290007 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290006 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570015
SELECT true AS valid FROM explain_xml($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
SELECT true AS valid FROM explain_json($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_570018
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570018
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Distributed Query into pg_merge_job_570021
Executor: Task-Tracker
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 1
Merge Task Count: 1
-> MapMergeJob
Map Task Count: 8
Merge Task Count: 1
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570021
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
[
{
"Executor": "Task-Tracker",
"Job": {
"Task Count": 1,
"Tasks Shown": "None, not supported for re-partition queries",
"Depended Jobs": [
{
"Map Task Count": 1,
"Merge Task Count": 1,
"Depended Jobs": [
{
"Map Task Count": 8,
"Merge Task Count": 1
}
]
}
]
},
"Master Query": [
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Plain",
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_570024",
"Alias": "pg_merge_job_570024"
}
]
}
}
]
}
]
SELECT true AS valid FROM explain_json($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Distributed-Query>
<Executor>Task-Tracker</Executor>
<Job>
<Task-Count>1</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
<Depended-Jobs>
<MapMergeJob>
<Map-Task-Count>1</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count>
<Depended-Jobs>
<MapMergeJob>
<Map-Task-Count>8</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count>
</MapMergeJob>
</Depended-Jobs>
</MapMergeJob>
</Depended-Jobs>
</Job>
<Master-Query>
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Plain</Strategy>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>pg_merge_job_570030</Relation-Name>
<Alias>pg_merge_job_570030</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</Master-Query>
</Distributed-Query>
</explain>
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
- Executor: "Task-Tracker"
Job:
Task Count: 1
Tasks Shown: "None, not supported for re-partition queries"
Depended Jobs:
- Map Task Count: 1
Merge Task Count: 1
Depended Jobs:
- Map Task Count: 8
Merge Task Count: 1
Master Query:
- Plan:
Node Type: "Aggregate"
Strategy: "Plain"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"
-- test parallel aggregates
SET parallel_setup_cost=0;
ERROR: unrecognized configuration parameter "parallel_setup_cost"
SET parallel_tuple_cost=0;
ERROR: unrecognized configuration parameter "parallel_tuple_cost"
SET min_parallel_relation_size=0;
ERROR: unrecognized configuration parameter "min_parallel_relation_size"
SET max_parallel_workers_per_gather=4;
ERROR: unrecognized configuration parameter "max_parallel_workers_per_gather"
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Aggregate
-> Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Distributed Query into pg_merge_job_570037
Executor: Task-Tracker
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570037
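multi_explain_0.out is the 9.5 alternative to the 9.6 multi_explain.out shown before it: pg_regress accepts a match against any numbered variant of an expected file, and each variant opens by printing the server's major version so a failing diff shows at a glance which variant was compared. The idiom, as run on a 9.5 server:
SELECT substring(version(), '\d+\.\d+') AS major_version;
 major_version
---------------
 9.5
(1 row)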

View File

@ -86,6 +86,7 @@ SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
(1 row)
-- even if created by PL/pgSQL...
\set VERBOSITY terse
BEGIN;
DO $$
BEGIN
@ -120,6 +121,7 @@ END $$;
NOTICE: caught not_null_violation
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
\set VERBOSITY default
-- should be valid to edit labs after researchers...
BEGIN;
INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart');
@ -318,11 +320,11 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- test partial failure; worker_1 succeeds, 2 fails
\set VERBOSITY terse
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
INSERT INTO labs VALUES (7, 'E Corp');
COMMIT;
-- data should be persisted
@ -378,11 +380,9 @@ BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
ERROR: could not modify any active placements
COMMIT;
-- data should NOT be persisted
@ -424,7 +424,6 @@ INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
-- data should be persisted
SELECT * FROM objects WHERE id = 2;
id | name
@ -473,9 +472,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
ERROR: could not commit transaction on any active nodes
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
@ -511,7 +508,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
\set VERBOSITY default
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;
id | name
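The \set VERBOSITY terse brackets added above exist because psql 9.6 introduces a SHOW_CONTEXT variable that defaults to errors, hiding the CONTEXT line on warnings that 9.5 always printed. Terse verbosity suppresses CONTEXT on every version, so the expected files drop those lines rather than fork per version. A sketch, reusing the objects table from this test:
\set VERBOSITY terse
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
\set VERBOSITY default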

View File

@ -757,33 +757,35 @@ DEBUG: Plan is router executable
SELECT id
FROM articles_hash
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
DEBUG: predicate pruning for shardId 840001
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
----
41
11
31
1
11
21
31
41
(5 rows)
-- single shard select with distinct is router plannable
SELECT distinct id
SELECT DISTINCT id
FROM articles_hash
WHERE author_id = 1;
WHERE author_id = 1
ORDER BY id;
DEBUG: predicate pruning for shardId 840001
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
----
41
11
31
1
11
21
31
41
(5 rows)
-- single shard aggregate is router plannable
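GROUP BY and DISTINCT make no ordering promise, and 9.6 is freer to hash where 9.5 sorted, so these router-planner tests pin the output with a trailing ORDER BY instead of maintaining per-version expected files. The pattern:
SELECT DISTINCT id
FROM articles_hash
WHERE author_id = 1
ORDER BY id;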

View File

@ -494,17 +494,18 @@ DEBUG: Plan is router executable
SELECT id
FROM articles
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable
id
----
41
11
31
1
11
21
31
41
(5 rows)
-- copying from a single shard table does not require the master query

View File

@ -6,6 +6,10 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 270000;
-- print major version to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+\.\d+') AS major_version;
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (

View File

@ -3,6 +3,14 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 270000;
-- print major version to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+\.\d+') AS major_version;
major_version
---------------
9.6
(1 row)
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
@ -765,10 +773,10 @@ FROM
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate (cost=40.01..40.02 rows=1 width=32)
-> GroupAggregate (cost=39.89..39.99 rows=1 width=556)
-> Aggregate (cost=40.01..40.02 rows=1 width=16)
-> GroupAggregate (cost=39.89..39.99 rows=1 width=48)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Merge Join (cost=39.89..39.97 rows=1 width=556)
-> Merge Join (cost=39.89..39.97 rows=1 width=540)
Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id)))
-> Sort (cost=28.08..28.09 rows=6 width=32)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
@ -779,7 +787,7 @@ FROM
-> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556)
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
Master Query
-> Aggregate (cost=0.01..0.02 rows=1 width=0)
-> Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0)
(22 rows)
@ -847,48 +855,51 @@ FROM
GROUP BY
hasdone;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270015
Executor: Real-Time
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate (cost=91.94..91.96 rows=2 width=64)
Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text)
-> GroupAggregate (cost=91.85..91.90 rows=2 width=88)
-> GroupAggregate (cost=91.93..91.98 rows=2 width=48)
Group Key: subquery_top.hasdone
-> Sort (cost=91.93..91.93 rows=2 width=64)
Sort Key: subquery_top.hasdone
-> Subquery Scan on subquery_top (cost=91.85..91.92 rows=2 width=64)
-> GroupAggregate (cost=91.85..91.90 rows=2 width=112)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
-> Sort (cost=91.85..91.85 rows=2 width=88)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
-> Merge Left Join (cost=91.75..91.84 rows=2 width=88)
Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id)))
-> Unique (cost=79.46..79.48 rows=2 width=40)
-> Sort (cost=79.46..79.47 rows=2 width=40)
-> Unique (cost=79.46..79.48 rows=2 width=56)
-> Sort (cost=79.46..79.47 rows=2 width=56)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time
-> Append (cost=0.00..79.45 rows=2 width=40)
-> Nested Loop (cost=0.00..39.72 rows=1 width=40)
-> Append (cost=0.00..79.45 rows=2 width=56)
-> Nested Loop (cost=0.00..39.72 rows=1 width=56)
Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id))
-> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40)
Filter: ((event_type)::text = 'click'::text)
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Nested Loop (cost=0.00..39.72 rows=1 width=40)
-> Nested Loop (cost=0.00..39.72 rows=1 width=56)
Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id))
-> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40)
Filter: ((event_type)::text = 'submit'::text)
-> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Materialize (cost=12.29..12.31 rows=1 width=48)
-> Unique (cost=12.29..12.30 rows=1 width=32)
-> Sort (cost=12.29..12.29 rows=1 width=32)
-> Unique (cost=12.29..12.30 rows=1 width=80)
-> Sort (cost=12.29..12.29 rows=1 width=80)
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=80)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
Master Query
-> HashAggregate (cost=0.00..0.18 rows=10 width=0)
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: intermediate_column_270015_2
-> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0)
(40 rows)
(43 rows)
-- Union, left join and having subquery pushdown
EXPLAIN SELECT
@ -1023,15 +1034,15 @@ LIMIT
-> Limit (cost=100.43..100.44 rows=6 width=56)
-> Sort (cost=100.43..100.44 rows=6 width=56)
Sort Key: (max(users.lastseen)) DESC
-> GroupAggregate (cost=100.14..100.29 rows=6 width=548)
-> GroupAggregate (cost=100.14..100.29 rows=6 width=56)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort (cost=100.14..100.16 rows=6 width=548)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548)
-> Limit (cost=28.08..28.09 rows=6 width=40)
-> Sort (cost=28.08..28.09 rows=6 width=40)
-> Limit (cost=28.08..28.09 rows=6 width=24)
-> Sort (cost=28.08..28.09 rows=6 width=24)
Sort Key: users.lastseen DESC
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40)
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=24)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Limit (cost=11.96..11.96 rows=1 width=524)
-> Sort (cost=11.96..11.96 rows=1 width=524)
@ -1039,8 +1050,8 @@ LIMIT
-> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524)
Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id)))
Master Query
-> Limit (cost=0.01..0.02 rows=0 width=0)
-> Sort (cost=0.01..0.02 rows=0 width=0)
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: intermediate_column_270017_2 DESC
-> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0)
(29 rows)

File diff suppressed because it is too large

View File

@ -143,6 +143,7 @@ sysopen my $fh, "tmp_check/tmp-bin/psql", O_CREAT|O_TRUNC|O_RDWR, 0700
print $fh "#!/bin/bash\n";
print $fh "exec $bindir/psql ";
print $fh "--variable=master_port=$masterPort ";
print $fh "--variable=SHOW_CONTEXT=always ";
for my $workeroff (0 .. $#workerPorts)
{
my $port = $workerPorts[$workeroff];
@ -226,14 +227,14 @@ for my $port (@workerPorts)
###
for my $port (@workerPorts)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "postgres",
'-c', "CREATE DATABASE regression;")) == 0
or die "Could not create regression database on worker";
for my $extension (@extensions)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE EXTENSION IF NOT EXISTS \"$extension\";")) == 0
or die "Could not create extension on worker";
@ -241,7 +242,7 @@ for my $port (@workerPorts)
foreach my $dataType (keys %dataTypes)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE TYPE $dataType AS $dataTypes{$dataType};")) == 0
or die "Could not create TYPE $dataType on worker";
@ -249,7 +250,7 @@ for my $port (@workerPorts)
foreach my $function (keys %functions)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE FUNCTION $function RETURNS $functions{$function};")) == 0
or die "Could not create FUNCTION $function on worker";
@ -257,7 +258,7 @@ for my $port (@workerPorts)
foreach my $operator (keys %operators)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE OPERATOR $operator $operators{$operator};")) == 0
or die "Could not create OPERATOR $operator on worker";
@ -265,7 +266,7 @@ for my $port (@workerPorts)
foreach my $fdw (keys %fdws)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE FOREIGN DATA WRAPPER $fdw HANDLER $fdws{$fdw};")) == 0
or die "Could not create foreign data wrapper $fdw on worker";
@ -273,7 +274,7 @@ for my $port (@workerPorts)
foreach my $fdwServer (keys %fdwServers)
{
system("$bindir/psql",
system("$bindir/psql", '-X',
('-h', $host, '-p', $port, '-U', $user, "regression",
'-c', "CREATE SERVER $fdwServer FOREIGN DATA WRAPPER $fdwServers{$fdwServer};")) == 0
or die "Could not create server $fdwServer on worker";
@ -287,7 +288,7 @@ my @arguments = (
'--user', $user
);
if ($majorversion eq '9.5')
if ($majorversion eq '9.5' || $majorversion eq '9.6')
{
push(@arguments, '--bindir', "tmp_check/tmp-bin");
}
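Two harness changes above keep psql's behavior uniform across versions: every invocation now passes -X, so a developer's ~/.psqlrc cannot leak settings into test output, and the generated psql wrapper pins --variable=SHOW_CONTEXT=always, papering over psql 9.6's new default of showing CONTEXT only for errors. The equivalent in-session settings, as a sketch:
\set SHOW_CONTEXT always
\set VERBOSITY default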

View File

@ -11,5 +11,6 @@
/multi_load_large_records.sql
/multi_load_more_data.sql
/multi_subquery.sql
/multi_subquery_0.sql
/worker_copy.sql
/multi_complex_count_distinct.sql

View File

@ -6,6 +6,11 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000;
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
-- Check multi_cat_agg() aggregate which is used to implement array_agg()
@ -21,16 +26,16 @@ SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem;
-- Check array_agg() for different data types and LIMIT clauses
SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
-- Check that we can execute array_agg() within other functions
@ -42,15 +47,15 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem;
-- expressions. Note that the l_orderkey ranges are such that the matching rows
-- lie in different shards.
SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem
SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem
WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500
GROUP BY l_quantity ORDER BY l_quantity;
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month
FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5
AND octet_length(l_comment) + octet_length('randomtext'::text) > 40
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;

View File

@ -6,6 +6,8 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+\.\d+') AS major_version;
\a\t
@ -185,3 +187,16 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
-- test parallel aggregates
SET parallel_setup_cost=0;
SET parallel_tuple_cost=0;
SET min_parallel_relation_size=0;
SET max_parallel_workers_per_gather=4;
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;

View File

@ -63,6 +63,7 @@ COMMIT;
SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
-- even if created by PL/pgSQL...
\set VERBOSITY terse
BEGIN;
DO $$
BEGIN
@ -93,6 +94,7 @@ EXCEPTION
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
\set VERBOSITY default
-- should be valid to edit labs after researchers...
@ -247,6 +249,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- test partial failure; worker_1 succeeds, 2 fails
\set VERBOSITY terse
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
@ -399,6 +402,7 @@ INSERT INTO objects VALUES (1, 'apple');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
\set VERBOSITY default
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;

View File

@ -334,12 +334,14 @@ SELECT *
SELECT id
FROM articles_hash
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
-- single shard select with distinct is router plannable
SELECT distinct id
SELECT DISTINCT id
FROM articles_hash
WHERE author_id = 1;
WHERE author_id = 1
ORDER BY id;
-- single shard aggregate is router plannable
SELECT avg(word_count)

View File

@ -260,7 +260,8 @@ SELECT *
SELECT id
FROM articles
WHERE author_id = 1
GROUP BY id;
GROUP BY id
ORDER BY id;
-- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout;