diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index b8e706226..ca4662e58 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -624,6 +624,28 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events eventCount, bool *cancellationReceived); static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); + +/* + * AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor + * run. Since the results of our subplans need to be available before the first call to + * the exec function of our custom scan, we make sure the subplans have been executed by + * that point. + */ +void +AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) +{ + DistributedPlan *distributedPlan = scanState->distributedPlan; + + /* + * PostgreSQL takes locks on all partitions in the executor. It's not entirely + * clear why this is necessary (instead of locking the parent during DDL), but + * we do the same for consistency. + */ + LockPartitionsForDistributedPlan(distributedPlan); + + ExecuteSubPlans(distributedPlan); +} + + /* * AdaptiveExecutor is called via CitusExecScan on the * first call of CitusExecScan. The function fills the tupleStore @@ -649,15 +671,6 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); - /* - * PostgreSQL takes locks on all partitions in the executor. It's not entirely - * clear why this is necessary (instead of locking the parent during DDL), but - * we do the same for consistency. - */ - LockPartitionsForDistributedPlan(distributedPlan); - - ExecuteSubPlans(distributedPlan); - bool hasDependentJobs = HasDependentJobs(job); if (hasDependentJobs) { diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index a572be9f6..c11cafb4b 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -49,6 +49,7 @@ static Node * DelayedErrorCreateScan(CustomScan *scan); static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags); static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, int eflags); +static void CitusPreExecScan(CitusScanState *scanState); static void HandleDeferredShardPruningForFastPathQueries( DistributedPlan *distributedPlan); static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan); @@ -114,6 +115,29 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { }; +/* + * IsCitusCustomState returns whether a given PlanState node is a CitusCustomState node. + */ +bool +IsCitusCustomState(PlanState *planState) +{ + if (!IsA(planState, CustomScanState)) + { + return false; + } + + CustomScanState *css = castNode(CustomScanState, planState); + if (css->methods == &AdaptiveExecutorCustomExecMethods || + css->methods == &TaskTrackerCustomExecMethods || + css->methods == &CoordinatorInsertSelectCustomExecMethods) + { + return true; + } + + return false; +} + + /* * Let PostgreSQL know about Citus' custom scan nodes.
*/ @@ -141,7 +165,24 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) CitusScanState *scanState = (CitusScanState *) node; #if PG_VERSION_NUM >= 120000 + + /* + * Since we are using a tuplestore we cannot use the virtual tuples postgres had + * already setup on the CustomScan. Instead we need to reinitialize the tuples as + * minimal. + * + * During initialization postgres also created the projection information and the + * quals, but both are 'compiled' to be executed on virtual tuples. Since we replaced + * the tuples with minimal tuples we also compile both the projection and the quals + * on to these 'new' tuples. + */ ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple); + + ExecInitScanTupleSlot(node->ss.ps.state, &node->ss, node->ss.ps.scandesc, + &TTSOpsMinimalTuple); + ExecAssignScanProjectionInfoWithVarno(&node->ss, INDEX_VAR); + + node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node); #endif DistributedPlan *distributedPlan = scanState->distributedPlan; @@ -158,6 +199,16 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) } +/* + * CitusPreExecScan is called right before postgres' executor starts pulling tuples. + */ +static void +CitusPreExecScan(CitusScanState *scanState) +{ + AdaptiveExecutorPreExecutorRun(scanState); +} + + /* * CitusExecScan is called when a tuple is pulled from a custom scan. * On the first call, it executes the distributed query and writes the @@ -176,9 +227,7 @@ CitusExecScan(CustomScanState *node) scanState->finishedRemoteScan = true; } - TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState); - - return resultSlot; + return ReturnTupleFromTuplestore(scanState); } @@ -596,6 +645,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods; + scanState->PreExecScan = &CitusPreExecScan; return (Node *) scanState; } @@ -726,8 +776,7 @@ CitusReScan(CustomScanState *node) TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState) { - return scanState->customScanState.ss.ps.ps_ResultTupleSlot-> - tts_tupleDescriptor; + return scanState->customScanState.ss.ss_ScanTupleSlot->tts_tupleDescriptor; } diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index fad9031fc..567b2bb6f 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -21,6 +21,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" +#include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/multi_executor.h" #include "distributed/multi_master_planner.h" @@ -34,6 +35,7 @@ #include "distributed/worker_protocol.h" #include "executor/execdebug.h" #include "commands/copy.h" +#include "nodes/execnodes.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "parser/parsetree.h" @@ -69,6 +71,9 @@ int ExecutorLevel = 0; static Relation StubRelation(TupleDesc tupleDescriptor); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan); +static List * FindCitusCustomScanStates(PlanState *planState); +static bool CitusCustomScanStateWalker(PlanState *planState, + List **citusCustomScanStates); /* * CitusExecutorStart is the ExecutorStart_hook that gets called when @@ 
-123,10 +128,25 @@ CitusExecutorRun(QueryDesc *queryDesc, { DestReceiver *dest = queryDesc->dest; + /* + * We now do some potentially time-consuming operations ourselves before we hand off + * control to postgres' executor. To make sure the time spent is accurately measured + * we remove the totaltime instrumentation from the queryDesc. Instead we start and + * stop the total time instrumentation ourselves and put it back on the queryDesc + * before returning (or rethrowing) from this function. + */ + Instrumentation *volatile totalTime = queryDesc->totaltime; + queryDesc->totaltime = NULL; + PG_TRY(); { ExecutorLevel++; + if (totalTime) + { + InstrStartNode(totalTime); + } + if (CitusHasBeenLoaded()) { if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) && @@ -174,13 +194,47 @@ } else { + /* switch into per-query memory context before calling PreExecScan */ + MemoryContext oldcontext = MemoryContextSwitchTo( + queryDesc->estate->es_query_cxt); + + /* + * Call PreExecScan for all citus custom scan nodes prior to starting the + * postgres exec scan. This gives citus scan nodes a chance to initialize + * state that would otherwise be initialized too late, namely when the first + * tuple needs to be returned. + */ + List *citusCustomScanStates = FindCitusCustomScanStates(queryDesc->planstate); + CitusScanState *citusScanState = NULL; + foreach_ptr(citusScanState, citusCustomScanStates) + { + if (citusScanState->PreExecScan) + { + citusScanState->PreExecScan(citusScanState); + } + } + + /* postgres will switch into this context again and restore it on its own */ + MemoryContextSwitchTo(oldcontext); + standard_ExecutorRun(queryDesc, direction, count, execute_once); } + if (totalTime) + { + InstrStopNode(totalTime, queryDesc->estate->es_processed); + queryDesc->totaltime = totalTime; + } + ExecutorLevel--; } PG_CATCH(); { + if (totalTime) + { + queryDesc->totaltime = totalTime; + } + ExecutorLevel--; PG_RE_THROW(); @@ -189,6 +243,38 @@ } +/* + * FindCitusCustomScanStates returns a list of all citus custom scan states in the given + * planState tree. + */ +static List * +FindCitusCustomScanStates(PlanState *planState) +{ + List *citusCustomScanStates = NIL; + CitusCustomScanStateWalker(planState, &citusCustomScanStates); + return citusCustomScanStates; +} + + +/* + * CitusCustomScanStateWalker walks a planState tree structure and adds all + * CitusCustomState nodes to the list passed by reference as the second argument. + */ +static bool +CitusCustomScanStateWalker(PlanState *planState, List **citusCustomScanStates) +{ + if (IsCitusCustomState(planState)) + { + CitusScanState *css = (CitusScanState *) planState; + *citusCustomScanStates = lappend(*citusCustomScanStates, css); + + /* breaks the walking of this tree */ + return true; + } + return planstate_tree_walker(planState, CitusCustomScanStateWalker, + citusCustomScanStates); +} + + /* * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the * given Citus scan node and returns it.
It returns null when all tuples have been read @@ -214,10 +300,77 @@ ReturnTupleFromTuplestore(CitusScanState *scanState) forwardScanDirection = false; } - TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; - tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot); + ExprState *qual = scanState->customScanState.ss.ps.qual; + ProjectionInfo *projInfo = scanState->customScanState.ss.ps.ps_ProjInfo; + ExprContext *econtext = scanState->customScanState.ss.ps.ps_ExprContext; - return resultSlot; + if (!qual && !projInfo) + { + /* no quals or projections; return directly from the tuple store */ + TupleTableSlot *slot = scanState->customScanState.ss.ss_ScanTupleSlot; + tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, slot); + return slot; + } + + for (;;) + { + /* + * If there is a very selective qual on the Citus Scan node we could stay in + * this loop for a long time, so check for interrupts on every iteration to + * avoid blocking them for too long. + */ + CHECK_FOR_INTERRUPTS(); + + /* + * Reset per-tuple memory context to free any expression evaluation + * storage allocated in the previous tuple cycle. + */ + ResetExprContext(econtext); + + TupleTableSlot *slot = scanState->customScanState.ss.ss_ScanTupleSlot; + tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, slot); + + if (TupIsNull(slot)) + { + /* + * When the tuple is null we have reached the end of the tuplestore. We still + * return an empty tuple; however, depending on the existence of a projection + * we need to return either the scan tuple or the projected tuple. + */ + if (projInfo) + { + return ExecClearTuple(projInfo->pi_state.resultslot); + } + else + { + return slot; + } + } + + /* place the current tuple into the expr context */ + econtext->ecxt_scantuple = slot; + + if (!ExecQual(qual, econtext)) + { + /* skip tuples that do not satisfy the qual (filter) */ + InstrCountFiltered1(scanState, 1); + continue; + } + + /* found a satisfactory scan tuple */ + if (projInfo) + { + /* + * Form a projection tuple, store it in the result tuple slot and return it. + * ExecProject works on the ecxt_scantuple stored in the context above.
+ */ + return ExecProject(projInfo); + } + else + { + /* Here, we aren't projecting, so just return scan tuple */ + return slot; + } + } } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 8fa615315..e0d25cb97 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -24,6 +24,7 @@ #include "distributed/insert_select_planner.h" #include "distributed/intermediate_result_pruning.h" #include "distributed/intermediate_results.h" +#include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -53,6 +54,7 @@ #endif #include "optimizer/pathnode.h" #include "optimizer/planner.h" +#include "optimizer/planmain.h" #include "utils/builtins.h" #include "utils/datum.h" #include "utils/lsyscache.h" @@ -98,6 +100,8 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); +static List * makeTargetListFromCustomScanList(List *custom_scan_tlist); +static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist); static int32 BlessRecordExpressionList(List *exprs); static void CheckNodeIsDumpable(Node *node); static Node * CheckNodeCopyAndSerialization(Node *node); @@ -1425,18 +1429,58 @@ FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) { - ListCell *targetEntryCell = NULL; - List *targetList = NIL; List *columnNameList = NIL; + customScan->custom_scan_tlist = + makeCustomScanTargetlistFromExistingTargetList(localPlan->planTree->targetlist); + customScan->scan.plan.targetlist = + makeTargetListFromCustomScanList(customScan->custom_scan_tlist); + + /* extract the column names from the final targetlist*/ + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, customScan->scan.plan.targetlist) + { + Value *columnName = makeString(targetEntry->resname); + columnNameList = lappend(columnNameList, columnName); + } + + PlannedStmt *routerPlan = makeNode(PlannedStmt); + routerPlan->planTree = (Plan *) customScan; + + RangeTblEntry *remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); + routerPlan->rtable = list_make1(remoteScanRangeTableEntry); + + /* add original range table list for access permission checks */ + routerPlan->rtable = list_concat(routerPlan->rtable, localPlan->rtable); + + routerPlan->canSetTag = true; + routerPlan->relationOids = NIL; + + routerPlan->queryId = localPlan->queryId; + routerPlan->utilityStmt = localPlan->utilityStmt; + routerPlan->commandType = localPlan->commandType; + routerPlan->hasReturning = localPlan->hasReturning; + + return routerPlan; +} + + +/* + * makeCustomScanTargetlistFromExistingTargetList rebuilds the targetlist from the remote + * query into a list that can be used as the custom_scan_tlist for our Citus Custom Scan. 
+ */ +static List * +makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist) +{ + List *custom_scan_tlist = NIL; + /* we will have custom scan range table entry as the first one in the list */ - int customScanRangeTableIndex = 1; + const int customScanRangeTableIndex = 1; /* build a targetlist to read from the custom scan output */ - foreach(targetEntryCell, localPlan->planTree->targetlist) + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, existingTargetlist) { - TargetEntry *targetEntry = lfirst(targetEntryCell); - Assert(IsA(targetEntry, TargetEntry)); /* @@ -1465,32 +1509,40 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) TargetEntry *newTargetEntry = flatCopyTargetEntry(targetEntry); newTargetEntry->expr = (Expr *) newVar; - targetList = lappend(targetList, newTargetEntry); - - Value *columnName = makeString(targetEntry->resname); - columnNameList = lappend(columnNameList, columnName); + custom_scan_tlist = lappend(custom_scan_tlist, newTargetEntry); } - customScan->scan.plan.targetlist = targetList; + return custom_scan_tlist; +} - PlannedStmt *routerPlan = makeNode(PlannedStmt); - routerPlan->planTree = (Plan *) customScan; - RangeTblEntry *remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); - routerPlan->rtable = list_make1(remoteScanRangeTableEntry); - - /* add original range table list for access permission checks */ - routerPlan->rtable = list_concat(routerPlan->rtable, localPlan->rtable); - - routerPlan->canSetTag = true; - routerPlan->relationOids = NIL; - - routerPlan->queryId = localPlan->queryId; - routerPlan->utilityStmt = localPlan->utilityStmt; - routerPlan->commandType = localPlan->commandType; - routerPlan->hasReturning = localPlan->hasReturning; - - return routerPlan; +/* + * makeTargetListFromCustomScanList creates, based on a custom_scan_tlist, the target list + * to use on the Citus Custom Scan Node. The targetlist differs from the custom_scan_tlist + * in that every expression is a Var whose varattno references the index (resno) of the + * corresponding entry in the custom_scan_tlist, and whose varno is set to INDEX_VAR + * instead of a range table entry index. + */ +static List * +makeTargetListFromCustomScanList(List *custom_scan_tlist) +{ + List *targetList = NIL; + TargetEntry *targetEntry = NULL; + int resno = 1; + foreach_ptr(targetEntry, custom_scan_tlist) + { + /* + * INDEX_VAR is used to reference back to the TargetEntry in custom_scan_tlist by + * its resno (index) + */ + Var *newVar = makeVarFromTargetEntry(INDEX_VAR, targetEntry); + TargetEntry *newTargetEntry = makeTargetEntry((Expr *) newVar, resno, + targetEntry->resname, + targetEntry->resjunk); + targetList = lappend(targetList, newTargetEntry); + resno++; + } + return targetList; } @@ -1779,6 +1831,25 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, { DistTableCacheEntry *cacheEntry = NULL; + if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte)) + { + /* + * We got here by planning the query part that needs to be executed on the query + * coordinator node. + * We have verified the occurrence of the citus_extradata_container function that + * encodes the remote scan we plan to execute here. We will replace all paths + * with a path describing our custom scan.
+ */ + Path *path = CreateCitusCustomScanPath(root, relOptInfo, restrictionIndex, rte, + ReplaceCitusExtraDataContainerWithCustomScan); + + /* replace all paths with our custom scan and recalculate cheapest */ + relOptInfo->pathlist = list_make1(path); + set_cheapest(relOptInfo); + + return; + } + AdjustReadIntermediateResultCost(rte, relOptInfo); AdjustReadIntermediateResultArrayCost(rte, relOptInfo); diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 36c863b96..dcfd3fe89 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1485,6 +1485,15 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode, if (originalHavingQual != NULL) { newHavingQual = MasterAggregateMutator(originalHavingQual, &walkerContext); + if (IsA(newHavingQual, List)) + { + /* + * unflatten having qual to allow standard planner to work when transforming + * the master query to a plan + */ + newHavingQual = (Node *) make_ands_explicit( + castNode(List, newHavingQual)); + } } } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index c8ee68fe3..50c8683fb 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -79,6 +79,7 @@ static bool HasTablesample(Query *queryTree); static bool HasComplexRangeTableType(Query *queryTree); static bool IsReadIntermediateResultFunction(Node *node); static bool IsReadIntermediateResultArrayFunction(Node *node); +static bool IsCitusExtraDataContainerFunc(Node *node); static bool IsFunctionWithOid(Node *node, Oid funcOid); static bool ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext); @@ -811,6 +812,39 @@ IsReadIntermediateResultArrayFunction(Node *node) } +/* + * IsCitusExtraDataContainerRelation determines whether a range table entry contains a + * call to the citus_extradata_container function. + */ +bool +IsCitusExtraDataContainerRelation(RangeTblEntry *rte) +{ + if (rte->rtekind != RTE_FUNCTION || list_length(rte->functions) != 1) + { + /* avoid more expensive checks below for non-functions */ + return false; + } + + if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5)) + { + return false; + } + + return FindNodeCheck((Node *) rte->functions, IsCitusExtraDataContainerFunc); +} + + +/* + * IsCitusExtraDataContainerFunc determines whether a given node is a function call + * to the citus_extradata_container function. + */ +static bool +IsCitusExtraDataContainerFunc(Node *node) +{ + return IsFunctionWithOid(node, CitusExtraDataContainerFuncId()); +} + + /* * IsFunctionWithOid determines whether a given node is a function call * to the read_intermediate_result function. 
diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 377b0400d..30702e79b 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -14,50 +14,38 @@ #include "postgres.h" #include "catalog/pg_type.h" -#include "commands/extension.h" #include "distributed/citus_ruleutils.h" -#include "distributed/function_utils.h" #include "distributed/listutils.h" -#include "distributed/multi_logical_optimizer.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_physical_planner.h" -#include "distributed/distributed_planner.h" -#include "distributed/multi_server_executor.h" -#include "distributed/version_compat.h" -#include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" -#include "nodes/print.h" -#include "optimizer/clauses.h" -#include "optimizer/cost.h" -#include "optimizer/planmain.h" -#include "optimizer/tlist.h" -#include "optimizer/subselect.h" -#if PG_VERSION_NUM >= 120000 -#include "optimizer/optimizer.h" -#else -#include "optimizer/var.h" -#endif -#include "utils/builtins.h" -#include "utils/guc.h" -#include "utils/memutils.h" -#include "utils/rel.h" -#include "utils/syscache.h" -#include "utils/lsyscache.h" - +#include "optimizer/planner.h" +#include "rewrite/rewriteManip.h" static List * MasterTargetList(List *workerTargetList); -static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList, - CustomScan *remoteScan); -static Agg * BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan); -static bool HasDistinctOrOrderByAggregate(Query *masterQuery); -static bool UseGroupAggregateWithHLL(Query *masterQuery); -static bool QueryContainsAggregateWithHLL(Query *query); -static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan); -static Agg * makeAggNode(List *groupClauseList, List *havingQual, - AggStrategy aggrStrategy, List *queryTargetList, Plan *subPlan); -static void FinalizeStatement(PlannerInfo *root, PlannedStmt *stmt, Plan *topLevelPlan); +static PlannedStmt * BuildSelectStatementViaStdPlanner(Query *masterQuery, + List *masterTargetList, + CustomScan *remoteScan); +static bool FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result); +static Plan * CitusCustomScanPathPlan(PlannerInfo *root, RelOptInfo *rel, + struct CustomPath *best_path, List *tlist, + List *clauses, List *custom_plans); + +bool ReplaceCitusExtraDataContainer = false; +CustomScan *ReplaceCitusExtraDataContainerWithCustomScan = NULL; + +/* + * CitusCustomScanPathMethods defines the methods for a custom path we insert into the + * planner during the planning of the query part that will be executed on the node + * coordinating the query. 
+ */ +static CustomPathMethods CitusCustomScanPathMethods = { + .CustomName = "CitusCustomScanPath", + .PlanCustomPath = CitusCustomScanPathPlan, +}; /* * MasterNodeSelectPlan takes in a distributed plan and a custom scan node which @@ -75,11 +63,7 @@ MasterNodeSelectPlan(DistributedPlan *distributedPlan, CustomScan *remoteScan) Job *workerJob = distributedPlan->workerJob; List *workerTargetList = workerJob->jobQuery->targetList; List *masterTargetList = MasterTargetList(workerTargetList); - - PlannedStmt *masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, - remoteScan); - - return masterSelectPlan; + return BuildSelectStatementViaStdPlanner(masterQuery, masterTargetList, remoteScan); } @@ -132,516 +116,199 @@ MasterTargetList(List *workerTargetList) /* - * BuildSelectStatement builds the final select statement to run on the master - * node, before returning results to the user. The function first gets the custom - * scan node for all results fetched to the master, and layers aggregation, sort - * and limit plans on top of the scan statement if necessary. + * CreateCitusCustomScanPath creates a custom path node that will return the CustomScan if + * the path ends up in the best_path during postgres planning. We use this function during + * the set relation hook of postgres during the planning of the query part that will be + * executed on the query coordinating node. */ -static PlannedStmt * -BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan) +Path * +CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInfo, + Index restrictionIndex, RangeTblEntry *rte, + CustomScan *remoteScan) { - /* top level select query should have only one range table entry */ - Assert(list_length(masterQuery->rtable) == 1); - Agg *aggregationPlan = NULL; - Plan *topLevelPlan = NULL; - List *sortClauseList = copyObject(masterQuery->sortClause); - List *columnNameList = NIL; - TargetEntry *targetEntry = NULL; - - PlannerGlobal *glob = makeNode(PlannerGlobal); - PlannerInfo *root = makeNode(PlannerInfo); - root->parse = masterQuery; - root->glob = glob; - root->query_level = 1; - root->planner_cxt = CurrentMemoryContext; - root->wt_param_id = -1; - - - /* (1) make PlannedStmt and set basic information */ - PlannedStmt *selectStatement = makeNode(PlannedStmt); - selectStatement->canSetTag = true; - selectStatement->relationOids = NIL; - selectStatement->commandType = CMD_SELECT; - - - remoteScan->custom_scan_tlist = masterTargetList; - - /* (2) add an aggregation plan if needed */ - if (masterQuery->hasAggs || masterQuery->groupClause) - { - remoteScan->scan.plan.targetlist = masterTargetList; - - aggregationPlan = BuildAggregatePlan(root, masterQuery, &remoteScan->scan.plan); - topLevelPlan = (Plan *) aggregationPlan; - selectStatement->planTree = topLevelPlan; - } - else - { - /* otherwise set the final projections on the scan plan directly */ - - /* - * The masterTargetList contains all columns that we fetch from - * the worker as non-resjunk. - * - * Here the output of the plan node determines the output of the query. - * We therefore use the targetList of masterQuery, which has non-output - * columns set as resjunk. 
- */ - remoteScan->scan.plan.targetlist = masterQuery->targetList; - topLevelPlan = &remoteScan->scan.plan; - } + CitusCustomScanPath *path = (CitusCustomScanPath *) newNode( + sizeof(CitusCustomScanPath), T_CustomPath); + path->custom_path.methods = &CitusCustomScanPathMethods; + path->custom_path.path.pathtype = T_CustomScan; + path->custom_path.path.pathtarget = relOptInfo->reltarget; + path->custom_path.path.parent = relOptInfo; /* - * (3) create distinct plan if needed. + * The 100k rows we put on the cost of the path is kind of arbitrary and could be + * improved in accuracy to produce better plans. * - * distinct on() requires sort + unique plans. Unique itself is not enough - * as it only compares the current value with previous one when checking - * uniqueness, thus ordering is necessary. If already has order by - * clause we append distinct clauses to the end of it. Postgresql requires - * that if both distinct on() and order by exists, ordering shall start - * on distinct clauses. Therefore we can safely append distinct clauses to - * the end of order by clauses. Although the same column may appear more - * than once in order by clauses, created plan uses only one instance, for - * example order by a,b,a,a,b,c is translated to equivalent order by a,b,c. + * 100k on the row estimate causes the postgres planner to behave very much like the + * old citus planner in the plans it produces. Namely the old planner had hardcoded + * the use of Hash Aggregates for most of the operations, unless a postgres guc was + * set that would disallow hash aggregates to be used. * - * If the query has distinct clause but not distinct on, we first create - * distinct plan that is either HashAggreate or Sort + Unique plans depending - * on hashable property of columns in distinct clause. If there is order by - * clause, it is handled after distinct planning. + * Ideally we would be able to provide estimates close to postgres' estimates on the + * workers to let the standard planner choose an optimal solution for the masterQuery. */ - if (masterQuery->hasDistinctOn) - { - ListCell *distinctCell = NULL; - foreach(distinctCell, masterQuery->distinctClause) - { - SortGroupClause *singleDistinctClause = lfirst(distinctCell); - Index sortGroupRef = singleDistinctClause->tleSortGroupRef; + path->custom_path.path.rows = 100000; + path->remoteScan = remoteScan; - if (get_sortgroupref_clause_noerr(sortGroupRef, sortClauseList) == NULL) - { - sortClauseList = lappend(sortClauseList, singleDistinctClause); - } - } - } - else if (masterQuery->distinctClause) - { - Plan *distinctPlan = BuildDistinctPlan(masterQuery, topLevelPlan); - topLevelPlan = distinctPlan; - } - - /* (4) add a sorting plan if needed */ - if (sortClauseList) - { - Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan); - - /* just for reproducible costs between different PostgreSQL versions */ - sortPlan->plan.startup_cost = 0; - sortPlan->plan.total_cost = 0; - sortPlan->plan.plan_rows = 0; - - topLevelPlan = (Plan *) sortPlan; - } - - /* - * (5) add a unique plan for distinctOn. - * If the query has distinct on we add a sort clause in step 3. Therefore - * Step 4 always creates a sort plan. 
- * */ - if (masterQuery->hasDistinctOn) - { - Assert(IsA(topLevelPlan, Sort)); - topLevelPlan = - (Plan *) make_unique_from_sortclauses(topLevelPlan, - masterQuery->distinctClause); - } - - /* (5) add a limit plan if needed */ - if (masterQuery->limitCount || masterQuery->limitOffset) - { - Node *limitCount = masterQuery->limitCount; - Node *limitOffset = masterQuery->limitOffset; - Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount); - topLevelPlan = (Plan *) limitPlan; - } - - /* - * (6) set top level plan in the plantree and copy over some things from - * PlannerInfo - */ - FinalizeStatement(root, selectStatement, topLevelPlan); - - /* - * (7) Replace rangetable with one with nice names to show in EXPLAIN plans - */ - foreach_ptr(targetEntry, masterTargetList) - { - columnNameList = lappend(columnNameList, makeString(targetEntry->resname)); - } - - RangeTblEntry *customScanRangeTableEntry = linitial(selectStatement->rtable); - customScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList); - - return selectStatement; + return (Path *) path; } /* - * FinalizeStatement sets some necessary fields on the final statement and its - * plan to make it work with the regular postgres executor. This code is copied - * almost verbatim from standard_planner in the PG source code. + * CitusCustomScanPathPlan is called for the CitusCustomScanPath node in the best_path + * after the postgres planner has evaluated all possible paths. * - * Modifications from original code: - * - Added SS_attach_initplans call - */ -static void -FinalizeStatement(PlannerInfo *root, PlannedStmt *result, Plan *top_plan) -{ - ListCell *lp, - *lr; - PlannerGlobal *glob = root->glob; - - /* Taken from create_plan */ - SS_attach_initplans(root, top_plan); - - /* - * If any Params were generated, run through the plan tree and compute - * each plan node's extParam/allParam sets. Ideally we'd merge this into - * set_plan_references' tree traversal, but for now it has to be separate - * because we need to visit subplans before not after main plan. - */ - if (glob->paramExecTypes != NIL) - { - Assert(list_length(glob->subplans) == list_length(glob->subroots)); - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = lfirst_node(PlannerInfo, lr); - - SS_finalize_plan(subroot, subplan); - } - SS_finalize_plan(root, top_plan); - } - - /* final cleanup of the plan */ - Assert(glob->finalrtable == NIL); - Assert(glob->finalrowmarks == NIL); - Assert(glob->resultRelations == NIL); - Assert(glob->rootResultRelations == NIL); - - top_plan = set_plan_references(root, top_plan); - - /* ... 
and the subplans (both regular subplans and initplans) */ - Assert(list_length(glob->subplans) == list_length(glob->subroots)); - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = lfirst_node(PlannerInfo, lr); - - lfirst(lp) = set_plan_references(subroot, subplan); - } - result->transientPlan = glob->transientPlan; - result->dependsOnRole = glob->dependsOnRole; - result->parallelModeNeeded = glob->parallelModeNeeded; - result->planTree = top_plan; - - result->rtable = glob->finalrtable; - result->resultRelations = glob->resultRelations; -#if PG_VERSION_NUM < 120000 - result->nonleafResultRelations = glob->nonleafResultRelations; -#endif - result->rootResultRelations = glob->rootResultRelations; - result->subplans = glob->subplans; - result->rewindPlanIDs = glob->rewindPlanIDs; - result->rowMarks = glob->finalrowmarks; - result->relationOids = glob->relationOids; - result->invalItems = glob->invalItems; - result->paramExecTypes = glob->paramExecTypes; -} - - -/* - * BuildAggregatePlan creates and returns an aggregate plan. This aggregate plan - * builds aggregation and grouping operators (if any) that are to be executed on - * the master node. - */ -static Agg * -BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan) -{ - /* assert that we need to build an aggregate plan */ - Assert(masterQuery->hasAggs || masterQuery->groupClause); - AggClauseCosts aggregateCosts; - AggStrategy aggregateStrategy = AGG_PLAIN; - List *groupColumnList = masterQuery->groupClause; - List *aggregateTargetList = masterQuery->targetList; - - /* - * Replaces SubLink nodes with SubPlan nodes in the having section of the - * query. (and creates the subplans in root->subplans) - * - * Would be nice if we could use masterQuery->hasSubLinks to only call - * these when that is true. However, for some reason hasSubLinks is false - * even when there are SubLinks. - */ - Node *havingQual = SS_process_sublinks(root, masterQuery->havingQual, true); - - /* - * Right now this is not really needed, since we don't support correlated - * subqueries anyway. Once we do calling this is critical to do right after - * calling SS_process_sublinks, according to the postgres function comment. - */ - havingQual = SS_replace_correlation_vars(root, havingQual); - - - /* estimate aggregate execution costs */ - memset(&aggregateCosts, 0, sizeof(AggClauseCosts)); - get_agg_clause_costs(root, (Node *) aggregateTargetList, AGGSPLIT_SIMPLE, - &aggregateCosts); - - get_agg_clause_costs(root, (Node *) havingQual, AGGSPLIT_SIMPLE, &aggregateCosts); - - - /* if we have grouping, then initialize appropriate information */ - if (list_length(groupColumnList) > 0) - { - bool groupingIsHashable = grouping_is_hashable(groupColumnList); - bool groupingIsSortable = grouping_is_sortable(groupColumnList); - bool hasUnhashableAggregate = HasDistinctOrOrderByAggregate(masterQuery); - - if (!groupingIsHashable && !groupingIsSortable) - { - ereport(ERROR, (errmsg("grouped column list cannot be hashed or sorted"))); - } - - /* - * Postgres hash aggregate strategy does not support distinct aggregates - * in group and order by with aggregate operations. - * see nodeAgg.c:build_pertrans_for_aggref(). In that case we use - * sorted agg strategy, otherwise we use hash strategy. - * - * If the master query contains hll aggregate functions and the client set - * hll.force_groupagg to on, then we choose to use group aggregation. 
- */ - if (!enable_hashagg || !groupingIsHashable || hasUnhashableAggregate || - UseGroupAggregateWithHLL(masterQuery)) - { - char *messageHint = NULL; - if (!enable_hashagg && groupingIsHashable) - { - messageHint = "Consider setting enable_hashagg to on."; - } - - if (!groupingIsSortable) - { - ereport(ERROR, (errmsg("grouped column list must cannot be sorted"), - errdetail("Having a distinct aggregate requires " - "grouped column list to be sortable."), - messageHint ? errhint("%s", messageHint) : 0)); - } - - aggregateStrategy = AGG_SORTED; - subPlan = (Plan *) make_sort_from_sortclauses(groupColumnList, subPlan); - } - else - { - aggregateStrategy = AGG_HASHED; - } - } - - /* finally create the plan */ - Agg *aggregatePlan = makeAggNode(groupColumnList, (List *) havingQual, - aggregateStrategy, aggregateTargetList, subPlan); - - /* just for reproducible costs between different PostgreSQL versions */ - aggregatePlan->plan.startup_cost = 0; - aggregatePlan->plan.total_cost = 0; - aggregatePlan->plan.plan_rows = 0; - - return aggregatePlan; -} - - -/* - * HasDistinctAggregate returns true if the query has a distinct - * aggregate in its target list or in having clause. - */ -static bool -HasDistinctOrOrderByAggregate(Query *masterQuery) -{ - ListCell *allColumnCell = NULL; - - List *targetVarList = pull_var_clause((Node *) masterQuery->targetList, - PVC_INCLUDE_AGGREGATES); - List *havingVarList = pull_var_clause(masterQuery->havingQual, - PVC_INCLUDE_AGGREGATES); - - List *allColumnList = list_concat(targetVarList, havingVarList); - foreach(allColumnCell, allColumnList) - { - Node *columnNode = lfirst(allColumnCell); - if (IsA(columnNode, Aggref)) - { - Aggref *aggref = (Aggref *) columnNode; - if (aggref->aggdistinct != NIL || aggref->aggorder != NIL) - { - return true; - } - } - } - - return false; -} - - -/* - * UseGroupAggregateWithHLL first checks whether the HLL extension is loaded, if - * it is not then simply return false. Otherwise, checks whether the client set - * the hll.force_groupagg to on. If it is enabled and the master query contains - * hll aggregate function, it returns true. - */ -static bool -UseGroupAggregateWithHLL(Query *masterQuery) -{ - Oid hllId = get_extension_oid(HLL_EXTENSION_NAME, true); - - /* If HLL extension is not loaded, return false */ - if (!OidIsValid(hllId)) - { - return false; - } - - /* If HLL is loaded but related GUC is not set, return false */ - const char *gucStrValue = GetConfigOption(HLL_FORCE_GROUPAGG_GUC_NAME, true, false); - if (gucStrValue == NULL || strcmp(gucStrValue, "off") == 0) - { - return false; - } - - return QueryContainsAggregateWithHLL(masterQuery); -} - - -/* - * QueryContainsAggregateWithHLL returns true if the query has an hll aggregate - * function in it's target list. - */ -static bool -QueryContainsAggregateWithHLL(Query *query) -{ - ListCell *varCell = NULL; - - List *varList = pull_var_clause((Node *) query->targetList, PVC_INCLUDE_AGGREGATES); - foreach(varCell, varList) - { - Var *var = (Var *) lfirst(varCell); - if (nodeTag(var) == T_Aggref) - { - Aggref *aggref = (Aggref *) var; - int argCount = list_length(aggref->args); - Oid hllId = get_extension_oid(HLL_EXTENSION_NAME, false); - Oid hllSchemaOid = get_extension_schema(hllId); - const char *hllSchemaName = get_namespace_name(hllSchemaOid); - - /* - * If the obtained oid is InvalidOid for addFunctionId, that means - * we don't have an hll_add_agg function with the given argument count. - * So, we don't need to double check whether the obtained id is valid. 
- */ - Oid addFunctionId = FunctionOidExtended(hllSchemaName, HLL_ADD_AGGREGATE_NAME, - argCount, true); - Oid unionFunctionId = FunctionOid(hllSchemaName, HLL_UNION_AGGREGATE_NAME, 1); - - if (aggref->aggfnoid == addFunctionId || aggref->aggfnoid == unionFunctionId) - { - return true; - } - } - } - - return false; -} - - -/* - * BuildDistinctPlan creates an returns a plan for distinct. Depending on - * availability of hash function it chooses HashAgg over Sort/Unique - * plans. - * This function has a potential performance issue since we blindly set - * Plan nodes without looking at cost. We might need to revisit this - * if we have performance issues with select distinct queries. + * This function returns a Plan node, more specifically the CustomScan Plan node that has + * the ability to execute the distributed part of the query. + * + * When this function is called there is an extra list of clauses passed in that might not + * already have been applied to the plan. We add these clauses to the quals this node will + * execute. The quals are evaluated before returning the tuples scanned from the workers + * to the plan above ours to make sure they do not end up in the final result. */ static Plan * -BuildDistinctPlan(Query *masterQuery, Plan *subPlan) +CitusCustomScanPathPlan(PlannerInfo *root, + RelOptInfo *rel, + struct CustomPath *best_path, + List *tlist, + List *clauses, + List *custom_plans) { - Plan *distinctPlan = NULL; - List *distinctClauseList = masterQuery->distinctClause; - List *targetList = copyObject(masterQuery->targetList); + CitusCustomScanPath *citusPath = (CitusCustomScanPath *) best_path; - /* - * We don't need to add distinct plan if all of the columns used in group by - * clause also used in distinct clause, since group by clause guarantees the - * uniqueness of the target list for every row. - */ - if (IsGroupBySubsetOfDistinct(masterQuery->groupClause, masterQuery->distinctClause)) + /* clauses might have been added by the planner, need to add them to our scan */ + RestrictInfo *restrictInfo = NULL; + List **quals = &citusPath->remoteScan->scan.plan.qual; + foreach_ptr(restrictInfo, clauses) { - return subPlan; + *quals = lappend(*quals, restrictInfo->clause); } - - Assert(masterQuery->distinctClause); - Assert(!masterQuery->hasDistinctOn); - - /* - * Create group by plan with HashAggregate if all distinct - * members are hashable, and not containing distinct aggregate. - * Otherwise create sort+unique plan. - */ - bool distinctClausesHashable = grouping_is_hashable(distinctClauseList); - bool hasUnhashableAggregate = HasDistinctOrOrderByAggregate(masterQuery); - - if (enable_hashagg && distinctClausesHashable && !hasUnhashableAggregate) - { - distinctPlan = (Plan *) makeAggNode(distinctClauseList, NIL, AGG_HASHED, - targetList, subPlan); - } - else - { - Sort *sortPlan = make_sort_from_sortclauses(masterQuery->distinctClause, - subPlan); - distinctPlan = (Plan *) make_unique_from_sortclauses((Plan *) sortPlan, - masterQuery->distinctClause); - } - - return distinctPlan; + return (Plan *) citusPath->remoteScan; } /* - * makeAggNode creates a "Agg" plan node. groupClauseList is a list of - * SortGroupClause's. + * BuildSelectStatementViaStdPlanner creates a PlannedStmt where it combines the + * masterQuery and the remoteScan. It utilizes the standard_planner from postgres to + * create a plan based on the masterQuery. 
*/ -static Agg * -makeAggNode(List *groupClauseList, List *havingQual, AggStrategy aggrStrategy, - List *queryTargetList, Plan *subPlan) +static PlannedStmt * +BuildSelectStatementViaStdPlanner(Query *masterQuery, List *masterTargetList, + CustomScan *remoteScan) { - Agg *aggNode = NULL; - int groupColumnCount = list_length(groupClauseList); - AttrNumber *groupColumnIdArray = - extract_grouping_cols(groupClauseList, subPlan->targetlist); - Oid *groupColumnOpArray = extract_grouping_ops(groupClauseList); - const int rowEstimate = 10; + /* + * The standard planner will scribble on the target list. Since it is essential not to + * change the custom_scan_tlist we copy the target list before assigning it to either + * field. The masterTargetList is used at the end to extract the column names to be + * added to the alias we will create for the CustomScan (expressed as the + * citus_extradata_container function call in the masterQuery). + */ + remoteScan->custom_scan_tlist = copyObject(masterTargetList); + remoteScan->scan.plan.targetlist = copyObject(masterTargetList); -#if (PG_VERSION_NUM >= 120000) - aggNode = make_agg(queryTargetList, havingQual, aggrStrategy, - AGGSPLIT_SIMPLE, groupColumnCount, groupColumnIdArray, - groupColumnOpArray, - extract_grouping_collations(groupClauseList, - subPlan->targetlist), - NIL, NIL, rowEstimate, subPlan); -#else - aggNode = make_agg(queryTargetList, havingQual, aggrStrategy, - AGGSPLIT_SIMPLE, groupColumnCount, groupColumnIdArray, - groupColumnOpArray, - NIL, NIL, rowEstimate, subPlan); -#endif + /* probably want to do this where we add sublinks to the master plan */ + masterQuery->hasSubLinks = checkExprHasSubLink((Node *) masterQuery); - return aggNode; + /* + * We will overwrite the alias of the rangetable which describes the custom scan. + * Ideally we would have set the correct column names and alias on the range table in + * the master query already when we inserted the extra data container. This could be + * improved in the future. + */ + + /* find the rangetable entry for the extradata container and overwrite its alias */ + RangeTblEntry *extradataContainerRTE = NULL; + FindCitusExtradataContainerRTE((Node *) masterQuery, &extradataContainerRTE); + if (extradataContainerRTE != NULL) + { + /* extract column names from the masterTargetList */ + List *columnNameList = NIL; + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, masterTargetList) + { + columnNameList = lappend(columnNameList, makeString(targetEntry->resname)); + } + extradataContainerRTE->eref = makeAlias("remote_scan", columnNameList); + } + + /* + * Print the master query at debug level 4. Since serializing the query is relatively + * CPU intensive we only do so if we are actually logging at DEBUG4.
+ */ + const int logMasterQueryLevel = DEBUG4; + if (IsLoggableLevel(logMasterQueryLevel)) + { + StringInfo queryString = makeStringInfo(); + pg_get_query_def(masterQuery, queryString); + elog(logMasterQueryLevel, "master query: %s", queryString->data); + } + + PlannedStmt *standardStmt = NULL; + PG_TRY(); + { + /* This code should not be re-entrant, we check via asserts below */ + Assert(ReplaceCitusExtraDataContainer == false); + Assert(ReplaceCitusExtraDataContainerWithCustomScan == NULL); + ReplaceCitusExtraDataContainer = true; + ReplaceCitusExtraDataContainerWithCustomScan = remoteScan; + + standardStmt = standard_planner(masterQuery, 0, NULL); + + ReplaceCitusExtraDataContainer = false; + ReplaceCitusExtraDataContainerWithCustomScan = NULL; + } + PG_CATCH(); + { + ReplaceCitusExtraDataContainer = false; + ReplaceCitusExtraDataContainerWithCustomScan = NULL; + PG_RE_THROW(); + } + PG_END_TRY(); + + Assert(standardStmt != NULL); + return standardStmt; +} + + +/* + * Finds the rangetable entry in the query that refers to the citus_extradata_container + * and stores the pointer in result. + */ +static bool +FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rangeTblEntry = castNode(RangeTblEntry, node); + if (rangeTblEntry->rtekind == RTE_FUNCTION && + list_length(rangeTblEntry->functions) == 1) + { + RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial( + rangeTblEntry->functions); + FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr); + if (funcExpr->funcid == CitusExtraDataContainerFuncId()) + { + *result = rangeTblEntry; + return true; + } + } + + /* query_tree_walker descends into RTEs */ + return false; + } + else if (IsA(node, Query)) + { +#if PG_VERSION_NUM >= 120000 + const int flags = QTW_EXAMINE_RTES_BEFORE; +#else + const int flags = QTW_EXAMINE_RTES; +#endif + return query_tree_walker((Query *) node, FindCitusExtradataContainerRTE, result, + flags); + } + + return expression_tree_walker(node, FindCitusExtradataContainerRTE, result); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index b4ad3c893..6420da72b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -113,7 +113,11 @@ static int ExtractRangeTableId(Node *node); static void ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependentJobList, List **columnNames, List **columnVars); static RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnNames, - List *tableIdList); + List *tableIdList, List *funcColumnNames, + List *funcColumnTypes, + List *funcColumnTypeMods, + List *funcCollations); + static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId); static Query * BuildSubqueryJobQuery(MultiNode *multiNode); static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList, @@ -715,7 +719,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList) jobQuery->limitOffset = limitOffset; jobQuery->limitCount = limitCount; jobQuery->havingQual = havingQual; - jobQuery->hasAggs = contain_agg_clause((Node *) targetList); + jobQuery->hasAggs = contain_agg_clause((Node *) targetList) || + contain_agg_clause((Node *) havingQual); jobQuery->distinctClause = distinctClause; jobQuery->hasDistinctOn = hasDistinctOn; @@ -754,7 +759,8 @@ 
BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependentJobList) /* create a derived range table for the subtree below the collect */ RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(multiNode, columnNameList, - OutputTableIdList(multiNode)); + OutputTableIdList(multiNode), + NIL, NIL, NIL, NIL); rangeTableEntry->eref->colnames = columnNameList; ModifyRangeTblExtraData(rangeTableEntry, CITUS_RTE_SHARD, NULL, NULL, NULL); derivedRangeTableList = lappend(derivedRangeTableList, rangeTableEntry); @@ -839,7 +845,8 @@ BaseRangeTableList(MultiNode *multiNode) rangeTableEntry->alias = multiTable->alias; rangeTableEntry->relid = multiTable->relationId; SetRangeTblExtraData(rangeTableEntry, CITUS_RTE_RELATION, NULL, NULL, - list_make1_int(multiTable->rangeTableId)); + list_make1_int(multiTable->rangeTableId), + NIL, NIL, NIL, NIL); baseRangeTableList = lappend(baseRangeTableList, rangeTableEntry); } @@ -863,15 +870,18 @@ BaseRangeTableList(MultiNode *multiNode) * on worker nodes in case of the master node query. */ static RangeTblEntry * -DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, List *tableIdList) +DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, List *tableIdList, + List *funcColumnNames, List *funcColumnTypes, + List *funcColumnTypeMods, List *funcCollations) { RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry); rangeTableEntry->inFromCl = true; rangeTableEntry->eref = makeNode(Alias); rangeTableEntry->eref->colnames = columnList; - SetRangeTblExtraData(rangeTableEntry, CITUS_RTE_REMOTE_QUERY, NULL, NULL, - tableIdList); + SetRangeTblExtraData(rangeTableEntry, CITUS_RTE_REMOTE_QUERY, NULL, NULL, tableIdList, + funcColumnNames, funcColumnTypes, funcColumnTypeMods, + funcCollations); return rangeTableEntry; } @@ -1221,9 +1231,36 @@ QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableLis List *columnNameList = DerivedColumnNameList(columnCount, dependentJob->jobId); + List *funcColumnNames = NIL; + List *funcColumnTypes = NIL; + List *funcColumnTypeMods = NIL; + List *funcCollations = NIL; + + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, dependentTargetList) + { + Node *expr = (Node *) targetEntry->expr; + + char *name = targetEntry->resname; + if (name == NULL) + { + name = pstrdup("unnamed"); + } + + funcColumnNames = lappend(funcColumnNames, makeString(name)); + + funcColumnTypes = lappend_oid(funcColumnTypes, exprType(expr)); + funcColumnTypeMods = lappend_int(funcColumnTypeMods, exprTypmod(expr)); + funcCollations = lappend_oid(funcCollations, exprCollation(expr)); + } + RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(multiNode, columnNameList, - tableIdList); + tableIdList, + funcColumnNames, + funcColumnTypes, + funcColumnTypeMods, + funcCollations); RangeTblRef *rangeTableRef = makeNode(RangeTblRef); rangeTableRef->rtindex = list_length(*rangeTableList) + 1; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 97b9cece0..b0ae355f8 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -45,6 +45,7 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/distributed_planner.h" +#include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_partition.h" diff --git 
a/src/backend/distributed/sql/citus--9.2-2--9.3-1.sql b/src/backend/distributed/sql/citus--9.2-2--9.3-1.sql index 1d572507c..58feeed42 100644 --- a/src/backend/distributed/sql/citus--9.2-2--9.3-1.sql +++ b/src/backend/distributed/sql/citus--9.2-2--9.3-1.sql @@ -2,3 +2,4 @@ /* bump version to 9.3-1 */ +#include "udfs/citus_extradata_container/9.3-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_extradata_container/9.3-1.sql b/src/backend/distributed/sql/udfs/citus_extradata_container/9.3-1.sql new file mode 100644 index 000000000..5d4dc4caa --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_extradata_container/9.3-1.sql @@ -0,0 +1,12 @@ +-- we use the citus_extradata_container function as a range table entry in the query part +-- executed on the coordinator. Now that we are letting this query be planned by the +-- postgres planner we need to be able to pass column names and type information with this +-- function. This requires changing the prototype of the function to add a return +-- type. Changing the return type of the function requires dropping the function first. +DROP FUNCTION citus_extradata_container(INTERNAL); +CREATE OR REPLACE FUNCTION citus_extradata_container(INTERNAL) + RETURNS SETOF record + LANGUAGE C +AS 'MODULE_PATHNAME', $$citus_extradata_container$$; +COMMENT ON FUNCTION pg_catalog.citus_extradata_container(INTERNAL) + IS 'placeholder function to store additional data in postgres node trees'; diff --git a/src/backend/distributed/sql/udfs/citus_extradata_container/latest.sql b/src/backend/distributed/sql/udfs/citus_extradata_container/latest.sql new file mode 100644 index 000000000..5d4dc4caa --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_extradata_container/latest.sql @@ -0,0 +1,12 @@ +-- we use the citus_extradata_container function as a range table entry in the query part +-- executed on the coordinator. Now that we are letting this query be planned by the +-- postgres planner we need to be able to pass column names and type information with this +-- function. This requires changing the prototype of the function to add a return +-- type. Changing the return type of the function requires dropping the function first. +DROP FUNCTION citus_extradata_container(INTERNAL); +CREATE OR REPLACE FUNCTION citus_extradata_container(INTERNAL) + RETURNS SETOF record + LANGUAGE C +AS 'MODULE_PATHNAME', $$citus_extradata_container$$; +COMMENT ON FUNCTION pg_catalog.citus_extradata_container(INTERNAL) + IS 'placeholder function to store additional data in postgres node trees'; diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 4da2d8983..ebb34f5e4 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -72,9 +72,10 @@ PG_FUNCTION_INFO_V1(citus_extradata_container); * will not be handled by out/readfuncs.c. For the current uses that's ok.
*/ void -SetRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, - char *fragmentSchemaName, char *fragmentTableName, - List *tableIdList) +SetRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, char *fragmentSchemaName, + char *fragmentTableName, List *tableIdList, List *funcColumnNames, + List *funcColumnTypes, List *funcColumnTypeMods, + List *funcCollations) { Assert(rte->eref); @@ -127,6 +128,7 @@ SetRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, /* create function expression to store our faux arguments in */ FuncExpr *fauxFuncExpr = makeNode(FuncExpr); fauxFuncExpr->funcid = CitusExtraDataContainerFuncId(); + fauxFuncExpr->funcresulttype = RECORDOID; fauxFuncExpr->funcretset = true; fauxFuncExpr->location = -1; fauxFuncExpr->args = list_make4(rteKindData, fragmentSchemaData, @@ -137,6 +139,10 @@ SetRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, /* set the column count to pass ruleutils checks, not used elsewhere */ fauxFunction->funccolcount = list_length(rte->eref->colnames); + fauxFunction->funccolnames = funcColumnNames; + fauxFunction->funccoltypes = funcColumnTypes; + fauxFunction->funccoltypmods = funcColumnTypeMods; + fauxFunction->funccolcollations = funcCollations; rte->rtekind = RTE_FUNCTION; rte->functions = list_make1(fauxFunction); @@ -281,7 +287,7 @@ ModifyRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, SetRangeTblExtraData(rte, rteKind, fragmentSchemaName, fragmentTableName, - tableIdList); + tableIdList, NIL, NIL, NIL, NIL); } diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index d9d6a49e8..12398cf4f 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -15,10 +15,13 @@ #include "executor/execdesc.h" #include "nodes/plannodes.h" - typedef struct CitusScanState { CustomScanState customScanState; /* underlying custom scan node */ + + /* function that gets called before postgres starts its execution */ + void (*PreExecScan)(struct CitusScanState *scanState); + DistributedPlan *distributedPlan; /* distributed execution plan */ MultiExecutorType executorType; /* distributed executor type */ bool finishedRemoteScan; /* flag to check if remote scan is finished */ diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 8210856d5..e4b811d95 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -18,7 +18,9 @@ /* citus_nodefuncs.c */ extern void SetRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, char *fragmentSchemaName, char *fragmentTableName, - List *tableIdList); + List *tableIdList, List *funcColumnNames, + List *funcColumnTypes, List *funcColumnTypeMods, + List *funcCollations); extern void ModifyRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind, char *fragmentSchemaName, char *fragmentTableName, List *tableIdList); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 518191d67..2a1370150 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -161,6 +161,26 @@ typedef struct DistributedPlanningContext } DistributedPlanningContext; +/* + * CitusCustomScanPath is injected into the planner during the master query planning phase + * of the logical planner. + * We call out to the standard planner to plan the master query part for the output of the + * logical planner. 
diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h
index 8210856d5..e4b811d95 100644
--- a/src/include/distributed/citus_nodefuncs.h
+++ b/src/include/distributed/citus_nodefuncs.h
@@ -18,7 +18,9 @@
 /* citus_nodefuncs.c */
 extern void SetRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind,
								  char *fragmentSchemaName, char *fragmentTableName,
-								 List *tableIdList);
+								 List *tableIdList, List *funcColumnNames,
+								 List *funcColumnTypes, List *funcColumnTypeMods,
+								 List *funcCollations);
 extern void ModifyRangeTblExtraData(RangeTblEntry *rte, CitusRTEKind rteKind,
									 char *fragmentSchemaName, char *fragmentTableName,
									 List *tableIdList);
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h
index 518191d67..2a1370150 100644
--- a/src/include/distributed/distributed_planner.h
+++ b/src/include/distributed/distributed_planner.h
@@ -161,6 +161,26 @@ typedef struct DistributedPlanningContext
 } DistributedPlanningContext;
 
 
+/*
+ * CitusCustomScanPath is injected into the planner during the master query planning phase
+ * of the logical planner.
+ * We call out to the standard planner to plan the master query part for the output of the
+ * logical planner. This makes it easier to implement new SQL features in the logical
+ * planner, because we no longer have to manually implement plan creation for the query
+ * on the master.
+ */
+typedef struct CitusCustomScanPath
+{
+	CustomPath custom_path;
+
+	/*
+	 * Custom scan node computed by the citus planner that will produce the tuples for the
+	 * path we are injecting during the planning of the master query.
+	 */
+	CustomScan *remoteScan;
+} CitusCustomScanPath;
+
+
 extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
										  ParamListInfo boundParams);
 extern List * ExtractRangeTableEntryList(Query *query);
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h
index 6099b8356..d1167b243 100644
--- a/src/include/distributed/multi_executor.h
+++ b/src/include/distributed/multi_executor.h
@@ -72,6 +72,7 @@ extern int ExecutorLevel;
 extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
							  uint64 count, bool execute_once);
+extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
 extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
 extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
									   TupleDesc tupleDescriptor,
@@ -86,6 +87,7 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL
 extern void ExecuteUtilityTaskListWithoutResults(List *taskList);
 extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
							   int targetPoolSize);
+extern bool IsCitusCustomState(PlanState *planState);
 extern TupleTableSlot * CitusExecScan(CustomScanState *node);
 extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
 extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h
index 22b45ecd8..544023aa4 100644
--- a/src/include/distributed/multi_logical_planner.h
+++ b/src/include/distributed/multi_logical_planner.h
@@ -192,6 +192,7 @@ extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
 extern bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *));
 extern bool IsDistributedTableRTE(Node *node);
 extern bool QueryContainsDistributedTableRTE(Query *query);
+extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
 extern bool ContainsReadIntermediateResultFunction(Node *node);
 extern bool ContainsReadIntermediateResultArrayFunction(Node *node);
 extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte);
diff --git a/src/include/distributed/multi_master_planner.h b/src/include/distributed/multi_master_planner.h
index 67ac64201..2b1db746b 100644
--- a/src/include/distributed/multi_master_planner.h
+++ b/src/include/distributed/multi_master_planner.h
@@ -18,13 +18,23 @@
 #include "nodes/parsenodes.h"
 #include "nodes/plannodes.h"
 
+#if PG_VERSION_NUM >= 120000
+#include "nodes/pathnodes.h"
+#else
+#include "nodes/relation.h"
+#endif
+
 /* Function declarations for building local plans on the master node */
 struct DistributedPlan;
 struct CustomScan;
+extern Path * CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInfo,
+										Index restrictionIndex, RangeTblEntry *rte,
+										CustomScan *remoteScan);
 extern PlannedStmt * MasterNodeSelectPlan(struct DistributedPlan *distributedPlan,
										   struct CustomScan *dataScan);
 extern Unique * make_unique_from_sortclauses(Plan *lefttree, List *distinctList);
-
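CreateCitusCustomScanPath, declared just above, is what hands the standard planner a path that is later turned back into the pre-built remote-scan node. The sketch below shows what constructing such a path involves; the row and cost estimates, the methods table, and the PlanCustomPath body are assumptions for illustration and may differ from the actual implementation elsewhere in this patch.

/* illustrative sketch, not the patch's planner code */
static Plan * CitusCustomScanPathPlan(PlannerInfo *root, RelOptInfo *rel,
									  struct CustomPath *best_path, List *tlist,
									  List *clauses, List *custom_plans);

static CustomPathMethods CitusCustomScanPathMethods = {
	.CustomName = "CitusCustomScanPath",
	.PlanCustomPath = CitusCustomScanPathPlan,
};

Path *
CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInfo,
						  Index restrictionIndex, RangeTblEntry *rte,
						  CustomScan *remoteScan)
{
	CitusCustomScanPath *path =
		(CitusCustomScanPath *) newNode(sizeof(CitusCustomScanPath), T_CustomPath);

	path->custom_path.methods = &CitusCustomScanPathMethods;
	path->custom_path.path.pathtype = T_CustomScan;
	path->custom_path.path.parent = relOptInfo;
	path->custom_path.path.pathtarget = relOptInfo->reltarget;

	/* estimates are assumptions; the planner only ever sees this one path for the RTE */
	path->custom_path.path.rows = 100000;
	path->custom_path.path.startup_cost = 0;
	path->custom_path.path.total_cost = 0;

	path->remoteScan = remoteScan;

	return (Path *) path;
}

static Plan *
CitusCustomScanPathPlan(PlannerInfo *root, RelOptInfo *rel,
						struct CustomPath *best_path, List *tlist,
						List *clauses, List *custom_plans)
{
	CitusCustomScanPath *citusPath = (CitusCustomScanPath *) best_path;

	/* the pre-built remote scan becomes the plan node; strip RestrictInfos from quals */
	citusPath->remoteScan->scan.plan.targetlist = tlist;
	citusPath->remoteScan->scan.plan.qual = extract_actual_clauses(clauses, false);

	return (Plan *) citusPath->remoteScan;
}

The EXPLAIN output changes later in this diff, where the Citus custom scan now reports rows=100000 instead of rows=0, are consistent with a fixed row estimate of this kind.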
+extern bool ReplaceCitusExtraDataContainer; +extern CustomScan *ReplaceCitusExtraDataContainerWithCustomScan; #endif /* MULTI_MASTER_PLANNER_H */ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 55ad1a72c..8d792f689 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -1,6 +1,6 @@ # Rules to normalize test outputs. Our custom diff tool passes test output -# of tests in normalized_tests.lst through the substitution rules in this file -# before doing the actual comparison. +# of tests through the substitution rules in this file before doing the +# actual comparison. # # An example of when this is useful is when an error happens on a different # port number, or a different worker shard, or a different placement, etc. @@ -92,3 +92,6 @@ s/read_intermediate_result\('insert_select_[0-9]+_/read_intermediate_result('ins # ignore job id in repartitioned insert/select s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g +# ignore first parameter for citus_extradata_container due to differences between pg11 and pg12 +# can be removed when we remove PG_VERSION_NUM >= 120000 +s/pg_catalog.citus_extradata_container\([0-9]+/pg_catalog.citus_extradata_container\(XXX/g diff --git a/src/test/regress/expected/adaptive_executor_repartition.out b/src/test/regress/expected/adaptive_executor_repartition.out index f0f22eaa6..f89ead3fc 100644 --- a/src/test/regress/expected/adaptive_executor_repartition.out +++ b/src/test/regress/expected/adaptive_executor_repartition.out @@ -80,16 +80,17 @@ SELECT create_reference_table('ref_table'); (1 row) -- single hash repartition after bcast joins -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT count(*) FROM ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 WHERE r1.id = t1.id AND t2.sum = t1.id; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -98,16 +99,17 @@ WHERE (7 rows) -- a more complicated join order, first colocated join, later single hash repartition join -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT count(*) FROM single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 WHERE t1.id = t2.id AND t1.sum = t3.id; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob diff --git a/src/test/regress/expected/ch_bench_having.out b/src/test/regress/expected/ch_bench_having.out index e266e37e8..3ad09bd9e 100644 --- a/src/test/regress/expected/ch_bench_having.out +++ b/src/test/regress/expected/ch_bench_having.out @@ -22,12 +22,12 @@ order by s_i_id; QUERY PLAN --------------------------------------------------------------------- Sort - Sort Key: s_i_id + Sort Key: remote_scan.s_i_id InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> HashAggregate - Group Key: s_i_id - Filter: ((pg_catalog.sum(worker_column_3))::bigint > $0) + Group Key: remote_scan.s_i_id + Filter: 
((pg_catalog.sum(remote_scan.worker_column_3))::bigint > $0) -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Aggregate @@ -68,12 +68,12 @@ order by s_i_id; QUERY PLAN --------------------------------------------------------------------- Sort - Sort Key: s_i_id + Sort Key: remote_scan.s_i_id InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> HashAggregate - Group Key: s_i_id - Filter: ((pg_catalog.sum(worker_column_3))::bigint > $0) + Group Key: remote_scan.s_i_id + Filter: ((pg_catalog.sum(remote_scan.worker_column_3))::bigint > $0) -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Aggregate @@ -101,8 +101,8 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st QUERY PLAN --------------------------------------------------------------------- HashAggregate - Group Key: s_i_id - Filter: ((pg_catalog.sum(worker_column_3))::bigint > $0) + Group Key: remote_scan.s_i_id + Filter: ((pg_catalog.sum(remote_scan.worker_column_3))::bigint > $0) InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> Custom Scan (Citus Adaptive) @@ -124,51 +124,57 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st -> Seq Scan on stock_1640000 stock (22 rows) -explain select s_i_id, sum(s_order_cnt) as ordercount +explain (costs false) +select s_i_id, sum(s_order_cnt) as ordercount from stock s group by s_i_id having (select true) order by s_i_id; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort Sort Key: remote_scan.s_i_id InitPlan 1 (returns $0) - -> Result (cost=0.00..0.01 rows=1 width=1) - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) + -> Result + -> HashAggregate Group Key: remote_scan.s_i_id - Filter: $0 - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Result + One-Time Filter: $0 + -> Custom Scan (Citus Adaptive) + Filter: $0 + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: s.s_i_id + -> Seq Scan on stock_1640000 s +(17 rows) + +explain (costs false) +select s_i_id, sum(s_order_cnt) as ordercount +from stock s +group by s_i_id +having (select true); + QUERY PLAN +--------------------------------------------------------------------- + HashAggregate + Group Key: remote_scan.s_i_id + InitPlan 1 (returns $0) + -> Result + -> Result + One-Time Filter: $0 + -> Custom Scan (Citus Adaptive) + Filter: $0 Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate (cost=40.60..42.60 rows=200 width=12) + -> HashAggregate Group Key: s.s_i_id - -> Seq Scan on stock_1640000 s (cost=0.00..30.40 rows=2040 width=8) + -> Seq Scan on stock_1640000 s (15 rows) -explain select s_i_id, sum(s_order_cnt) as ordercount -from stock s -group by s_i_id -having (select true); - QUERY PLAN ---------------------------------------------------------------------- - HashAggregate (cost=0.00..0.00 rows=0 width=0) - Group Key: remote_scan.s_i_id - Filter: $0 - InitPlan 1 (returns $0) - -> Result (cost=0.00..0.01 rows=1 width=1) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate (cost=40.60..42.60 rows=200 width=12) - Group Key: s.s_i_id - -> Seq Scan on stock_1640000 s (cost=0.00..30.40 rows=2040 
width=8) -(13 rows) - select s_i_id, sum(s_order_cnt) as ordercount from stock where s_order_cnt > (select sum(s_order_cnt) * .005 as where_query from stock) diff --git a/src/test/regress/expected/ch_bench_having_mx.out b/src/test/regress/expected/ch_bench_having_mx.out index 85b109ddc..71478ae6f 100644 --- a/src/test/regress/expected/ch_bench_having_mx.out +++ b/src/test/regress/expected/ch_bench_having_mx.out @@ -27,12 +27,12 @@ order by s_i_id; QUERY PLAN --------------------------------------------------------------------- Sort - Sort Key: s_i_id + Sort Key: remote_scan.s_i_id InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> HashAggregate - Group Key: s_i_id - Filter: ((pg_catalog.sum(worker_column_3))::bigint > $0) + Group Key: remote_scan.s_i_id + Filter: ((pg_catalog.sum(remote_scan.worker_column_3))::bigint > $0) -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Aggregate @@ -73,12 +73,12 @@ order by s_i_id; QUERY PLAN --------------------------------------------------------------------- Sort - Sort Key: s_i_id + Sort Key: remote_scan.s_i_id InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> HashAggregate - Group Key: s_i_id - Filter: ((pg_catalog.sum(worker_column_3))::bigint > $0) + Group Key: remote_scan.s_i_id + Filter: ((pg_catalog.sum(remote_scan.worker_column_3))::bigint > $0) -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Aggregate @@ -106,8 +106,8 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st QUERY PLAN --------------------------------------------------------------------- HashAggregate - Group Key: s_i_id - Filter: ((pg_catalog.sum(worker_column_3))::bigint > $0) + Group Key: remote_scan.s_i_id + Filter: ((pg_catalog.sum(remote_scan.worker_column_3))::bigint > $0) InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> Custom Scan (Citus Adaptive) @@ -134,16 +134,41 @@ from stock s group by s_i_id having (select true) order by s_i_id; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort (cost=510.65..511.15 rows=200 width=12) Sort Key: remote_scan.s_i_id InitPlan 1 (returns $0) -> Result (cost=0.00..0.01 rows=1 width=1) - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) + -> HashAggregate (cost=500.00..503.00 rows=200 width=12) Group Key: remote_scan.s_i_id - Filter: $0 - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Result (cost=0.00..0.00 rows=100000 width=12) + One-Time Filter: $0 + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12) + Filter: $0 + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate (cost=40.60..42.60 rows=200 width=12) + Group Key: s.s_i_id + -> Seq Scan on stock_1640000 s (cost=0.00..30.40 rows=2040 width=8) +(17 rows) + +explain select s_i_id, sum(s_order_cnt) as ordercount +from stock s +group by s_i_id +having (select true); + QUERY PLAN +--------------------------------------------------------------------- + HashAggregate (cost=500.01..503.01 rows=200 width=12) + Group Key: remote_scan.s_i_id + InitPlan 1 (returns $0) + -> Result (cost=0.00..0.01 rows=1 width=1) + -> Result (cost=0.00..0.00 rows=100000 width=12) + One-Time Filter: $0 + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12) + Filter: $0 Task Count: 4 Tasks Shown: One of 4 -> Task @@ 
-153,27 +178,6 @@ order by s_i_id; -> Seq Scan on stock_1640000 s (cost=0.00..30.40 rows=2040 width=8) (15 rows) -explain select s_i_id, sum(s_order_cnt) as ordercount -from stock s -group by s_i_id -having (select true); - QUERY PLAN ---------------------------------------------------------------------- - HashAggregate (cost=0.00..0.00 rows=0 width=0) - Group Key: remote_scan.s_i_id - Filter: $0 - InitPlan 1 (returns $0) - -> Result (cost=0.00..0.01 rows=1 width=1) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate (cost=40.60..42.60 rows=200 width=12) - Group Key: s.s_i_id - -> Seq Scan on stock_1640000 s (cost=0.00..30.40 rows=2040 width=8) -(13 rows) - select s_i_id, sum(s_order_cnt) as ordercount from stock where s_order_cnt > (select sum(s_order_cnt) * .005 as where_query from stock) diff --git a/src/test/regress/expected/cte_inline.out b/src/test/regress/expected/cte_inline.out index 7c23a3701..2af1af3b1 100644 --- a/src/test/regress/expected/cte_inline.out +++ b/src/test/regress/expected/cte_inline.out @@ -988,19 +988,19 @@ GROUP BY key HAVING (count(*) > (SELECT max FROM cte_1)) -ORDER BY 2 DESC +ORDER BY 2 DESC, 1 DESC LIMIT 5; DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT max(key) AS max FROM cte_inline.test_table -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, count(*) AS count FROM cte_inline.test_table GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_1)) ORDER BY (count(*)) DESC LIMIT 5 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, count(*) AS count FROM cte_inline.test_table GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_1)) ORDER BY (count(*)) DESC, key DESC LIMIT 5 DEBUG: Router planner cannot handle multi-shard select queries key | count --------------------------------------------------------------------- + 8 | 40 5 | 40 2 | 40 - 8 | 40 (3 rows) -- cte used in ORDER BY just works fine diff --git a/src/test/regress/expected/cte_inline_0.out b/src/test/regress/expected/cte_inline_0.out index 75fea39fb..f5d011bd0 100644 --- a/src/test/regress/expected/cte_inline_0.out +++ b/src/test/regress/expected/cte_inline_0.out @@ -854,21 +854,21 @@ GROUP BY key HAVING (count(*) > (SELECT max FROM cte_1)) -ORDER BY 2 DESC +ORDER BY 2 DESC, 1 DESC LIMIT 5; DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT max(key) AS max FROM cte_inline.test_table -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, count(*) AS count FROM cte_inline.test_table GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_1)) 
ORDER BY (count(*)) DESC LIMIT 5 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, count(*) AS count FROM cte_inline.test_table GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_1)) ORDER BY (count(*)) DESC, key DESC LIMIT 5 DEBUG: Router planner cannot handle multi-shard select queries key | count --------------------------------------------------------------------- 0 | 44 9 | 40 - 5 | 40 8 | 40 6 | 40 + 5 | 40 (5 rows) -- cte used in ORDER BY just works fine diff --git a/src/test/regress/expected/having_subquery.out b/src/test/regress/expected/having_subquery.out new file mode 100644 index 000000000..5b660e12a --- /dev/null +++ b/src/test/regress/expected/having_subquery.out @@ -0,0 +1,60 @@ +-- Testing a having clause that could have been a where clause between a distributed table +-- and a reference table. This query was the cause for intermediate results not being +-- available during the replace of the planner for the master query with the standard +-- planner. +-- Since the having clause could have been a where clause the having clause on the grouping +-- on the coordinator is replaced with a Result node containing a One-time filter if the +-- having qual (one-time filter works because the query doesn't change with the tuples +-- returned from below). +SELECT count(*), + o_orderstatus +FROM orders +GROUP BY 2 +HAVING ( + SELECT count(*) + FROM customer + ) > 0; + count | o_orderstatus +--------------------------------------------------------------------- + 1461 | O + 75 | P + 1449 | F +(3 rows) + +-- lets pin the plan in the test as well +EXPLAIN (COSTS OFF) +SELECT count(*), + o_orderstatus +FROM orders +GROUP BY 2 +HAVING ( + SELECT count(*) + FROM customer + ) > 0; + QUERY PLAN +--------------------------------------------------------------------- + HashAggregate + Group Key: remote_scan.o_orderstatus + InitPlan 1 (returns $0) + -> Function Scan on read_intermediate_result intermediate_result + -> Result + One-Time Filter: ($0 > 0) + -> Custom Scan (Citus Adaptive) + Filter: ($0 > 0) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Seq Scan on customer_360001 customer + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: orders.o_orderstatus + -> Seq Scan on orders_290002 orders +(23 rows) + diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index b483b3d7f..1e3b95a7b 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -563,7 +563,7 @@ EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; --------------------------------------------------------------------- Custom Scan (Citus INSERT ... 
SELECT) (cost=0.00..0.00 rows=0 width=0) INSERT/SELECT method: repartition - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) Task Count: 4 Tasks Shown: One of 4 -> Task diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index 697d2ce21..49c9955a4 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -747,8 +747,9 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx ROLLBACK; -- test with INSERT SELECT via coordinator -- INSERT .. SELECT via coordinator that doesn't have any intermediate results +-- We use offset 1 to make sure the result needs to be pulled to the coordinator, offset 0 would be optimized away INSERT INTO table_1 - SELECT * FROM table_2 OFFSET 0; + SELECT * FROM table_2 OFFSET 1; DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -- INSERT .. SELECT via coordinator which has intermediate result, diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index ad633c38f..be4497129 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1147,7 +1147,7 @@ Aggregate RESET citus.task_executor_type; PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; EXPLAIN EXECUTE router_executor_query; -Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) +Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18) Task Count: 1 Tasks Shown: All -> Task @@ -1170,7 +1170,7 @@ Aggregate -- at least make sure to fail without crashing PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; EXPLAIN EXECUTE router_executor_query_param(5); -Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) +Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18) Task Count: 1 Tasks Shown: All -> Task diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 2a9662b40..feb7d7576 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -70,15 +70,16 @@ SELECT create_distributed_table('customer_hash', 'c_custkey'); SET client_min_messages TO DEBUG2; -- The following query checks that we can correctly handle self-joins -EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2 +EXPLAIN (COSTS OFF) +SELECT l1.l_quantity FROM lineitem l1, lineitem l2 WHERE l1.l_orderkey = l2.l_orderkey AND l1.l_quantity > 5; DEBUG: Router planner does not support append-partitioned tables. LOG: join order: [ "lineitem" ][ local partition join "lineitem" ] DEBUG: join prunable for intervals [1,5986] and [8997,14947] DEBUG: join prunable for intervals [8997,14947] and [1,5986] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (2 rows) @@ -86,92 +87,101 @@ SET client_min_messages TO LOG; -- The following queries check that we correctly handle joins and OR clauses. 
In -- particular, these queries check that we factorize out OR clauses if possible, -- and that we default to a cartesian product otherwise. -EXPLAIN SELECT count(*) FROM lineitem, orders +EXPLAIN (COSTS OFF) +SELECT count(*) FROM lineitem, orders WHERE (l_orderkey = o_orderkey AND l_quantity > 5) OR (l_orderkey = o_orderkey AND l_quantity < 10); LOG: join order: [ "lineitem" ][ local partition join "orders" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -EXPLAIN SELECT l_quantity FROM lineitem, orders +EXPLAIN (COSTS OFF) +SELECT l_quantity FROM lineitem, orders WHERE (l_orderkey = o_orderkey OR l_quantity > 5); ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -EXPLAIN SELECT count(*) FROM orders, lineitem_hash +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Verify we handle local joins between two hash-partitioned tables. -EXPLAIN SELECT count(*) FROM orders_hash, lineitem_hash +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders_hash, lineitem_hash WHERE o_orderkey = l_orderkey; LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Validate that we can handle broadcast joins with hash-partitioned tables. -EXPLAIN SELECT count(*) FROM customer_hash, nation +EXPLAIN (COSTS OFF) +SELECT count(*) FROM customer_hash, nation WHERE c_nationkey = n_nationkey; LOG: join order: [ "customer_hash" ][ reference join "nation" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Validate that we don't use a single-partition join method for a hash -- re-partitioned table, thus preventing a partition of just the customer table. 
-EXPLAIN SELECT count(*) FROM orders, lineitem, customer_append +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders, lineitem, customer_append WHERE o_custkey = l_partkey AND o_custkey = c_nationkey; LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer_append" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Validate that we don't chose a single-partition join method with a -- hash-partitioned base table -EXPLAIN SELECT count(*) FROM orders, customer_hash +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders, customer_hash WHERE c_custkey = o_custkey; LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Validate that we can re-partition a hash partitioned table to join with a -- range partitioned one. -EXPLAIN SELECT count(*) FROM orders_hash, customer_append +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Validate a 4 way join that could be done locally is planned as such by the logical -- planner. It used to be planned as a repartition join due to no 1 table being directly -- joined to all other tables, but instead follows a chain. -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM ( SELECT users_table.user_id FROM users_table @@ -185,10 +195,10 @@ JOIN ( WHERE event_type = 5 ) AS some_users ON (some_users.user_id = bar.user_id); LOG: join order: [ "users_table" ][ local partition join "events_table" ][ local partition join "users_table" ][ local partition join "events_table" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_join_order_tpch_repartition.out b/src/test/regress/expected/multi_join_order_tpch_repartition.out index 24beca674..a0b7a72d4 100644 --- a/src/test/regress/expected/multi_join_order_tpch_repartition.out +++ b/src/test/regress/expected/multi_join_order_tpch_repartition.out @@ -11,7 +11,8 @@ SET client_min_messages TO LOG; -- except that more data has been loaded into customer and part tables. Therefore, -- we will apply different distributed join strategies for these queries. 
-- Query #6 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem @@ -21,15 +22,16 @@ WHERE and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 24; LOG: join order: [ "lineitem" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Query #3 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, @@ -54,16 +56,17 @@ ORDER BY LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ] QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort Sort Key: (sum(remote_scan.revenue)) DESC, remote_scan.o_orderdate - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) + -> HashAggregate Group Key: remote_scan.l_orderkey, remote_scan.o_orderdate, remote_scan.o_shippriority - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (6 rows) -- Query #10 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) as revenue, @@ -97,16 +100,17 @@ ORDER BY LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ] QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort Sort Key: (sum(remote_scan.revenue)) DESC - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) + -> HashAggregate Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (6 rows) -- Query #19 from the TPC-H decision support benchmark (modified) -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice* (1 - l_discount)) as revenue FROM lineitem, @@ -136,15 +140,16 @@ WHERE AND l_shipinstruct = 'DELIVER IN PERSON' ); LOG: join order: [ "lineitem" ][ single range partition join "part_append" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (3 rows) -- Query to test multiple re-partition jobs in a single query -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT l_partkey, count(*) FROM lineitem, part_append, orders, customer_append @@ -155,11 +160,11 @@ WHERE GROUP BY l_partkey; LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - HashAggregate 
(cost=0.00..0.00 rows=0 width=0) + HashAggregate Group Key: remote_scan.l_partkey - -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) explain statements for distributed queries are not enabled (4 rows) diff --git a/src/test/regress/expected/multi_join_order_tpch_small.out b/src/test/regress/expected/multi_join_order_tpch_small.out index b7ccdaafe..c2e707867 100644 --- a/src/test/regress/expected/multi_join_order_tpch_small.out +++ b/src/test/regress/expected/multi_join_order_tpch_small.out @@ -6,7 +6,8 @@ SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; SET client_min_messages TO LOG; -- Query #6 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem @@ -16,15 +17,16 @@ WHERE and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 24; LOG: join order: [ "lineitem" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) -- Query #3 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, @@ -49,16 +51,17 @@ ORDER BY LOG: join order: [ "orders" ][ reference join "customer" ][ local partition join "lineitem" ] QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort Sort Key: (sum(remote_scan.revenue)) DESC, remote_scan.o_orderdate - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) + -> HashAggregate Group Key: remote_scan.l_orderkey, remote_scan.o_orderdate, remote_scan.o_shippriority - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (6 rows) -- Query #10 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) as revenue, @@ -92,16 +95,17 @@ ORDER BY LOG: join order: [ "orders" ][ reference join "customer" ][ reference join "nation" ][ local partition join "lineitem" ] QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort Sort Key: (sum(remote_scan.revenue)) DESC - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) + -> HashAggregate Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (6 rows) -- Query #19 from the TPC-H decision support benchmark (modified) -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice* (1 - l_discount)) as revenue FROM lineitem, @@ -131,10 +135,10 @@ WHERE AND l_shipinstruct = 'DELIVER IN PERSON' ); LOG: join order: [ "lineitem" ][ reference join "part" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan 
(Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_join_pruning.out b/src/test/regress/expected/multi_join_pruning.out index 59160fe10..065c8cfc3 100644 --- a/src/test/regress/expected/multi_join_pruning.out +++ b/src/test/regress/expected/multi_join_pruning.out @@ -68,43 +68,46 @@ DEBUG: Router planner does not support append-partitioned tables. -- different type of columns including varchar, array types, composite types -- etc. This is in response to a bug we had where we were not able to resolve -- correct operator types for some kind of column types. -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM array_partitioned_table table1, array_partitioned_table table2 WHERE table1.array_column = table2.array_column; DEBUG: Router planner does not support append-partitioned tables. DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM composite_partitioned_table table1, composite_partitioned_table table2 WHERE table1.composite_column = table2.composite_column; DEBUG: Router planner does not support append-partitioned tables. DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) -- Test that large table joins on partition varchar columns work -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM varchar_partitioned_table table1, varchar_partitioned_table table2 WHERE table1.varchar_column = table2.varchar_column; DEBUG: Router planner does not support append-partitioned tables. DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out index 143f0a94d..19776411c 100644 --- a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out +++ b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out @@ -163,13 +163,14 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other -- Query that should result in a repartition join on UDT column. 
SET citus.log_multi_join_order = true; -EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other +EXPLAIN (COSTS OFF) +SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob diff --git a/src/test/regress/expected/multi_orderby_limit_pushdown.out b/src/test/regress/expected/multi_orderby_limit_pushdown.out index 860f8501a..9d67c9810 100644 --- a/src/test/regress/expected/multi_orderby_limit_pushdown.out +++ b/src/test/regress/expected/multi_orderby_limit_pushdown.out @@ -28,28 +28,28 @@ LIMIT 1; 1 | 3.2857142857142857 (1 row) -EXPLAIN +EXPLAIN (COSTS OFF) SELECT user_id, avg(value_1) FROM users_table GROUP BY user_id ORDER BY avg(value_1) DESC LIMIT 1; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Limit (cost=0.00..0.00 rows=0 width=0) - -> Sort (cost=0.00..0.00 rows=0 width=0) + Limit + -> Sort Sort Key: remote_scan.avg DESC - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Limit (cost=1.53..1.53 rows=1 width=36) - -> Sort (cost=1.53..1.53 rows=2 width=36) + -> Limit + -> Sort Sort Key: (avg(value_1)) DESC - -> HashAggregate (cost=1.50..1.52 rows=2 width=36) + -> HashAggregate Group Key: user_id - -> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=8) + -> Seq Scan on users_table_1400256 users_table (14 rows) SELECT user_id, avg(value_1) + 1 @@ -96,23 +96,23 @@ ORDER BY 2 DESC; 1 | 10.2857142857142857 (6 rows) -EXPLAIN +EXPLAIN (COSTS OFF) SELECT user_id, avg(value_1) + count(value_2) FROM users_table GROUP BY user_id ORDER BY 2 DESC; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort (cost=0.00..0.00 rows=0 width=0) + Sort Sort Key: remote_scan."?column?" 
DESC - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate (cost=1.58..1.61 rows=2 width=36) + -> HashAggregate Group Key: user_id - -> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) + -> Seq Scan on users_table_1400256 users_table (10 rows) SELECT user_id, avg(value_1) + count(value_2) diff --git a/src/test/regress/expected/multi_repartition_join_planning.out b/src/test/regress/expected/multi_repartition_join_planning.out index 3f3340469..b3593d43e 100644 --- a/src/test/regress/expected/multi_repartition_join_planning.out +++ b/src/test/regress/expected/multi_repartition_join_planning.out @@ -106,6 +106,7 @@ DETAIL: Creating dependency on merge taskId 11 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx +DEBUG: master query: SELECT l_partkey, o_orderkey, COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(XXX, NULL::cstring(0), NULL::cstring(0), '(i 1 3 2 4)'::cstring(0)) remote_scan(l_partkey integer, o_orderkey bigint, count bigint) GROUP BY l_partkey, o_orderkey ORDER BY l_partkey, o_orderkey DEBUG: completed cleanup query for job 3 DEBUG: completed cleanup query for job 3 DEBUG: completed cleanup query for job 2 @@ -209,6 +210,7 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx +DEBUG: master query: SELECT l_partkey, o_orderkey, COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(XXX, NULL::cstring(0), NULL::cstring(0), '(i 1 2)'::cstring(0)) remote_scan(l_partkey integer, o_orderkey bigint, count bigint) GROUP BY l_partkey, o_orderkey ORDER BY l_partkey, o_orderkey DEBUG: completed cleanup query for job 6 DEBUG: completed cleanup query for job 6 DEBUG: completed cleanup query for job 4 @@ -283,6 +285,7 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx +DEBUG: master query: SELECT o_orderkey, o_shippriority, COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(XXX, NULL::cstring(0), NULL::cstring(0), '(i 1 2)'::cstring(0)) remote_scan(o_orderkey bigint, o_shippriority integer, count bigint) GROUP BY o_orderkey ORDER BY o_orderkey DEBUG: completed cleanup query for job 9 DEBUG: completed cleanup query for job 9 DEBUG: completed cleanup query for job 7 @@ -359,6 +362,7 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx +DEBUG: master query: SELECT o_orderkey, o_shippriority, COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(XXX, NULL::cstring(0), NULL::cstring(0), '(i 1 2)'::cstring(0)) remote_scan(o_orderkey bigint, o_shippriority integer, count bigint) GROUP BY o_orderkey ORDER BY o_orderkey DEBUG: completed cleanup query for job 12 DEBUG: completed cleanup query for job 12 DEBUG: completed cleanup query for job 10 @@ -433,6 +437,7 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node 
localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx +DEBUG: master query: SELECT o_orderkey, any_value(any_value) AS any_value FROM pg_catalog.citus_extradata_container(XXX, NULL::cstring(0), NULL::cstring(0), '(i 1 2)'::cstring(0)) remote_scan(o_orderkey bigint, any_value integer, worker_column_3 integer) GROUP BY o_orderkey ORDER BY o_orderkey DEBUG: completed cleanup query for job 15 DEBUG: completed cleanup query for job 15 DEBUG: completed cleanup query for job 13 @@ -517,6 +522,7 @@ DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx +DEBUG: master query: SELECT s_i_id FROM pg_catalog.citus_extradata_container(XXX, NULL::cstring(0), NULL::cstring(0), '(i 1 2)'::cstring(0)) remote_scan(s_i_id integer, worker_column_2 integer, worker_column_3 numeric) DEBUG: completed cleanup query for job 18 DEBUG: completed cleanup query for job 18 DEBUG: completed cleanup query for job 16 diff --git a/src/test/regress/expected/multi_repartition_udt.out b/src/test/regress/expected/multi_repartition_udt.out index a1fc0c27d..03c27da93 100644 --- a/src/test/regress/expected/multi_repartition_udt.out +++ b/src/test/regress/expected/multi_repartition_udt.out @@ -164,13 +164,14 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other -- Query that should result in a repartition join on UDT column. SET citus.task_executor_type = 'task-tracker'; SET citus.log_multi_join_order = true; -EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other +EXPLAIN (COSTS OFF) +SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ] - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob diff --git a/src/test/regress/expected/multi_select_distinct.out b/src/test/regress/expected/multi_select_distinct.out index 433c420dc..8bb960161 100644 --- a/src/test/regress/expected/multi_select_distinct.out +++ b/src/test/regress/expected/multi_select_distinct.out @@ -234,24 +234,22 @@ EXPLAIN (COSTS FALSE) GROUP BY 1 HAVING count(*) > 5 ORDER BY 2 DESC, 1; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: remote_scan.count DESC, remote_scan.l_orderkey - -> Unique - -> Sort - Sort Key: remote_scan.count DESC, remote_scan.l_orderkey - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_orderkey - Filter: (count(*) > 5) - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part - Filter: (l_orderkey < 200) -(15 rows) + Unique + -> Sort + Sort Key: remote_scan.count DESC, remote_scan.l_orderkey + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_orderkey + Filter: (count(*) > 5) + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part + Filter: (l_orderkey < 200) +(13 rows) SET enable_hashagg TO on; -- distinct on aggregate of group by columns, we try 
to check whether we handle @@ -278,10 +276,9 @@ EXPLAIN (COSTS FALSE) ORDER BY 1; QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) - -> HashAggregate - Group Key: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) + Unique + -> Sort + Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) -> HashAggregate Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3 -> Custom Scan (Citus Adaptive) @@ -292,7 +289,7 @@ EXPLAIN (COSTS FALSE) -> HashAggregate Group Key: l_suppkey, l_linenumber -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(14 rows) +(13 rows) -- check the plan if the hash aggreate is disabled. We expect to see sort+unique -- instead of aggregate plan node to handle distinct. @@ -302,26 +299,24 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash_part GROUP BY l_suppkey, l_linenumber ORDER BY 1; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) - -> Unique - -> Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) - -> GroupAggregate - Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3 - -> Sort - Sort Key: remote_scan.worker_column_2, remote_scan.worker_column_3 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_suppkey, l_linenumber - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(17 rows) + Unique + -> Sort + Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) + -> GroupAggregate + Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3 + -> Sort + Sort Key: remote_scan.worker_column_2, remote_scan.worker_column_3 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_suppkey, l_linenumber + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(15 rows) SET enable_hashagg TO on; -- Now we have only part of group clause columns in distinct, yet it is still not @@ -355,10 +350,9 @@ EXPLAIN (COSTS FALSE) QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: remote_scan.l_suppkey - -> HashAggregate - Group Key: remote_scan.l_suppkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) + -> Unique + -> Sort + Sort Key: remote_scan.l_suppkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) -> HashAggregate Group Key: remote_scan.l_suppkey, remote_scan.worker_column_3 -> Custom Scan (Citus Adaptive) @@ -369,7 +363,7 @@ EXPLAIN (COSTS FALSE) -> HashAggregate Group Key: l_suppkey, l_linenumber -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(15 rows) +(14 rows) -- check the plan if the hash aggreate is disabled. Similar to the explain of -- the query above. 
@@ -380,27 +374,25 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: remote_scan.l_suppkey - -> Unique - -> Sort - Sort Key: remote_scan.l_suppkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) - -> GroupAggregate - Group Key: remote_scan.l_suppkey, remote_scan.worker_column_3 - -> Sort - Sort Key: remote_scan.l_suppkey, remote_scan.worker_column_3 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_suppkey, l_linenumber - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(18 rows) + -> Unique + -> Sort + Sort Key: remote_scan.l_suppkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) + -> GroupAggregate + Group Key: remote_scan.l_suppkey, remote_scan.worker_column_3 + -> Sort + Sort Key: remote_scan.l_suppkey, remote_scan.worker_column_3 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_suppkey, l_linenumber + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(16 rows) SET enable_hashagg TO on; -- Similar to the above query, not with count but avg. Only difference with the @@ -432,13 +424,12 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1,2 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: remote_scan.l_suppkey, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) - -> HashAggregate - Group Key: remote_scan.l_suppkey, (pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)) + -> Unique + -> Sort + Sort Key: remote_scan.l_suppkey, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) -> HashAggregate Group Key: remote_scan.l_suppkey, remote_scan.worker_column_4 -> Custom Scan (Citus Adaptive) @@ -449,7 +440,7 @@ EXPLAIN (COSTS FALSE) -> HashAggregate Group Key: l_suppkey, l_linenumber -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(15 rows) +(14 rows) -- check the plan if the hash aggreate is disabled. This explain errors out due -- to a bug right now, expectation must be corrected after fixing it. 
@@ -460,27 +451,25 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1,2 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: remote_scan.l_suppkey, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) - -> Unique - -> Sort - Sort Key: remote_scan.l_suppkey, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) - -> GroupAggregate - Group Key: remote_scan.l_suppkey, remote_scan.worker_column_4 - -> Sort - Sort Key: remote_scan.l_suppkey, remote_scan.worker_column_4 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_suppkey, l_linenumber - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(18 rows) + -> Unique + -> Sort + Sort Key: remote_scan.l_suppkey, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) + -> GroupAggregate + Group Key: remote_scan.l_suppkey, remote_scan.worker_column_4 + -> Sort + Sort Key: remote_scan.l_suppkey, remote_scan.worker_column_4 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_suppkey, l_linenumber + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(16 rows) SET enable_hashagg TO on; -- Similar to the above query but with distinct on @@ -586,13 +575,12 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: ((sum(remote_scan.avg) / (pg_catalog.sum(remote_scan.avg_1))::double precision)) - -> HashAggregate - Group Key: (sum(remote_scan.avg) / (pg_catalog.sum(remote_scan.avg_1))::double precision) + -> Unique + -> Sort + Sort Key: ((sum(remote_scan.avg) / (pg_catalog.sum(remote_scan.avg_1))::double precision)) -> HashAggregate Group Key: remote_scan.worker_column_3, remote_scan.worker_column_4 -> Custom Scan (Citus Adaptive) @@ -603,7 +591,7 @@ EXPLAIN (COSTS FALSE) -> HashAggregate Group Key: l_suppkey, l_linenumber -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(15 rows) +(14 rows) -- check the plan if the hash aggreate is disabled. This explain errors out due -- to a bug right now, expectation must be corrected after fixing it. 
@@ -614,27 +602,25 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: ((sum(remote_scan.avg) / (pg_catalog.sum(remote_scan.avg_1))::double precision)) - -> Unique - -> Sort - Sort Key: ((sum(remote_scan.avg) / (pg_catalog.sum(remote_scan.avg_1))::double precision)) - -> GroupAggregate - Group Key: remote_scan.worker_column_3, remote_scan.worker_column_4 - -> Sort - Sort Key: remote_scan.worker_column_3, remote_scan.worker_column_4 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_suppkey, l_linenumber - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(18 rows) + -> Unique + -> Sort + Sort Key: ((sum(remote_scan.avg) / (pg_catalog.sum(remote_scan.avg_1))::double precision)) + -> GroupAggregate + Group Key: remote_scan.worker_column_3, remote_scan.worker_column_4 + -> Sort + Sort Key: remote_scan.worker_column_3, remote_scan.worker_column_4 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_suppkey, l_linenumber + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(16 rows) SET enable_hashagg TO on; -- expression among aggregations. @@ -664,13 +650,12 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: (((pg_catalog.sum(remote_scan.dis))::bigint + COALESCE((pg_catalog.sum(remote_scan.dis_1))::bigint, '0'::bigint))) - -> HashAggregate - Group Key: ((pg_catalog.sum(remote_scan.dis))::bigint + COALESCE((pg_catalog.sum(remote_scan.dis_1))::bigint, '0'::bigint)) + -> Unique + -> Sort + Sort Key: (((pg_catalog.sum(remote_scan.dis))::bigint + COALESCE((pg_catalog.sum(remote_scan.dis_1))::bigint, '0'::bigint))) -> HashAggregate Group Key: remote_scan.worker_column_3, remote_scan.worker_column_4 -> Custom Scan (Citus Adaptive) @@ -681,7 +666,7 @@ EXPLAIN (COSTS FALSE) -> HashAggregate Group Key: l_suppkey, l_linenumber -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(15 rows) +(14 rows) -- check the plan if the hash aggreate is disabled. This explain errors out due -- to a bug right now, expectation must be corrected after fixing it. 
@@ -692,27 +677,25 @@ EXPLAIN (COSTS FALSE) GROUP BY l_suppkey, l_linenumber ORDER BY 1 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: (((pg_catalog.sum(remote_scan.dis))::bigint + COALESCE((pg_catalog.sum(remote_scan.dis_1))::bigint, '0'::bigint))) - -> Unique - -> Sort - Sort Key: (((pg_catalog.sum(remote_scan.dis))::bigint + COALESCE((pg_catalog.sum(remote_scan.dis_1))::bigint, '0'::bigint))) - -> GroupAggregate - Group Key: remote_scan.worker_column_3, remote_scan.worker_column_4 - -> Sort - Sort Key: remote_scan.worker_column_3, remote_scan.worker_column_4 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_suppkey, l_linenumber - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(18 rows) + -> Unique + -> Sort + Sort Key: (((pg_catalog.sum(remote_scan.dis))::bigint + COALESCE((pg_catalog.sum(remote_scan.dis_1))::bigint, '0'::bigint))) + -> GroupAggregate + Group Key: remote_scan.worker_column_3, remote_scan.worker_column_4 + -> Sort + Sort Key: remote_scan.worker_column_3, remote_scan.worker_column_4 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_suppkey, l_linenumber + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(16 rows) SET enable_hashagg TO on; -- distinct on all columns, note Group By columns guarantees uniqueness of the @@ -774,27 +757,25 @@ EXPLAIN (COSTS FALSE) GROUP BY 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16 ORDER BY 1,2 LIMIT 10; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey - -> Unique - -> Sort - Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey, remote_scan.l_suppkey, remote_scan.l_linenumber, remote_scan.l_quantity, remote_scan.l_extendedprice, remote_scan.l_discount, remote_scan.l_tax, remote_scan.l_returnflag, remote_scan.l_linestatus, remote_scan.l_shipdate, remote_scan.l_commitdate, remote_scan.l_receiptdate, remote_scan.l_shipinstruct, remote_scan.l_shipmode, remote_scan.l_comment - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Limit - -> Unique - -> Group - Group Key: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment - -> Sort - Sort Key: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(18 rows) + -> Unique + -> Sort + Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey, remote_scan.l_suppkey, remote_scan.l_linenumber, remote_scan.l_quantity, remote_scan.l_extendedprice, remote_scan.l_discount, remote_scan.l_tax, remote_scan.l_returnflag, remote_scan.l_linestatus, remote_scan.l_shipdate, remote_scan.l_commitdate, remote_scan.l_receiptdate, remote_scan.l_shipinstruct, remote_scan.l_shipmode, remote_scan.l_comment + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx 
dbname=regression + -> Limit + -> Unique + -> Group + Group Key: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment + -> Sort + Sort Key: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(16 rows) SET enable_hashagg TO on; -- distinct on count distinct @@ -864,24 +845,22 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash_part GROUP BY l_orderkey ORDER BY 1,2; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: remote_scan.count, remote_scan.count_1 - -> Unique - -> Sort - Sort Key: remote_scan.count, remote_scan.count_1 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> GroupAggregate - Group Key: l_orderkey - -> Sort - Sort Key: l_orderkey - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(15 rows) + Unique + -> Sort + Sort Key: remote_scan.count, remote_scan.count_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> GroupAggregate + Group Key: l_orderkey + -> Sort + Sort Key: l_orderkey + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(13 rows) SET enable_hashagg TO on; -- distinct on aggregation with filter and expression @@ -904,12 +883,11 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash_part GROUP BY l_suppkey ORDER BY 1; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: (ceil(((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) / 2))::double precision)) - -> HashAggregate - Group Key: ceil(((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) / 2))::double precision) + Unique + -> Sort + Sort Key: (ceil(((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) / 2))::double precision)) -> HashAggregate Group Key: remote_scan.worker_column_2 -> Custom Scan (Citus Adaptive) @@ -920,7 +898,7 @@ EXPLAIN (COSTS FALSE) -> HashAggregate Group Key: l_suppkey -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(14 rows) +(13 rows) -- check the plan if the hash aggreate is disabled SET enable_hashagg TO off; @@ -929,26 +907,24 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash_part GROUP BY l_suppkey ORDER BY 1; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: (ceil(((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) / 2))::double precision)) - -> Unique - -> Sort - Sort Key: (ceil(((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) / 2))::double precision)) - -> GroupAggregate - Group Key: remote_scan.worker_column_2 - -> Sort - Sort Key: remote_scan.worker_column_2 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_suppkey - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(17 rows) + Unique + -> Sort + Sort Key: (ceil(((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) / 2))::double precision)) + -> GroupAggregate + Group 
Key: remote_scan.worker_column_2 + -> Sort + Sort Key: remote_scan.worker_column_2 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_suppkey + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(15 rows) SET enable_hashagg TO on; -- explain the query to see actual plan with array_agg aggregation. @@ -985,25 +961,23 @@ EXPLAIN (COSTS FALSE) GROUP BY l_orderkey ORDER BY 2 LIMIT 15; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- Limit - -> Sort - Sort Key: remote_scan.array_length - -> Unique - -> Sort - Sort Key: remote_scan.array_length, remote_scan.array_agg - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> GroupAggregate - Group Key: l_orderkey - -> Sort - Sort Key: l_orderkey - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(16 rows) + -> Unique + -> Sort + Sort Key: remote_scan.array_length, remote_scan.array_agg + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> GroupAggregate + Group Key: l_orderkey + -> Sort + Sort Key: l_orderkey + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(14 rows) SET enable_hashagg TO on; -- distinct on non-partition column with aggregate @@ -1035,22 +1009,23 @@ EXPLAIN (COSTS FALSE) GROUP BY 1 HAVING count(*) > 2 ORDER BY 1; - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Sort - Sort Key: remote_scan.l_partkey - -> HashAggregate - Group Key: remote_scan.l_partkey - Filter: (COALESCE((pg_catalog.sum(remote_scan.worker_column_3))::bigint, '0'::bigint) > 2) - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> HashAggregate - Group Key: l_partkey - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(13 rows) + Unique + -> Sort + Sort Key: remote_scan.l_partkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) + -> HashAggregate + Group Key: remote_scan.l_partkey + Filter: (COALESCE((pg_catalog.sum(remote_scan.worker_column_3))::bigint, '0'::bigint) > 2) + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: l_partkey + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(14 rows) -- distinct on non-partition column and avg SELECT DISTINCT l_partkey, avg(l_linenumber) diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index e44d8ee9a..7bd763543 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -68,53 +68,53 @@ BEGIN; SET client_min_messages TO DEBUG3; -- First test the default greedy task assignment policy SET citus.task_assignment_policy TO 'greedy'; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; DEBUG: Router planner does not support append-partitioned tables. 
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; DEBUG: Router planner does not support append-partitioned tables. DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) -- Next test the first-replica task assignment policy SET citus.task_assignment_policy TO 'first-replica'; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; DEBUG: Router planner does not support append-partitioned tables. DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; DEBUG: Router planner does not support append-partitioned tables. DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Aggregate + -> Custom Scan (Citus Adaptive) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/relation_access_tracking.out b/src/test/regress/expected/relation_access_tracking.out index de8b50c8e..241716558 100644 --- a/src/test/regress/expected/relation_access_tracking.out +++ b/src/test/regress/expected/relation_access_tracking.out @@ -401,7 +401,8 @@ BEGIN; ROLLBACK; -- coordinator INSERT .. 
SELECT BEGIN; - INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0; + -- We use offset 1 to make sure the result needs to be pulled to the coordinator, since offset 0 would be optimized away + INSERT INTO table_2 SELECT * FROM table_1 OFFSET 1; SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; table_name | select_access | dml_access | ddl_access --------------------------------------------------------------------- diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 9f62924e1..76b4a112b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -84,7 +84,7 @@ test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql test: multi_reference_table multi_select_for_update relation_access_tracking test: custom_aggregate_support aggregate_support -test: multi_average_expression multi_working_columns multi_having_pushdown +test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition chbenchmark_all_queries expression_reference_join test: multi_agg_type_conversion multi_count_type_conversion diff --git a/src/test/regress/sql/adaptive_executor_repartition.sql b/src/test/regress/sql/adaptive_executor_repartition.sql index c973014c1..b184aee83 100644 --- a/src/test/regress/sql/adaptive_executor_repartition.sql +++ b/src/test/regress/sql/adaptive_executor_repartition.sql @@ -42,7 +42,8 @@ SELECT create_reference_table('ref_table'); -- single hash repartition after bcast joins -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT count(*) FROM ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 @@ -50,7 +51,8 @@ WHERE r1.id = t1.id AND t2.sum = t1.id; -- a more complicated join order, first colocated join, later single hash repartition join -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT count(*) FROM single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 diff --git a/src/test/regress/sql/ch_bench_having.sql b/src/test/regress/sql/ch_bench_having.sql index 07a0dc870..890e6a3a2 100644 --- a/src/test/regress/sql/ch_bench_having.sql +++ b/src/test/regress/sql/ch_bench_having.sql @@ -32,13 +32,15 @@ group by s_i_id having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from stock); -explain select s_i_id, sum(s_order_cnt) as ordercount +explain (costs false) +select s_i_id, sum(s_order_cnt) as ordercount from stock s group by s_i_id having (select true) order by s_i_id; -explain select s_i_id, sum(s_order_cnt) as ordercount +explain (costs false) +select s_i_id, sum(s_order_cnt) as ordercount from stock s group by s_i_id having (select true); diff --git a/src/test/regress/sql/cte_inline.sql b/src/test/regress/sql/cte_inline.sql index 9af34f344..057fc8d53 100644 --- a/src/test/regress/sql/cte_inline.sql +++ b/src/test/regress/sql/cte_inline.sql @@ -473,7 +473,7 @@ GROUP BY key HAVING (count(*) > (SELECT max FROM cte_1)) -ORDER BY 2 DESC +ORDER BY 2 DESC, 1 DESC LIMIT 5; -- cte used in ORDER BY just works fine diff --git a/src/test/regress/sql/having_subquery.sql b/src/test/regress/sql/having_subquery.sql new file mode 100644
index 000000000..ced678d39 --- /dev/null +++ b/src/test/regress/sql/having_subquery.sql @@ -0,0 +1,27 @@ +-- Testing a having clause that could have been a where clause between a distributed table +-- and a reference table. This query was the cause for intermediate results not being +-- available during the replace of the planner for the master query with the standard +-- planner. +-- Since the having clause could have been a where clause the having clause on the grouping +-- on the coordinator is replaced with a Result node containing a One-time filter if the +-- having qual (one-time filter works because the query doesn't change with the tuples +-- returned from below). +SELECT count(*), + o_orderstatus +FROM orders +GROUP BY 2 +HAVING ( + SELECT count(*) + FROM customer + ) > 0; + +-- lets pin the plan in the test as well +EXPLAIN (COSTS OFF) +SELECT count(*), + o_orderstatus +FROM orders +GROUP BY 2 +HAVING ( + SELECT count(*) + FROM customer + ) > 0; diff --git a/src/test/regress/sql/intermediate_result_pruning.sql b/src/test/regress/sql/intermediate_result_pruning.sql index 8aee49fcc..3300f113f 100644 --- a/src/test/regress/sql/intermediate_result_pruning.sql +++ b/src/test/regress/sql/intermediate_result_pruning.sql @@ -442,8 +442,9 @@ ROLLBACK; -- test with INSERT SELECT via coordinator -- INSERT .. SELECT via coordinator that doesn't have any intermediate results +-- We use offset 1 to make sure the result needs to be pulled to the coordinator, offset 0 would be optimized away INSERT INTO table_1 - SELECT * FROM table_2 OFFSET 0; + SELECT * FROM table_2 OFFSET 1; -- INSERT .. SELECT via coordinator which has intermediate result, -- and can be pruned to a single worker because the final query is on diff --git a/src/test/regress/sql/multi_join_order_additional.sql b/src/test/regress/sql/multi_join_order_additional.sql index 33d610b96..869ae0889 100644 --- a/src/test/regress/sql/multi_join_order_additional.sql +++ b/src/test/regress/sql/multi_join_order_additional.sql @@ -66,7 +66,8 @@ SELECT create_distributed_table('customer_hash', 'c_custkey'); SET client_min_messages TO DEBUG2; -- The following query checks that we can correctly handle self-joins -EXPLAIN SELECT l1.l_quantity FROM lineitem l1, lineitem l2 +EXPLAIN (COSTS OFF) +SELECT l1.l_quantity FROM lineitem l1, lineitem l2 WHERE l1.l_orderkey = l2.l_orderkey AND l1.l_quantity > 5; SET client_min_messages TO LOG; @@ -75,43 +76,52 @@ SET client_min_messages TO LOG; -- particular, these queries check that we factorize out OR clauses if possible, -- and that we default to a cartesian product otherwise. -EXPLAIN SELECT count(*) FROM lineitem, orders +EXPLAIN (COSTS OFF) +SELECT count(*) FROM lineitem, orders WHERE (l_orderkey = o_orderkey AND l_quantity > 5) OR (l_orderkey = o_orderkey AND l_quantity < 10); -EXPLAIN SELECT l_quantity FROM lineitem, orders +EXPLAIN (COSTS OFF) +SELECT l_quantity FROM lineitem, orders WHERE (l_orderkey = o_orderkey OR l_quantity > 5); -EXPLAIN SELECT count(*) FROM orders, lineitem_hash +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; -- Verify we handle local joins between two hash-partitioned tables. -EXPLAIN SELECT count(*) FROM orders_hash, lineitem_hash +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders_hash, lineitem_hash WHERE o_orderkey = l_orderkey; -- Validate that we can handle broadcast joins with hash-partitioned tables. 
-EXPLAIN SELECT count(*) FROM customer_hash, nation +EXPLAIN (COSTS OFF) +SELECT count(*) FROM customer_hash, nation WHERE c_nationkey = n_nationkey; -- Validate that we don't use a single-partition join method for a hash -- re-partitioned table, thus preventing a partition of just the customer table. -EXPLAIN SELECT count(*) FROM orders, lineitem, customer_append +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders, lineitem, customer_append WHERE o_custkey = l_partkey AND o_custkey = c_nationkey; -- Validate that we don't chose a single-partition join method with a -- hash-partitioned base table -EXPLAIN SELECT count(*) FROM orders, customer_hash +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders, customer_hash WHERE c_custkey = o_custkey; -- Validate that we can re-partition a hash partitioned table to join with a -- range partitioned one. -EXPLAIN SELECT count(*) FROM orders_hash, customer_append +EXPLAIN (COSTS OFF) +SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; -- Validate a 4 way join that could be done locally is planned as such by the logical -- planner. It used to be planned as a repartition join due to no 1 table being directly -- joined to all other tables, but instead follows a chain. -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM ( SELECT users_table.user_id FROM users_table diff --git a/src/test/regress/sql/multi_join_order_tpch_repartition.sql b/src/test/regress/sql/multi_join_order_tpch_repartition.sql index 32024e73b..8e9c59713 100644 --- a/src/test/regress/sql/multi_join_order_tpch_repartition.sql +++ b/src/test/regress/sql/multi_join_order_tpch_repartition.sql @@ -19,7 +19,8 @@ SET client_min_messages TO LOG; -- Query #6 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem @@ -31,7 +32,8 @@ WHERE -- Query #3 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, @@ -56,7 +58,8 @@ ORDER BY -- Query #10 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) as revenue, @@ -90,7 +93,8 @@ ORDER BY -- Query #19 from the TPC-H decision support benchmark (modified) -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice* (1 - l_discount)) as revenue FROM lineitem, @@ -122,7 +126,8 @@ WHERE -- Query to test multiple re-partition jobs in a single query -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT l_partkey, count(*) FROM lineitem, part_append, orders, customer_append diff --git a/src/test/regress/sql/multi_join_order_tpch_small.sql b/src/test/regress/sql/multi_join_order_tpch_small.sql index 6ca2693be..e6deeb31d 100644 --- a/src/test/regress/sql/multi_join_order_tpch_small.sql +++ b/src/test/regress/sql/multi_join_order_tpch_small.sql @@ -10,7 +10,8 @@ SET client_min_messages TO LOG; -- Query #6 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice * l_discount) as revenue FROM lineitem @@ -22,7 +23,8 @@ WHERE -- Query #3 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, @@ -47,7 +49,8 @@ ORDER BY -- Query #10 from the TPC-H decision support benchmark -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) as revenue, @@ 
-81,7 +84,8 @@ ORDER BY -- Query #19 from the TPC-H decision support benchmark (modified) -EXPLAIN SELECT +EXPLAIN (COSTS OFF) +SELECT sum(l_extendedprice* (1 - l_discount)) as revenue FROM lineitem, diff --git a/src/test/regress/sql/multi_join_pruning.sql b/src/test/regress/sql/multi_join_pruning.sql index 42ce063e1..69c722be7 100644 --- a/src/test/regress/sql/multi_join_pruning.sql +++ b/src/test/regress/sql/multi_join_pruning.sql @@ -43,16 +43,19 @@ SELECT sum(l_linenumber), avg(l_linenumber) -- etc. This is in response to a bug we had where we were not able to resolve -- correct operator types for some kind of column types. -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM array_partitioned_table table1, array_partitioned_table table2 WHERE table1.array_column = table2.array_column; -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM composite_partitioned_table table1, composite_partitioned_table table2 WHERE table1.composite_column = table2.composite_column; -- Test that large table joins on partition varchar columns work -EXPLAIN SELECT count(*) +EXPLAIN (COSTS OFF) +SELECT count(*) FROM varchar_partitioned_table table1, varchar_partitioned_table table2 WHERE table1.varchar_column = table2.varchar_column; diff --git a/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql b/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql index 0d6b77f27..6bc5c16d6 100644 --- a/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql +++ b/src/test/regress/sql/multi_mx_repartition_udt_prepare.sql @@ -192,7 +192,8 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other -- Query that should result in a repartition join on UDT column. SET citus.log_multi_join_order = true; -EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other +EXPLAIN (COSTS OFF) +SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; diff --git a/src/test/regress/sql/multi_orderby_limit_pushdown.sql b/src/test/regress/sql/multi_orderby_limit_pushdown.sql index 815418b6e..821c0130a 100644 --- a/src/test/regress/sql/multi_orderby_limit_pushdown.sql +++ b/src/test/regress/sql/multi_orderby_limit_pushdown.sql @@ -18,7 +18,7 @@ GROUP BY user_id ORDER BY avg(value_1) DESC LIMIT 1; -EXPLAIN +EXPLAIN (COSTS OFF) SELECT user_id, avg(value_1) FROM users_table GROUP BY user_id @@ -48,7 +48,7 @@ FROM users_table GROUP BY user_id ORDER BY 2 DESC; -EXPLAIN +EXPLAIN (COSTS OFF) SELECT user_id, avg(value_1) + count(value_2) FROM users_table GROUP BY user_id diff --git a/src/test/regress/sql/multi_repartition_udt.sql b/src/test/regress/sql/multi_repartition_udt.sql index e1a1ab353..e6ba750ee 100644 --- a/src/test/regress/sql/multi_repartition_udt.sql +++ b/src/test/regress/sql/multi_repartition_udt.sql @@ -191,7 +191,8 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other SET citus.task_executor_type = 'task-tracker'; SET citus.log_multi_join_order = true; -EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other +EXPLAIN (COSTS OFF) +SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql index 855b1ef82..2ed20493a 100644 --- a/src/test/regress/sql/multi_task_assignment_policy.sql +++ b/src/test/regress/sql/multi_task_assignment_policy.sql @@ -85,17 +85,17 @@ SET 
client_min_messages TO DEBUG3; SET citus.task_assignment_policy TO 'greedy'; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; -- Next test the first-replica task assignment policy SET citus.task_assignment_policy TO 'first-replica'; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; +EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; COMMIT; diff --git a/src/test/regress/sql/relation_access_tracking.sql b/src/test/regress/sql/relation_access_tracking.sql index 4ce2f6b4e..3a9421191 100644 --- a/src/test/regress/sql/relation_access_tracking.sql +++ b/src/test/regress/sql/relation_access_tracking.sql @@ -234,7 +234,8 @@ ROLLBACK; -- coordinator INSERT .. SELECT BEGIN; - INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0; + -- We use offset 1 to make sure the result needs to be pulled to the coordinator, since offset 0 would be optimized away + INSERT INTO table_2 SELECT * FROM table_1 OFFSET 1; SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; ROLLBACK;
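The plan shape that the new having_subquery test pins down can also be reproduced on plain PostgreSQL. The sketch below is illustrative only and not part of the patch; orders_local and customer_local are stand-in tables, not the regression suite's distributed orders table or the customer reference table. The idea is that the uncorrelated HAVING subquery becomes an InitPlan parameter, the leftover qual then references no aggregates or grouping columns, so the planner moves it out of HAVING and gates the aggregation's input with a Result node carrying a One-Time Filter.

-- Illustrative stand-ins for the test tables
CREATE TEMP TABLE orders_local (o_orderstatus text);
CREATE TEMP TABLE customer_local (c_custkey int);

EXPLAIN (COSTS OFF)
SELECT count(*), o_orderstatus
FROM orders_local
GROUP BY 2
HAVING (SELECT count(*) FROM customer_local) > 0;

-- Expected shape, roughly:
--   HashAggregate
--     Group Key: o_orderstatus
--     InitPlan 1 (returns $0)
--       ->  Aggregate
--             ->  Seq Scan on customer_local
--     ->  Result
--           One-Time Filter: ($0 > 0)
--           ->  Seq Scan on orders_local

In the distributed plan pinned by having_subquery, the scan under the Result node is the Custom Scan (Citus Adaptive) and the InitPlan reads the intermediate result produced for the subquery on the reference table, which is the situation the test's header comment describes.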