citus/src/backend/distributed/planner/distributed_planner.c

3008 lines
89 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*-------------------------------------------------------------------------
*
* distributed_planner.c
* General Citus planner code.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include <float.h>
#include <limits.h>
#include "postgres.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_class.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/plancat.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "parser/parse_type.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "pg_version_constants.h"
#include "distributed/citus_depended_object.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/combine_query_planner.h"
#include "distributed/commands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/cte_inline.h"
#include "distributed/distributed_planner.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h"
#include "distributed/recursive_planning.h"
#include "distributed/shard_utils.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#if PG_VERSION_NUM >= PG_VERSION_16
#include "parser/parse_relation.h"
#endif
/*
 * RouterPlanType is used to determine the router plan to invoke. It is the
 * return type of GetRouterPlanType(), whose prototype appears further down
 * in this file.
 *
 * NOTE(review): the meaning of each member is only conveyed by its name in
 * this part of the file -- confirm against GetRouterPlanType() before relying
 * on it.
 */
typedef enum RouterPlanType
{
INSERT_SELECT_INTO_CITUS_TABLE,
INSERT_SELECT_INTO_LOCAL_TABLE,
DML_QUERY,
SELECT_QUERY,
MERGE_QUERY,
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;
/*
 * List of planner restriction contexts.
 * NOTE(review): presumably maintained by CreateAndPushPlannerRestrictionContext()
 * and PopPlannerRestrictionContext() -- confirm, their bodies are not nearby.
 */
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
/*
 * Source of plan identifiers: CreateDistributedPlannedStmt() takes the current
 * value as the plan id and post-increments it.
 */
static uint64 NextPlanId = 1;
/*
 * keep track of planner call stack levels; incremented when distributed_planner()
 * starts planning and decremented again on both normal and error exit
 */
int PlannerLevel = 0;
/* forward declarations of the file-local helper functions */
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext
*planContext);
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);
static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
bool setPartitionedTablesInherited);
static bool RTEWentThroughAdjustPartitioning(RangeTblEntry *rangeTableEntry);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
DistributedPlan *distributedPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex);
static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
static int32 BlessRecordExpressionList(List *exprs);
static void CheckNodeIsDumpable(Node *node);
static Node * CheckNodeCopyAndSerialization(Node *node);
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
List *columnTypes,
int resultIdCount,
Datum *resultIds,
Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static RouterPlanType GetRouterPlanType(Query *query,
Query *originalQuery,
bool hasUnresolvedParams);
static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
PlannedStmt *concatPlan);
static bool CheckPostPlanDistribution(bool isDistributedQuery,
Query *origQuery,
List *rangeTableList,
Query *plannedQuery);
/* helpers of the (currently experimental) INSERT ... SELECT nextval() rewrite */
static Query *
RewriteInsertSelectForPartialNextval(Query *originalQuery);
static List *
TargetEntryList(List *expressionList);
static int
FindRTEIndexInQuery(Query *query, RangeTblEntry *rte);
/* declared without "static", unlike the helpers above */
RangeTblEntry *
ExtractResultRelationRTEOrNull(Query *query);
/*
 * distributed_planner is the Citus planner hook.
 *
 * The function first decides whether the query needs distributed planning:
 * either the caller forces it via CURSOR_OPT_FORCE_DISTRIBUTED, or Citus is
 * loaded and the query's range table contains a Citus table. It then plans
 * the query through the fast path router planner, or through
 * standard_planner followed by distributed planning or function call
 * delegation, and returns the resulting PlannedStmt.
 *
 * A planner restriction context is pushed and PlannerLevel is incremented
 * before planning; both are undone on the normal path as well as in the
 * PG_CATCH block before the error is re-thrown.
 */
PlannedStmt *
distributed_planner(Query *parse,
const char *query_string,
int cursorOptions,
ParamListInfo boundParams)
{
bool needsDistributedPlanning = false;
bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL;
List *rangeTableList = ExtractRangeTableEntryList(parse);
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
{
/* this cursor flag could only be set when Citus has been loaded */
Assert(CitusHasBeenLoaded());
/*
 * We cannot have a merge command on this path either, because a merge
 * command is never recursively planned.
 */
Assert(!IsMergeQuery(parse));
needsDistributedPlanning = true;
}
else if (CitusHasBeenLoaded())
{
bool maybeHasForeignDistributedTable = false;
needsDistributedPlanning =
ListContainsDistributedTableRTE(rangeTableList,
&maybeHasForeignDistributedTable);
if (needsDistributedPlanning)
{
/* fast path eligibility is only checked for queries on Citus tables */
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
if (maybeHasForeignDistributedTable)
{
WarnIfListHasForeignDistributedTable(rangeTableList);
}
}
}
int rteIdCounter = 1;
DistributedPlanningContext planContext = {
.query = parse,
.cursorOptions = cursorOptions,
.boundParams = boundParams,
};
if (needsDistributedPlanning)
{
/*
 * standard_planner scribbles on its input, but for deparsing we need the
 * unmodified form. Before copying we call AssignRTEIdentities to be able
 * to match RTEs in the rewritten query tree with those in the original
 * tree.
 */
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
planContext.originalQuery = copyObject(parse);
if (!fastPathRouterQuery)
{
/*
 * When there are partitioned tables (not applicable to fast path),
 * pretend that they are regular tables to avoid unnecessary work
 * in standard_planner.
 */
bool setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
}
}
/*
 * Make sure that we hide shard names on the Citus MX worker nodes. See comments in
 * HideShardsFromSomeApplications() for the details.
 */
HideShardsFromSomeApplications(parse);
/*
 * If the GUC is set, we prevent queries that contain pg meta relations from
 * showing any Citus dependent object. The flag is expected to be set only before
 * postgres vanilla tests.
 */
HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
/* create a restriction context and put it at the end of the context list */
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
/*
 * We keep track of how many times we've recursed into the planner, primarily
 * to detect whether we are in a function call. We need to make sure that the
 * PlannerLevel is decremented exactly once at the end of the next PG_TRY
 * block, both in the happy case and when an error occurs.
 */
PlannerLevel++;
PlannedStmt *result = NULL;
PG_TRY();
{
if (fastPathRouterQuery)
{
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
}
else
{
/*
 * Call into standard_planner because the Citus planner relies on both the
 * restriction information per table and parse tree transformations made by
 * postgres' planner.
 */
planContext.plan = standard_planner(planContext.query, NULL,
planContext.cursorOptions,
planContext.boundParams);
/* re-evaluate the decision now that standard_planner has processed the query */
needsDistributedPlanning = CheckPostPlanDistribution(needsDistributedPlanning,
planContext.originalQuery,
rangeTableList,
planContext.query);
if (needsDistributedPlanning)
{
result = PlanDistributedStmt(&planContext, rteIdCounter);
}
/* otherwise try function call delegation, falling back to the local plan */
else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
{
result = planContext.plan;
}
}
}
PG_CATCH();
{
/* undo the push and the level increment before propagating the error */
PopPlannerRestrictionContext();
PlannerLevel--;
PG_RE_THROW();
}
PG_END_TRY();
PlannerLevel--;
/* remove the context from the context list */
PopPlannerRestrictionContext();
/*
 * In some cases, for example parameterized SQL functions, we may miss that
 * there is a need for distributed planning. Such cases only become clear after
 * standard_planner performs some modifications on the parse tree. In such cases
 * we will simply error out.
 */
if (!needsDistributedPlanning && NeedsDistributedPlanning(parse))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning on this "
"query because parameterized queries for SQL "
"functions referencing distributed tables are "
"not supported"),
errhint("Consider using PL/pgSQL functions instead.")));
}
/*
 * We annotate the query for tenant statistics.
 */
AttributeQueryIfAnnotated(query_string, parse->commandType);
return result;
}
/*
 * ExtractRangeTableEntryList runs ExtractRangeTableEntryWalker over the given
 * query tree and hands back the list of range table entries the walker
 * collected. The list starts out as NIL, so that is what is returned when
 * the walker adds nothing.
 */
List *
ExtractRangeTableEntryList(Query *query)
{
    List *collectedRteList = NIL;

    ExtractRangeTableEntryWalker((Node *) query, &collectedRteList);

    return collectedRteList;
}
/*
 * NeedsDistributedPlanning tells whether the given query has to go through
 * distributed planning: Citus must be loaded, the command must be a SELECT,
 * INSERT, UPDATE or DELETE, and the query tree must reference at least one
 * Citus table. Any other command type (CMD_MERGE included) yields false.
 *
 * Queries that also contain local tables are let through; how to deal with
 * local tables is a decision left to the planner itself.
 */
bool
NeedsDistributedPlanning(Query *query)
{
    if (!CitusHasBeenLoaded())
    {
        return false;
    }

    switch (query->commandType)
    {
        case CMD_SELECT:
        case CMD_INSERT:
        case CMD_UPDATE:
        case CMD_DELETE:
        {
            /* supported command, inspect the range table below */
            break;
        }

        default:
        {
            return false;
        }
    }

    List *rangeTableList = ExtractRangeTableEntryList(query);

    return ListContainsDistributedTableRTE(rangeTableList, NULL);
}
/*
 * ListContainsDistributedTableRTE scans the given range table entries and
 * returns true as soon as it meets a relation RTE that is a Citus table;
 * it returns false when there is none.
 *
 * maybeHasForeignDistributedTable may be NULL. When it is not, it is set to
 * true if the Citus table that ends the scan is a foreign table. Since the
 * scan stops at the first Citus table, later entries are not examined and
 * the flag is never reset to false here.
 */
bool
ListContainsDistributedTableRTE(List *rangeTableList,
                                bool *maybeHasForeignDistributedTable)
{
    ListCell *rteCell = NULL;

    foreach(rteCell, rangeTableList)
    {
        RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

        /* only plain relations can be Citus tables */
        if (rte->rtekind != RTE_RELATION)
        {
            continue;
        }

        /*
         * Postgres tidscan.sql test fails if we do not filter the pg_locks
         * table: the test shows the locks taken in serializable isolation
         * mode, and the IsCitusTable call below would add an extra lock to
         * that output.
         */
        bool skipPgLocksTable = HideCitusDependentObjects &&
                                IsolationIsSerializable() &&
                                IsPgLocksTable(rte);
        if (skipPgLocksTable)
        {
            continue;
        }

        if (!IsCitusTable(rte->relid))
        {
            continue;
        }

        if (maybeHasForeignDistributedTable != NULL && IsForeignTable(rte->relid))
        {
            *maybeHasForeignDistributedTable = true;
        }

        return true;
    }

    return false;
}
/*
 * AssignRTEIdentities tags every RTE_RELATION entry of the list that does not
 * carry an identity yet (values_lists is still NIL) with a consecutive
 * identifier, starting from rteIdCounter.
 *
 * To track individual RTEs through PostgreSQL's query planning we need to
 * tell whether an RTE is a copy of another one or a different one, hence the
 * numbering. Only RTE_RELATIONs are of interest.
 *
 * Note that we want to modify the query tree as little as possible, because
 * a change in how PostgreSQL uses the modified fields may break this logic.
 *
 * Returns the next unused identifier, which makes it possible to call the
 * function again on a list that has been partially assigned. The counter
 * should be 1 on the first call.
 */
static int
AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
{
    int nextRteId = rteIdCounter;
    ListCell *rteCell = NULL;

    foreach(rteCell, rangeTableList)
    {
        RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

        bool isRelation = (rte->rtekind == RTE_RELATION);
        bool hasIdentity = (rte->values_lists != NIL);

        if (!isRelation || hasIdentity)
        {
            continue;
        }

        AssignRTEIdentity(rte, nextRteId);
        nextRteId++;
    }

    return nextRteId;
}
/*
 * AdjustPartitioningForDistributedPlanning rewrites the inh flag and the
 * relkind of every partitioned table in the given range table list.
 *
 * With setPartitionedTablesInherited == false the entries are made to look
 * like regular relations (inh = false, RELKIND_RELATION), so that Postgres
 * does not expand them into their partitions, which breaks Citus planning in
 * several ways; anything related to partitioning is left to the shards. With
 * setPartitionedTablesInherited == true the entries are marked as inherited
 * partitioned tables again (inh = true, RELKIND_PARTITIONED_TABLE). The two
 * modes are meant to be applied before and after calling standard_planner.
 *
 * Note that we want to modify the query tree as little as possible, because
 * a change in how PostgreSQL uses the modified fields may break this logic.
 */
static void
AdjustPartitioningForDistributedPlanning(List *rangeTableList,
                                         bool setPartitionedTablesInherited)
{
    /* the relkind to store depends only on the requested mode */
    char adjustedRelkind = setPartitionedTablesInherited ?
                           RELKIND_PARTITIONED_TABLE :
                           RELKIND_RELATION;
    ListCell *rteCell = NULL;

    foreach(rteCell, rangeTableList)
    {
        RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

        if (rte->rtekind != RTE_RELATION || !PartitionedTable(rte->relid))
        {
            continue;
        }

        rte->inh = setPartitionedTablesInherited;
        rte->relkind = adjustedRelkind;
    }
}
/*
 * RTEWentThroughAdjustPartitioning reports whether the given range table
 * entry looks like it has been modified by
 * AdjustPartitioningForDistributedPlanning: it is a relation RTE of a
 * partitioned table whose inh flag is off.
 */
static bool
RTEWentThroughAdjustPartitioning(RangeTblEntry *rangeTableEntry)
{
    if (rangeTableEntry->rtekind != RTE_RELATION)
    {
        return false;
    }

    if (!PartitionedTable(rangeTableEntry->relid))
    {
        return false;
    }

    return !rangeTableEntry->inh;
}
/*
 * AssignRTEIdentity stores the given rteIdentifier on the given range table
 * entry, together with the entry's current inh flag.
 *
 * Postgres' query planning copies, duplicates and modifies RTEs, and we
 * sometimes need to figure out whether two RTEs are copies of the same
 * original one. For that we, hackishly, keep a two element integer list
 * (identifier, inh) in values_lists, a field normally unused in RTE_RELATION
 * RTEs.
 *
 * The assigned identifier better be unique within a plantree.
 */
static void
AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
{
    Assert(rangeTableEntry->rtekind == RTE_RELATION);

    int originalInh = rangeTableEntry->inh;
    List *identityList = list_make2_int(rteIdentifier, originalInh);

    rangeTableEntry->values_lists = identityList;
}
/*
 * GetRTEIdentity returns the identifier that AssignRTEIdentity stored as the
 * first element of the entry's values_lists.
 *
 * standard_planner may in-line SQL functions, in which case the RTEs coming
 * from the function body never got an identity assigned. The checks we have
 * elsewhere for SQL functions do not cover that case, so an entry whose
 * values_lists does not hold exactly two elements is reported as an error
 * here.
 */
int
GetRTEIdentity(RangeTblEntry *rte)
{
    Assert(rte->rtekind == RTE_RELATION);

    List *identityList = rte->values_lists;
    bool identityAssigned = (list_length(identityList) == 2);

    if (!identityAssigned)
    {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("cannot perform distributed planning on this "
                               "query because parameterized queries for SQL "
                               "functions referencing distributed tables are "
                               "not supported"),
                        errhint("Consider using PL/pgSQL functions instead.")));
    }

    Assert(IsA(identityList, IntList));

    return linitial_int(identityList);
}
/*
 * GetOriginalInh returns the inheritance flag that AssignRTEIdentity saved as
 * the second element of the entry's values_lists. The planner resets the
 * flag in the rewritten query, but deparsing needs the original value.
 */
bool
GetOriginalInh(RangeTblEntry *rte)
{
    int savedInh = lsecond_int(rte->values_lists);

    return savedInh;
}
/*
 * GetQueryLockMode returns the lock mode to acquire for the given query:
 * RowExclusiveLock for modify commands, RowShareLock for queries with
 * hasForUpdate set, AccessShareLock otherwise. (See the comment on
 * RangeTblEntry->rellockmode.)
 */
LOCKMODE
GetQueryLockMode(Query *query)
{
    LOCKMODE lockMode = AccessShareLock;

    if (IsModifyCommand(query))
    {
        lockMode = RowExclusiveLock;
    }
    else if (query->hasForUpdate)
    {
        lockMode = RowShareLock;
    }

    return lockMode;
}
/*
 * IsModifyCommand returns true when the query is an INSERT, UPDATE, DELETE
 * or MERGE command, and false for every other command type.
 */
bool
IsModifyCommand(Query *query)
{
    switch (query->commandType)
    {
        case CMD_INSERT:
        case CMD_UPDATE:
        case CMD_DELETE:
        case CMD_MERGE:
        {
            return true;
        }

        default:
        {
            return false;
        }
    }
}
/*
 * IsMultiTaskPlan returns true if the plan has a worker job whose task list
 * holds more than one task. A plan without a worker job is not a multi task
 * plan.
 */
bool
IsMultiTaskPlan(DistributedPlan *distributedPlan)
{
    Job *workerJob = distributedPlan->workerJob;

    if (workerJob == NULL)
    {
        return false;
    }

    return list_length(workerJob->taskList) > 1;
}
/*
 * PlanFastPathDistributedStmt creates a distributed planned statement using
 * the FastPathPlanner.
 *
 * The fast path restriction context of planContext is marked as a fast path
 * router query. A Const distribution key value is recorded in the context,
 * a Param one only raises the distributionKeyHasParam flag, and a NULL (or
 * any other kind of) value leaves the context untouched.
 */
static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
                            Node *distributionKeyValue)
{
    FastPathRestrictionContext *fastPathContext =
        planContext->plannerRestrictionContext->fastPathRestrictionContext;

    fastPathContext->fastPathRouterQuery = true;

    if (distributionKeyValue != NULL)
    {
        if (IsA(distributionKeyValue, Const))
        {
            fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
        }
        else if (IsA(distributionKeyValue, Param))
        {
            fastPathContext->distributionKeyHasParam = true;
        }
    }

    planContext->plan = FastPathPlanner(planContext->originalQuery,
                                        planContext->query,
                                        planContext->boundParams);

    return CreateDistributedPlannedStmt(planContext);
}
/*
 * PlanDistributedStmt creates a distributed planned statement for a query
 * that has already been through the PG planner.
 *
 * Relation RTEs that do not have an identity yet (the planner may have
 * inlined new ones) get one, continuing from rteIdCounter. Once the
 * distributed plan is built, the partitioned tables in the range table are
 * switched back to inherited partitioned tables.
 */
static PlannedStmt *
PlanDistributedStmt(DistributedPlanningContext *planContext,
                    int rteIdCounter)
{
    List *plannedRteList = ExtractRangeTableEntryList(planContext->query);

    /* the next free identifier is not needed any more at this point */
    (void) AssignRTEIdentities(plannedRteList, rteIdCounter);

    PlannedStmt *distributedStmt = CreateDistributedPlannedStmt(planContext);

    AdjustPartitioningForDistributedPlanning(plannedRteList, true);

    return distributedStmt;
}
/*
 * DissuadePlannerFromUsingPlan makes the given plan look extremely expensive
 * by overwriting the total cost of its plan tree. It is used for plans whose
 * planning potentially failed due to unresolved prepared statement
 * parameters.
 */
void
DissuadePlannerFromUsingPlan(PlannedStmt *plan)
{
    Assert(plan != NULL);

    Plan *planTree = plan->planTree;

    /*
     * Arbitrarily high cost, but low enough that choose_custom_plan() can
     * add it up without overflowing.
     */
    planTree->total_cost = FLT_MAX / 100000000;
}
/*
 * CreateDistributedPlannedStmt transforms the query held by planContext into
 * a distributed plan and wraps that plan in a PlannedStmt.
 *
 * A fresh plan id is drawn from NextPlanId. If the original query contains
 * inlinable CTEs, planning is first attempted with the CTEs inlined; when
 * that attempt yields a plan it is returned as is. Otherwise the query is
 * planned with recursive planning allowed. A planning error is raised right
 * away unless the query has unresolved parameters, in which case the plan is
 * returned with an exorbitant cost instead.
 */
static PlannedStmt *
CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
{
    uint64 planId = NextPlanId++;
    Query *originalQuery = planContext->originalQuery;

    if (QueryTreeContainsInlinableCTE(originalQuery))
    {
        /*
         * Inlining CTEs as subqueries can avoid recursively planning some (or
         * all) of them: the inlined CTEs may become part of query pushdown
         * planning, which is much more efficient than recursive planning. So
         * distributed planning on the inlined form is tried first.
         *
         * If that fails we fall back to planning with the CTEs left in
         * place, because recursively planning CTEs can provide full SQL
         * coverage, although it might be slow.
         */
        PlannedStmt *inlinedPlan =
            InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);

        if (inlinedPlan != NULL)
        {
            return inlinedPlan;
        }
    }

    bool hasUnresolvedParams = false;

    if (HasUnresolvedExternParamsWalker((Node *) originalQuery,
                                        planContext->boundParams))
    {
        hasUnresolvedParams = true;
    }

    bool allowRecursivePlanning = true;
    DistributedPlan *distributedPlan =
        CreateDistributedPlan(planId, allowRecursivePlanning,
                              originalQuery,
                              planContext->query,
                              planContext->boundParams,
                              hasUnresolvedParams,
                              planContext->plannerRestrictionContext);

    /*
     * If no plan was generated, prepare a generic error to be emitted.
     * Normally this message never reaches the user: the usual cause is
     * unresolved prepared statement parameters, and the logic below then
     * forces a custom plan (i.e. one with the parameters bound to specific
     * values). SQL (not plpgsql) functions unfortunately do not go through a
     * codepath supporting custom plans, so the error still has to be ready.
     */
    if (distributedPlan == NULL)
    {
        /* currently always should have a more specific error otherwise */
        Assert(hasUnresolvedParams);

        distributedPlan = CitusMakeNode(DistributedPlan);
        distributedPlan->planningError =
            DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
                          "could not create distributed plan",
                          "Possibly this is caused by the use of parameters in SQL "
                          "functions, which is not supported in Citus.",
                          "Consider using PL/pgSQL functions instead.");
    }

    /*
     * Error out if none of the planners produced a usable plan, unless the
     * error was possibly triggered by missing parameters. In the latter case
     * we rely on postgres' custom plan logic: postgres re-plans prepared
     * statements for the first five executions (i.e. it produces custom
     * plans), and afterwards compares the cost of a generic plan with the
     * average custom plan cost. Assigning an exorbitant cost to the
     * unsupported query makes the custom plan win, but that logic is only
     * reached if we do not error out here.
     */
    if (distributedPlan->planningError != NULL && !hasUnresolvedParams)
    {
        RaiseDeferredError(distributedPlan->planningError, ERROR);
    }

    /* remember the plan's identifier for identifying subplans */
    distributedPlan->planId = planId;

    /* create final plan by combining local plan with distributed plan */
    PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan);

    /*
     * As explained above, force planning costs to be unrealistically high if
     * query planning failed (possibly) due to prepared statement parameters,
     * or if the query is planned as a multi shard modify query.
     */
    if (distributedPlan->planningError != NULL ||
        (UpdateOrDeleteOrMergeQuery(originalQuery) &&
         IsMultiTaskPlan(distributedPlan)))
    {
        if (hasUnresolvedParams)
        {
            DissuadePlannerFromUsingPlan(resultPlan);
        }
    }

    return resultPlan;
}
/*
 * InlineCtesAndCreateDistributedPlannedStmt attempts distributed planning on
 * a copy of the original query in which the eligible CTEs have been inlined.
 *
 * The original query in planContext is left untouched, so the caller can
 * still fall back to recursively planning the CTEs. The planning itself is
 * delegated to TryCreateDistributedPlannedStmt(), which runs
 * CreateDistributedPlannedStmt() in a PG_TRY() block; NULL is returned when
 * planning the inlined query fails.
 *
 * planId is currently not used by this function.
 */
static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
                                          DistributedPlanningContext *planContext)
{
    Query *inlinedQuery = copyObject(planContext->originalQuery);

    RecursivelyInlineCtesInQueryTree(inlinedQuery);

    /* after inlining, we shouldn't have any inlinable CTEs */
    Assert(!QueryTreeContainsInlinableCTE(inlinedQuery));

    /*
     * NOTE: rewriting queries that contain nextval() into the partial
     * pushdown form via RewriteInsertSelectForPartialNextval() is currently
     * disabled at this point.
     */

    return TryCreateDistributedPlannedStmt(planContext->plan,
                                           inlinedQuery,
                                           planContext->query,
                                           planContext->boundParams,
                                           planContext->plannerRestrictionContext);
}
/*
 * RewriteInsertSelectForPartialNextval
 *
 * Given an INSERT ... SELECT query that contains nextval() in its target list,
 * rewrite the query so that the inner subquery (which is pushed down to workers)
 * returns only the non-volatile (non-nextval) columns. The outer query then
 * adds the nextval() calls in its target list. This forces nextval() to be
 * evaluated on the coordinator.
 *
 * For example, this transforms:
 *
 *   INSERT INTO target (col1, col2, col3)
 *   SELECT col1, nextval('seq'), col3
 *   FROM distributed_table;
 *
 * into a plan roughly equivalent to:
 *
 *   INSERT INTO target (col1, col2, col3)
 *   SELECT worker.col1, nextval('seq'), worker.col3
 *   FROM (
 *     SELECT col1, col3
 *     FROM distributed_table
 *   ) AS worker;
 *
 * The query is modified in place and returned. It is returned unchanged when
 * it has no result relation, when its SELECT side is not a subquery RTE, or
 * when the SELECT side RTE cannot be found in the query's range table.
 * An error is raised if the outer target list needs more worker columns than
 * the rewritten subquery provides.
 */
static Query *
RewriteInsertSelectForPartialNextval(Query *originalQuery)
{
    /* 0) Basic check: the query must have a result relation */
    RangeTblEntry *insertRte = ExtractResultRelationRTEOrNull(originalQuery);
    if (insertRte == NULL)
    {
        /* not an INSERT...SELECT, or no result table => skip rewriting */
        return originalQuery;
    }

    /* 1) Get the SELECT side RTE */
    RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);

    /*
     * 2) Only a subquery RTE can be rewritten. Other RTE kinds carry no
     * subquery, so there is no target list to split; leave the query alone
     * instead of dereferencing a NULL subquery.
     */
    if (selectRte == NULL ||
        selectRte->rtekind != RTE_SUBQUERY ||
        selectRte->subquery == NULL)
    {
        return originalQuery;
    }

    /*
     * The outer Vars built below refer to the subquery by its range table
     * index. Resolve it once, and before anything is modified, so that we can
     * still bail out cleanly when the RTE is not part of the range table
     * (index 0 is not a valid varno).
     */
    int subqueryIndex = FindRTEIndexInQuery(originalQuery, selectRte);
    if (subqueryIndex == 0)
    {
        return originalQuery;
    }

    /* wrap the subquery to get a clean target list */
    selectRte->subquery = WrapSubquery(selectRte->subquery);
    Query *subquery = selectRte->subquery; /* this is now our inner subquery */

    /*
     * 3) Collect the expressions of the subquery's target list that do NOT
     * contain nextval(); only those are kept in the inner (worker) subquery.
     */
    List *workerExprs = NIL;
    ListCell *lc = NULL;
    foreach(lc, subquery->targetList)
    {
        TargetEntry *tle = (TargetEntry *) lfirst(lc);

        if (!contain_nextval_expression_walker((Node *) tle->expr, NULL))
        {
            workerExprs = lappend(workerExprs, tle->expr);
        }
    }

    /* the inner subquery returns only the worker columns */
    List *newWorkerTList = TargetEntryList(workerExprs);
    subquery->targetList = newWorkerTList;

    /*
     * 4) Build the outer (coordinator) target list by walking the original
     * target list in order:
     *   - an entry containing nextval() is kept as is,
     *   - any other entry is replaced by a Var that references the next
     *     output column of the worker subquery.
     *
     * E.g. if the original target list was
     *     [ expr1, expr2 (volatile), expr3 ]
     * the new outer target list becomes
     *     [ Var(worker_col1), expr2, Var(worker_col2) ]
     *
     * This relies on the non-volatile entries of the original target list
     * appearing in the same order as the worker columns. A more robust
     * implementation would store position information.
     */
    List *newOuterTList = NIL;
    int outerResno = 1;
    int workerIndex = 1;

    foreach(lc, originalQuery->targetList)
    {
        TargetEntry *origTle = (TargetEntry *) lfirst(lc);

        /* resname is optional on a TargetEntry; pstrdup() must not see NULL */
        char *resname = (origTle->resname != NULL) ? pstrdup(origTle->resname) : NULL;
        Expr *outerExpr = NULL;

        if (contain_nextval_expression_walker((Node *) origTle->expr, NULL))
        {
            /* keep the volatile expression as is */
            outerExpr = copyObject(origTle->expr);
        }
        else
        {
            if (workerIndex > list_length(newWorkerTList))
            {
                elog(ERROR, "cannot rewrite INSERT ... SELECT with nextval(): "
                            "outer target list refers to more worker columns "
                            "than the subquery returns");
            }

            TargetEntry *workerTle =
                (TargetEntry *) list_nth(newWorkerTList, workerIndex - 1);
            Node *workerExpr = (Node *) workerTle->expr;

            outerExpr = (Expr *) makeVar(subqueryIndex, /* the RTE index (1-based) */
                                         workerIndex,   /* attribute number within subquery TList */
                                         exprType(workerExpr),
                                         exprTypmod(workerExpr),
                                         exprCollation(workerExpr),
                                         0);
            workerIndex++;
        }

        newOuterTList = lappend(newOuterTList,
                                makeTargetEntry(outerExpr, outerResno, resname, false));
        outerResno++;
    }

    originalQuery->targetList = newOuterTList;

    /* re-run target list reordering to align with the physical table's column order */
    ReorderInsertSelectTargetLists(originalQuery, insertRte, selectRte);

    return originalQuery;
}
/*
 * ExtractResultRelationRTEOrNull returns the range table entry of the query's
 * result relation, or NULL when the query has no result relation (i.e. its
 * resultRelation is 0).
 */
RangeTblEntry *
ExtractResultRelationRTEOrNull(Query *query)
{
    int resultRelationIndex = query->resultRelation;

    if (resultRelationIndex != 0)
    {
        return rt_fetch(resultRelationIndex, query->rtable);
    }

    return NULL;
}
/*
 * TargetEntryList creates a new target list from a list of expressions.
 * Each expression is wrapped (not copied) in a non-junk TargetEntry whose
 * resno is the 1-based position of the expression in the list and whose
 * column name is generated as "worker_col<position>".
 *
 * Returns NIL for an empty expression list.
 */
static List *
TargetEntryList(List *expressionList)
{
    List *targetList = NIL;
    ListCell *expressionCell = NULL;
    int columnIndex = 1;

    foreach(expressionCell, expressionList)
    {
        Expr *expression = (Expr *) lfirst(expressionCell);

        /* psprintf() returns a palloc'd string, no intermediate buffer needed */
        char *columnName = psprintf("worker_col%d", columnIndex);

        TargetEntry *targetEntry = makeTargetEntry(expression, columnIndex,
                                                   columnName, false);

        targetList = lappend(targetList, targetEntry);
        columnIndex++;
    }

    return targetList;
}
/*
 * FindRTEIndexInQuery looks up the given range table entry in query->rtable
 * by pointer identity and returns its 1-based position. When the entry is
 * not a member of the range table, 0 is returned.
 */
static int
FindRTEIndexInQuery(Query *query, RangeTblEntry *rte)
{
	int rangeTableIndex = 1;
	ListCell *rangeTableCell = NULL;

	foreach(rangeTableCell, query->rtable)
	{
		if (lfirst(rangeTableCell) == (void *) rte)
		{
			return rangeTableIndex;
		}

		rangeTableIndex++;
	}

	/* walked the whole range table without a match */
	return 0;
}
/*
 * TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
 * calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
 * query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
 * message with the reason for the failure.
 *
 * Only errors at ERROR level are swallowed; anything else that reaches the
 * catch block is re-thrown untouched.
 */
static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
								Query *originalQuery,
								Query *query, ParamListInfo boundParams,
								PlannerRestrictionContext *plannerRestrictionContext)
{
	MemoryContext savedContext = CurrentMemoryContext;
	PlannedStmt *result = NULL;

	DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));
	planContext->plan = localPlan;
	planContext->boundParams = boundParams;
	planContext->originalQuery = originalQuery;
	planContext->query = query;
	planContext->plannerRestrictionContext = plannerRestrictionContext;

	PG_TRY();
	{
		result = CreateDistributedPlannedStmt(planContext);
	}
	PG_CATCH();
	{
		/* go back to the caller's memory context before copying the error */
		MemoryContextSwitchTo(savedContext);
		ErrorData *edata = CopyErrorData();

		/*
		 * Don't try to intercept PANIC or FATAL, let those breeze past us.
		 *
		 * The re-throw has to happen while the error state is still intact,
		 * hence this check comes before FlushErrorState().
		 */
		if (edata->elevel != ERROR)
		{
			PG_RE_THROW();
		}

		/* we handle the error ourselves from here on */
		FlushErrorState();

		ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("Planning after CTEs inlined failed with "
								"\nmessage: %s\ndetail: %s\nhint: %s",
								edata->message ? edata->message : "",
								edata->detail ? edata->detail : "",
								edata->hint ? edata->hint : "")));

		/* leave the error handling system */
		FreeErrorData(edata);

		result = NULL;
	}
	PG_END_TRY();

	return result;
}
/*
 * GetRouterPlanType inspects the parse tree and returns the plan type that
 * CreateDistributedPlan should use for it.
 *
 * Non-modify commands are always SELECT_QUERY. For modify commands the target
 * relation is first validated (which may error out), then the command is
 * classified as INSERT..SELECT into a Citus table, INSERT..SELECT into a local
 * table, MERGE, or plain DML. The first three are reported as
 * REPLAN_WITH_BOUND_PARAMETERS instead when hasUnresolvedParams is set.
 */
static RouterPlanType
GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams)
{
	if (!IsModifyCommand(originalQuery))
	{
		return SELECT_QUERY;
	}

	Oid targetRelationId = ModifyQueryResultRelationId(query);

	EnsureModificationsCanRunOnRelation(targetRelationId);
	EnsurePartitionTableNotReplicated(targetRelationId);

	/* figure out which kind of modification we are dealing with */
	RouterPlanType planType = DML_QUERY;
	if (InsertSelectIntoCitusTable(originalQuery))
	{
		planType = INSERT_SELECT_INTO_CITUS_TABLE;
	}
	else if (InsertSelectIntoLocalTable(originalQuery))
	{
		planType = INSERT_SELECT_INTO_LOCAL_TABLE;
	}
	else if (IsMergeQuery(originalQuery))
	{
		planType = MERGE_QUERY;
	}
	else
	{
		/* plain DML is planned regardless of unresolved parameters */
		return DML_QUERY;
	}

	return hasUnresolvedParams ? REPLAN_WITH_BOUND_PARAMETERS : planType;
}
/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps:
*
* 1. Try router planner
* 2. Generate subplans for CTEs and complex subqueries
* - If any, go back to step 1 by calling itself recursively
* 3. Logical planner
*
* The function returns NULL when the query should be replanned once parameter
* values are bound: either GetRouterPlanType() returns
* REPLAN_WITH_BOUND_PARAMETERS, or step 1 produced a planning error while
* hasUnresolvedParams is set. If a modify command still carries a planning
* error and neither subplans nor CTEs triggered the recursive call, the plan
* holding that error is returned to the caller.
*
* Note that the function mutates its inputs: the struct pointed to by query is
* overwritten with the copy of originalQuery that went through
* standard_planner before recursing, and query->cteList is cleared before the
* logical planner runs.
*/
DistributedPlan *
CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *originalQuery,
Query *query, ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = NULL;
/* remember this now, it is checked again after subplan generation in step 2 */
bool hasCtes = originalQuery->cteList != NIL;
/* Step 1: Try router planner */
RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
hasUnresolvedParams);
switch (routerPlan)
{
case INSERT_SELECT_INTO_CITUS_TABLE:
{
distributedPlan =
CreateInsertSelectPlan(planId,
originalQuery,
plannerRestrictionContext,
boundParams);
break;
}
case INSERT_SELECT_INTO_LOCAL_TABLE:
{
if (QueryContainsNextval(originalQuery))
{
/* rewrite the query to partial pushdown form */
originalQuery = RewriteInsertSelectForPartialNextval(originalQuery);
}
distributedPlan =
CreateInsertSelectIntoLocalTablePlan(planId,
originalQuery,
boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
break;
}
case DML_QUERY:
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
break;
}
case MERGE_QUERY:
{
distributedPlan =
CreateMergePlan(planId, originalQuery, query, plannerRestrictionContext,
boundParams);
break;
}
case REPLAN_WITH_BOUND_PARAMETERS:
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
case SELECT_QUERY:
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/
distributedPlan =
CreateRouterPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}
/* the functions above always return a plan, possibly with an error */
Assert(distributedPlan);
if (distributedPlan->planningError == NULL)
{
return distributedPlan;
}
else
{
/*
* Report the step 1 planning error at DEBUG2 level; planning continues
* with the remaining steps below.
*/
RaiseDeferredError(distributedPlan->planningError, DEBUG2);
}
if (hasUnresolvedParams)
{
/*
* There are parameters that don't have a value in boundParams.
*
* The remainder of the planning logic cannot handle unbound
* parameters. We return a NULL plan, which will have an
* extremely high cost, such that postgres will replan with
* bound parameters.
*/
return NULL;
}
/* force evaluation of bound params */
boundParams = copyParamList(boundParams);
/*
* If there are parameters that do have a value in boundParams, replace
* them in the original query. This allows us to more easily cut the
* query into pieces (during recursive planning) or deparse parts of
* the query (during subquery pushdown planning).
*/
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
boundParams);
Assert(originalQuery != NULL);
/* Step 2: Generate subplans for CTEs and complex subqueries */
/*
* Plan subqueries and CTEs that cannot be pushed down by recursively
* calling the planner and return the resulting plans to subPlanList.
* Note that GenerateSubplansForSubqueriesAndCTEs will reset perminfoindexes
* for some RTEs in originalQuery->rtable list, while not changing
* originalQuery->rteperminfos. That's fine because we will go through
* standard_planner again, which will adjust things accordingly in
* set_plan_references>add_rtes_to_flat_rtable>add_rte_to_flat_rtable.
*/
List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery,
plannerRestrictionContext);
/*
* If subqueries were recursively planned then we need to replan the query
* to get the new planner restriction context and apply planner transformations.
*
* We could simplify this code if the logical planner was capable of dealing
* with an original query. In that case, we would only have to filter the
* planner restriction context.
*
* Note that we check both for subplans and whether the query had CTEs
* prior to calling GenerateSubplansForSubqueriesAndCTEs. If none of
* the CTEs are referenced then there are no subplans, but we still want
* to retry the router planner.
*/
if (list_length(subPlanList) > 0 || hasCtes)
{
/*
* recursive planner should handle all the tree from bottom to
* top at single pass. i.e. It should have already recursively planned all
* required parts in its first pass. Hence, we expect allowRecursivePlanning
* to be true. Otherwise, this means we have bug at recursive planner,
* which needs to be handled. We add a check here and return error.
*/
if (!allowRecursivePlanning)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("recursive complex joins are only supported "
"when all distributed tables are co-located and "
"joined on their distribution columns")));
}
/* plan a copy, so that originalQuery itself is not touched by standard_planner */
Query *newQuery = copyObject(originalQuery);
bool setPartitionedTablesInherited = false;
PlannerRestrictionContext *currentPlannerRestrictionContext =
CurrentPlannerRestrictionContext();
/* reset the current planner restrictions context */
ResetPlannerRestrictionContext(currentPlannerRestrictionContext);
/*
* We force standard_planner to treat partitioned tables as regular tables
* by clearing the inh flag on RTEs. We already did this at the start of
* distributed_planner, but on a copy of the original query, so we need
* to do it again here.
*/
AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery),
setPartitionedTablesInherited);
/*
* Some relations may have been removed from the query, but we can skip
* AssignRTEIdentities since we currently do not rely on RTE identities
* being contiguous.
*/
standard_planner(newQuery, NULL, 0, boundParams);
/* overwrite the old transformed query with the new transformed query */
*query = *newQuery;
/*
* recurse into CreateDistributedPlan with subqueries/CTEs replaced.
* We only allow recursive planning once, which should have already done all
* the necessary transformations. So, we do not allow recursive planning once again.
*
* boundParams is passed as NULL and hasUnresolvedParams as false here;
* ResolveExternalParams was already applied to originalQuery above.
*/
allowRecursivePlanning = false;
distributedPlan = CreateDistributedPlan(planId, allowRecursivePlanning,
originalQuery, query, NULL, false,
plannerRestrictionContext);
/* distributedPlan cannot be null since hasUnresolvedParams argument was false */
Assert(distributedPlan != NULL);
/* attach the subplans generated in step 2 to the plan of the rewritten query */
distributedPlan->subPlanList = subPlanList;
return distributedPlan;
}
/*
* DML command returns a planning error, even after recursive planning. The
* logical planner cannot handle DML commands so return the plan with the
* error.
*/
if (IsModifyCommand(originalQuery))
{
return distributedPlan;
}
/*
* CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs.
* If we get here and there are still CTEs that means that none of the CTEs are
* referenced. We therefore also strip the CTEs from the rewritten query.
*/
query->cteList = NIL;
Assert(originalQuery->cteList == NIL);
/* Step 3: Try Logical planner */
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan);
/*
* This check is here to make it likely that all node types used in
* Citus are dumpable. Explain can dump logical and physical plans
* using the extended outfuncs infrastructure, but it's infeasible to
* test most plans. MultiQueryContainerNode always serializes the
* physical plan, so there's no need to check that separately
*/
CheckNodeIsDumpable((Node *) logicalPlan);
/* Create the physical plan */
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
plannerRestrictionContext);
/* distributed plan currently should always succeed or error out */
Assert(distributedPlan && distributedPlan->planningError == NULL);
return distributedPlan;
}
/*
 * EnsurePartitionTableNotReplicated raises an ERROR when
 * DeferErrorIfPartitionTableNotSingleReplicated reports a problem for the
 * given relation, i.e. when the relation is a partition whose table is not
 * single replicated.
 *
 * For every other relation the function does nothing.
 */
void
EnsurePartitionTableNotReplicated(Oid relationId)
{
	DeferredErrorMessage *replicationError =
		DeferErrorIfPartitionTableNotSingleReplicated(relationId);

	if (replicationError == NULL)
	{
		return;
	}

	RaiseDeferredError(replicationError, ERROR);
}
/*
 * DeferErrorIfPartitionTableNotSingleReplicated returns a deferred error when
 * the given relation is a partition and is not a single replicated table. The
 * error hint names the parent table, on which the query should be run instead.
 *
 * In all other cases NULL is returned.
 */
static DeferredErrorMessage *
DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId)
{
	/* nothing to complain about unless this is a replicated partition */
	if (!PartitionTableNoLock(relationId) || SingleReplicatedTable(relationId))
	{
		return NULL;
	}

	char *parentRelationName = get_rel_name(PartitionParentOid(relationId));

	StringInfo hintText = makeStringInfo();
	appendStringInfo(hintText, "Run the query on the parent table "
							   "\"%s\" instead.", parentRelationName);

	return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
						 "modifications on partitions when replication "
						 "factor is greater than 1 is not supported",
						 NULL, hintText->data);
}
/*
 * ResolveExternalParams replaces the external parameters that appear
 * in the query with the corresponding entries in the boundParams.
 *
 * inputNode is the expression or Query tree to process; a NULL input yields
 * NULL. When boundParams is NULL the input is returned untouched.
 *
 * A Param node is only turned into a Const when it is of kind PARAM_EXTERN,
 * its paramid lies within [1, boundParams->numParams] and the bound entry has
 * PARAM_FLAG_CONST set. Any other Param node is returned as is. Query nodes
 * and all remaining node types are processed recursively via the tree
 * mutators.
 *
 * Note that this function is inspired by eval_const_expr() on Postgres.
 * We cannot use that function because it requires access to PlannerInfo.
 */
Node *
ResolveExternalParams(Node *inputNode, ParamListInfo boundParams)
{
	/* consider resolving external parameters only when boundParams exists */
	if (!boundParams)
	{
		return inputNode;
	}

	if (inputNode == NULL)
	{
		return NULL;
	}

	if (IsA(inputNode, Param))
	{
		Param *paramToProcess = (Param *) inputNode;
		int numberOfParameters = boundParams->numParams;
		int parameterId = paramToProcess->paramid;
		int16 typeLength = 0;
		bool typeByValue = false;
		Datum constValue = 0;

		if (paramToProcess->paramkind != PARAM_EXTERN)
		{
			return inputNode;
		}

		/*
		 * parameterId starts from 1. An id of 0 (or below) would translate to
		 * a negative array index, so such params are left alone rather than
		 * reading in front of boundParams->params.
		 */
		if (parameterId <= 0)
		{
			return inputNode;
		}

		int parameterIndex = parameterId - 1;
		if (parameterIndex >= numberOfParameters)
		{
			return inputNode;
		}

		ParamExternData *correspondingParameterData =
			&boundParams->params[parameterIndex];

		/* only values flagged as constant may be folded into the query */
		if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST))
		{
			return inputNode;
		}

		get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue);

		bool paramIsNull = correspondingParameterData->isnull;
		if (paramIsNull)
		{
			constValue = 0;
		}
		else if (typeByValue)
		{
			constValue = correspondingParameterData->value;
		}
		else
		{
			/*
			 * Out of paranoia ensure that datum lives long enough,
			 * although bind params currently should always live
			 * long enough.
			 */
			constValue = datumCopy(correspondingParameterData->value, typeByValue,
								   typeLength);
		}

		return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod,
								  paramToProcess->paramcollid, typeLength, constValue,
								  paramIsNull, typeByValue);
	}
	else if (IsA(inputNode, Query))
	{
		return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams,
										   boundParams, 0);
	}

	return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams);
}
/*
 * GetDistributedPlan returns the DistributedPlan stored as the single element
 * of the CustomScan's custom_private list.
 *
 * Callers should only read from the returned data structure, since it may be
 * the plan of a prepared statement and may therefore be reused.
 */
DistributedPlan *
GetDistributedPlan(CustomScan *customScan)
{
	List *privateData = customScan->custom_private;
	Assert(list_length(privateData) == 1);

	Node *planNode = (Node *) linitial(privateData);
	Assert(CitusIsA(planNode, DistributedPlan));

	/* run the copy/serialization check; the node it returns is not used here */
	(void) CheckNodeCopyAndSerialization(planNode);

	return (DistributedPlan *) planNode;
}
/*
 * FinalizePlan wraps the distributed plan in a CustomScan node and combines it
 * with the local plan, producing a PlannedStmt that can be run by the
 * PostgreSQL executor.
 *
 * The custom scan methods are picked based on the executor type of the
 * distributed plan; plans that carry a planning error (or have any other
 * executor type) get the delayed error scan methods.
 */
PlannedStmt *
FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
{
	CustomScan *citusScan = makeNode(CustomScan);

	/* this field is used in JobExecutorType */
	distributedPlan->relationIdList = localPlan->relationOids;

	MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
	if (distributedPlan->planningError == NULL)
	{
		executorType = JobExecutorType(distributedPlan);
	}

	if (executorType == MULTI_EXECUTOR_ADAPTIVE)
	{
		citusScan->methods = &AdaptiveExecutorCustomScanMethods;
	}
	else if (executorType == MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT)
	{
		citusScan->methods = &NonPushableInsertSelectCustomScanMethods;
	}
	else if (executorType == MULTI_EXECUTOR_NON_PUSHABLE_MERGE_QUERY)
	{
		citusScan->methods = &NonPushableMergeCommandCustomScanMethods;
	}
	else
	{
		citusScan->methods = &DelayedErrorCustomScanMethods;
	}

	/* if it is not a single task executable plan, inform user according to the log level */
	if (IsMultiTaskPlan(distributedPlan) &&
		MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF)
	{
		ereport(MultiTaskQueryLogLevel,
				(errmsg("multi-task query about to be executed"),
				 errhint("Queries are split to multiple tasks "
						 "if they have to be split into several"
						 " queries on the workers.")));
	}

	distributedPlan->queryId = localPlan->queryId;

	citusScan->custom_private = list_make1((Node *) distributedPlan);

	/* necessary to avoid extra Result node in PG15 */
	citusScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN | CUSTOMPATH_SUPPORT_PROJECTION;

	/*
	 * Record subplans used by distributed plan to make intermediate result
	 * pruning easier. Fast path queries cannot have any subplans by
	 * definition, so the expensive traversal is skipped for them.
	 *
	 * We do this before finalizing the plan, because the combineQuery is
	 * rewritten by standard_planner in FinalizeNonRouterPlan.
	 */
	if (!distributedPlan->fastPathRouterPlan)
	{
		distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan);
	}

	if (distributedPlan->combineQuery)
	{
		return FinalizeNonRouterPlan(localPlan, distributedPlan, citusScan);
	}

	return FinalizeRouterPlan(localPlan, citusScan);
}
/*
 * FinalizeNonRouterPlan plans the combine query on top of the given
 * distributed custom scan and returns the resulting PlannedStmt. The queryId
 * and utilityStmt are taken over from the local plan, and the local plan's
 * range table is appended for access permission checks.
 */
static PlannedStmt *
FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan,
					  CustomScan *customScan)
{
	PlannedStmt *combinePlan = PlanCombineQuery(distributedPlan, customScan);

	combinePlan->utilityStmt = localPlan->utilityStmt;
	combinePlan->queryId = localPlan->queryId;

	/* add original range table list for access permission checks */
	ConcatenateRTablesAndPerminfos(combinePlan, localPlan);

	return combinePlan;
}
/*
 * ConcatenateRTablesAndPerminfos appends concatPlan's range table to
 * mainPlan's range table. On PG16 and above it also appends concatPlan's
 * permInfos to mainPlan's permInfos, after shifting the perminfoindex of the
 * appended range table entries accordingly.
 *
 * Note that the range table entries of concatPlan are adjusted in place.
 */
static void
ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan, PlannedStmt *concatPlan)
{
	mainPlan->rtable = list_concat(mainPlan->rtable, concatPlan->rtable);

#if PG_VERSION_NUM >= PG_VERSION_16

	/*
	 * The appended entries keep pointing into concatPlan's permInfos, which
	 * are about to be placed right after mainPlan's current permInfos. Hence
	 * every non-zero perminfoindex has to grow by the current length of
	 * mainPlan's permInfos list.
	 */
	int permInfoOffset = list_length(mainPlan->permInfos);

	ListCell *appendedCell;
	foreach(appendedCell, concatPlan->rtable)
	{
		RangeTblEntry *appendedRte = (RangeTblEntry *) lfirst(appendedCell);

		/* 0 means the entry has no permission info at all */
		if (appendedRte->perminfoindex == 0)
		{
			continue;
		}

		appendedRte->perminfoindex = appendedRte->perminfoindex + permInfoOffset;
	}

	/* finally, concatenate perminfos as well */
	mainPlan->permInfos = list_concat(mainPlan->permInfos, concatPlan->permInfos);
#endif
}
/*
 * FinalizeRouterPlan gets a CustomScan node which already wrapped distributed
 * part of a router plan and sets it as the direct child of the router plan
 * because we don't run any query on master node for router executable queries.
 * Here, we also rebuild the column list to read from the remote scan.
 */
static PlannedStmt *
FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
{
	List *scanTargetList =
		makeCustomScanTargetlistFromExistingTargetList(localPlan->planTree->targetlist);
	customScan->custom_scan_tlist = scanTargetList;

	List *outputTargetList = makeTargetListFromCustomScanList(scanTargetList);
	customScan->scan.plan.targetlist = outputTargetList;

	/* collect the column names of the final target list */
	List *columnNameList = NIL;
	TargetEntry *outputEntry = NULL;
	foreach_declared_ptr(outputEntry, outputTargetList)
	{
		columnNameList = lappend(columnNameList, makeString(outputEntry->resname));
	}

	PlannedStmt *routerPlan = makeNode(PlannedStmt);
	routerPlan->planTree = (Plan *) customScan;

	/* the remote scan entry comes first in the range table */
	routerPlan->rtable = list_make1(RemoteScanRangeTableEntry(columnNameList));

	/* add original range table list for access permission checks */
	ConcatenateRTablesAndPerminfos(routerPlan, localPlan);

	routerPlan->canSetTag = true;
	routerPlan->relationOids = NIL;

	/* the remaining properties are inherited from the local plan */
	routerPlan->commandType = localPlan->commandType;
	routerPlan->hasReturning = localPlan->hasReturning;
	routerPlan->queryId = localPlan->queryId;
	routerPlan->utilityStmt = localPlan->utilityStmt;

	return routerPlan;
}
/*
 * makeCustomScanTargetlistFromExistingTargetList rebuilds the targetlist from the remote
 * query into a list that can be used as the custom_scan_tlist for our Citus Custom Scan.
 *
 * Every non-junk entry of the existing target list is flat-copied and its
 * expression replaced with a Var referencing range table index 1.
 */
static List *
makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist)
{
	/* we will have custom scan range table entry as the first one in the list */
	const int customScanRangeTableIndex = 1;

	List *scanTargetList = NIL;
	TargetEntry *sourceEntry = NULL;

	/* build a targetlist to read from the custom scan output */
	foreach_declared_ptr(sourceEntry, existingTargetlist)
	{
		Assert(IsA(sourceEntry, TargetEntry));

		/*
		 * This is unlikely to be hit because we would not need resjunk stuff
		 * at the toplevel of a router query - all things needing it have been
		 * pushed down.
		 */
		if (sourceEntry->resjunk)
		{
			continue;
		}

		/* build target entry pointing to remote scan range table entry */
		Var *remoteVar = makeVarFromTargetEntry(customScanRangeTableIndex, sourceEntry);

		bool isRecordType = (remoteVar->vartype == RECORDOID ||
							 remoteVar->vartype == RECORDARRAYOID);
		if (isRecordType)
		{
			/*
			 * Add the anonymous composite type to the type cache and store
			 * the key in vartypmod. Eventually this makes its way into the
			 * TupleDesc used by the executor, which uses it to parse the
			 * query results from the workers in BuildTupleFromCStrings.
			 */
			remoteVar->vartypmod = BlessRecordExpression(sourceEntry->expr);
		}

		TargetEntry *scanEntry = flatCopyTargetEntry(sourceEntry);
		scanEntry->expr = (Expr *) remoteVar;

		scanTargetList = lappend(scanTargetList, scanEntry);
	}

	return scanTargetList;
}
/*
 * makeTargetListFromCustomScanList creates, based on a custom_scan_tlist, the
 * target list to use on the Citus Custom Scan Node. Each resulting entry holds
 * a Var whose varno is INDEX_VAR instead of a range table index, keeps the
 * resname and resjunk of its source entry and is numbered consecutively
 * starting from 1.
 */
static List *
makeTargetListFromCustomScanList(List *custom_scan_tlist)
{
	List *outputTargetList = NIL;
	TargetEntry *scanEntry = NULL;

	foreach_declared_ptr(scanEntry, custom_scan_tlist)
	{
		/* the next resno is one past the number of entries built so far */
		int nextResno = list_length(outputTargetList) + 1;

		/*
		 * INDEX_VAR is used to reference back to the TargetEntry in custom_scan_tlist by
		 * its resno (index)
		 */
		Var *indexVar = makeVarFromTargetEntry(INDEX_VAR, scanEntry);

		TargetEntry *outputEntry = makeTargetEntry((Expr *) indexVar, nextResno,
												   scanEntry->resname,
												   scanEntry->resjunk);
		outputTargetList = lappend(outputTargetList, outputEntry);
	}

	return outputTargetList;
}
/*
 * BlessRecordExpression ensures we can parse an anonymous composite type on the
 * target list of a query that is sent to the worker.
 *
 * We cannot normally parse record types coming from the workers unless we
 * "bless" the tuple descriptor, which adds a transient type to the type cache
 * and assigns it a type mod value, which is the key in the type cache.
 *
 * Returns the type mod of the blessed descriptor, or -1 when the expression
 * is of a kind that is not handled here or no composite type could be
 * determined for it.
 */
int32
BlessRecordExpression(Expr *expr)
{
	if (IsA(expr, FuncExpr) || IsA(expr, OpExpr))
	{
		/*
		 * Handle functions that return records on the target
		 * list, e.g. SELECT function_call(1,2);
		 */
		Oid resultTypeId = InvalidOid;
		TupleDesc resultTupleDesc = NULL;

		/* get_expr_result_type blesses the tuple descriptor */
		TypeFuncClass typeClass = get_expr_result_type((Node *) expr, &resultTypeId,
													   &resultTupleDesc);
		if (typeClass != TYPEFUNC_COMPOSITE)
		{
			return -1;
		}

		return resultTupleDesc->tdtypmod;
	}

	if (IsA(expr, RowExpr))
	{
		/*
		 * Handle row expressions, e.g. SELECT (1,2);
		 */
		RowExpr *rowExpr = (RowExpr *) expr;
		TupleDesc rowTupleDesc = CreateTemplateTupleDesc(list_length(rowExpr->args));

		int attributeNumber = 0;
		ListCell *argCell = NULL;
		foreach(argCell, rowExpr->args)
		{
			Node *rowArg = (Node *) lfirst(argCell);
			Oid rowArgTypeId = exprType(rowArg);
			int rowArgTypeMod = exprTypmod(rowArg);

			attributeNumber++;

			if (rowArgTypeId == RECORDOID || rowArgTypeId == RECORDARRAYOID)
			{
				/* ensure nested rows are blessed as well */
				rowArgTypeMod = BlessRecordExpression((Expr *) rowArg);
			}

			TupleDescInitEntry(rowTupleDesc, attributeNumber, NULL,
							   rowArgTypeId, rowArgTypeMod, 0);
			TupleDescInitEntryCollation(rowTupleDesc, attributeNumber,
										exprCollation(rowArg));
		}

		BlessTupleDesc(rowTupleDesc);

		return rowTupleDesc->tdtypmod;
	}

	if (IsA(expr, ArrayExpr))
	{
		/*
		 * Handle row array expressions, e.g. SELECT ARRAY[(1,2)];
		 * Postgres allows ARRAY[(1,2),(1,2,3)]. We do not.
		 */
		return BlessRecordExpressionList(((ArrayExpr *) expr)->elements);
	}

	/* the next three simply bless their argument lists */
	if (IsA(expr, NullIfExpr))
	{
		return BlessRecordExpressionList(((NullIfExpr *) expr)->args);
	}

	if (IsA(expr, MinMaxExpr))
	{
		return BlessRecordExpressionList(((MinMaxExpr *) expr)->args);
	}

	if (IsA(expr, CoalesceExpr))
	{
		return BlessRecordExpressionList(((CoalesceExpr *) expr)->args);
	}

	if (IsA(expr, CaseExpr))
	{
		/* bless the result of every WHEN branch plus the default result, if any */
		CaseExpr *caseExpr = (CaseExpr *) expr;
		List *branchResults = NIL;

		ListCell *whenCell = NULL;
		foreach(whenCell, caseExpr->args)
		{
			CaseWhen *whenClause = (CaseWhen *) lfirst(whenCell);
			branchResults = lappend(branchResults, whenClause->result);
		}

		if (caseExpr->defresult != NULL)
		{
			branchResults = lappend(branchResults, caseExpr->defresult);
		}

		return BlessRecordExpressionList(branchResults);
	}

	/* any other kind of expression is not handled */
	return -1;
}
/*
 * BlessRecordExpressionList applies BlessRecordExpression to the members of
 * the given list and returns the type mod they have in common.
 *
 * Members yielding a type mod of -1 are ignored. As soon as two members
 * disagree on their type mod, -1 is returned without looking at the rest of
 * the list. -1 is also returned when no member yields a type mod.
 */
static int32
BlessRecordExpressionList(List *exprs)
{
	int32 commonTypeMod = -1;
	ListCell *exprCell = NULL;

	foreach(exprCell, exprs)
	{
		int32 currentTypeMod = BlessRecordExpression((Expr *) lfirst(exprCell));

		if (currentTypeMod == -1)
		{
			continue;
		}

		if (commonTypeMod == -1)
		{
			/* first member with a known type mod */
			commonTypeMod = currentTypeMod;
			continue;
		}

		if (commonTypeMod != currentTypeMod)
		{
			return -1;
		}
	}

	return commonTypeMod;
}
/*
 * RemoteScanRangeTableEntry builds the range table entry that represents a
 * remote scan. The entry is aliased as "remote_scan" with the given column
 * names.
 */
RangeTblEntry *
RemoteScanRangeTableEntry(List *columnNameList)
{
	RangeTblEntry *remoteScanRte = makeNode(RangeTblEntry);

	/* we use RTE_VALUES for custom scan because we can't look up relation */
	remoteScanRte->rtekind = RTE_VALUES;
	remoteScanRte->inh = false;
	remoteScanRte->inFromCl = true;
	remoteScanRte->eref = makeAlias("remote_scan", columnNameList);

	return remoteScanRte;
}
/*
 * CheckNodeIsDumpable runs nodeToString() on the passed node to verify that
 * it can be dumped; the produced string is thrown away. As this check is
 * expensive, it is compiled in only when assertions are enabled.
 */
static void
CheckNodeIsDumpable(Node *node)
{
#ifdef USE_ASSERT_CHECKING
	pfree(nodeToString(node));
#endif
}
/*
 * CheckNodeCopyAndSerialization exercises the copy and dump functions for the
 * given node and returns a copy of the input.
 *
 * It is only active when assertions are enabled, otherwise it returns
 * the input directly. We use this to confirm that our serialization
 * and copy logic produces the correct plan during regression tests.
 *
 * It does not check string equality on node dumps due to differences
 * in some Postgres types.
 */
static Node *
CheckNodeCopyAndSerialization(Node *node)
{
#ifdef USE_ASSERT_CHECKING
	char *serializedOriginal = nodeToString(node);
	Node *copiedNode = copyObject(node);
	char *serializedCopy = nodeToString(copiedNode);

	/* the dumps only had to be producible, their contents are not compared */
	pfree(serializedOriginal);
	pfree(serializedCopy);

	node = copiedNode;
#endif

	return node;
}
/*
 * multi_join_restriction_hook is a hook called by postgresql standard planner
 * to notify us about various planning information regarding joins. We use
 * it to learn about the joining column.
 *
 * For every join it appends a JoinRestriction to the current planner
 * restriction context, and keeps track of whether any semi join or outer join
 * has been seen. Joins whose inner or outer relids are empty are skipped with
 * a DEBUG1 message.
 */
void
multi_join_restriction_hook(PlannerInfo *root,
							RelOptInfo *joinrel,
							RelOptInfo *outerrel,
							RelOptInfo *innerrel,
							JoinType jointype,
							JoinPathExtraData *extra)
{
	if (bms_is_empty(innerrel->relids) || bms_is_empty(outerrel->relids))
	{
		/*
		 * We do not expect empty relids. Still, ignoring such JoinRestriction is
		 * preferable for two reasons:
		 * 1. This might be a query that doesn't rely on JoinRestrictions at all (e.g.,
		 * local query).
		 * 2. We cannot process them when they are empty (and likely to segfault if
		 * we allow as-is).
		 */
		ereport(DEBUG1, (errmsg("Join restriction information is NULL")));

		/* actually ignore the restriction instead of recording it below */
		return;
	}

	/*
	 * Use a memory context that's guaranteed to live long enough, could be
	 * called in a more shortly lived one (e.g. with GEQO).
	 */
	PlannerRestrictionContext *plannerRestrictionContext =
		CurrentPlannerRestrictionContext();
	MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
	MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);

	JoinRestrictionContext *joinRestrictionContext =
		plannerRestrictionContext->joinRestrictionContext;
	Assert(joinRestrictionContext != NULL);

	JoinRestriction *joinRestriction = palloc0(sizeof(JoinRestriction));
	joinRestriction->joinType = jointype;
	joinRestriction->plannerInfo = root;

	/*
	 * We create a copy of restrictInfoList and relids because with geqo they may
	 * be created in a memory context which will be deleted when we still need it,
	 * thus we create a copy of it in our memory context.
	 */
	joinRestriction->joinRestrictInfoList = copyObject(extra->restrictlist);
	joinRestriction->innerrelRelids = bms_copy(innerrel->relids);
	joinRestriction->outerrelRelids = bms_copy(outerrel->relids);

	joinRestrictionContext->joinRestrictionList =
		lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);

	/*
	 * Keep track if we received any semi joins here. If we didn't we can
	 * later safely convert any semi joins in the rewritten query to inner
	 * joins.
	 */
	joinRestrictionContext->hasSemiJoin = joinRestrictionContext->hasSemiJoin ||
										  extra->sjinfo->jointype == JOIN_SEMI;
	joinRestrictionContext->hasOuterJoin = joinRestrictionContext->hasOuterJoin ||
										   IS_OUTER_JOIN(extra->sjinfo->jointype);

	MemoryContextSwitchTo(oldMemoryContext);
}
/*
 * multi_relation_restriction_hook is a hook called by postgresql standard planner
 * to notify us about various planning information regarding a relation. We use
 * it to retrieve restrictions on relations.
 *
 * Extra data container relations get all their paths replaced with a Citus
 * custom scan path. For plain relations a RelationRestriction is appended to
 * the current planner restriction context.
 */
void
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
								Index restrictionIndex, RangeTblEntry *rte)
{
	if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
	{
		/*
		 * We got here by planning the query part that needs to be executed on the query
		 * coordinator node.
		 * We have verified the occurrence of the citus_extra_datacontainer function
		 * encoding the remote scan we plan to execute here. We will replace all paths
		 * with a path describing our custom scan.
		 */
		Path *customScanPath =
			CreateCitusCustomScanPath(root, relOptInfo, restrictionIndex, rte,
									  ReplaceCitusExtraDataContainerWithCustomScan);

		/* replace all paths with our custom scan and recalculate cheapest */
		relOptInfo->pathlist = list_make1(customScanPath);
		set_cheapest(relOptInfo);

		return;
	}

	AdjustReadIntermediateResultCost(rte, relOptInfo);
	AdjustReadIntermediateResultArrayCost(rte, relOptInfo);

	/* only plain relations are tracked below */
	if (rte->rtekind != RTE_RELATION)
	{
		return;
	}

	/*
	 * Use a memory context that's guaranteed to live long enough, could be
	 * called in a more shortly lived one (e.g. with GEQO).
	 */
	PlannerRestrictionContext *restrictionContext = CurrentPlannerRestrictionContext();
	MemoryContext previousContext =
		MemoryContextSwitchTo(restrictionContext->memoryContext);

	bool isCitusTable = IsCitusTable(rte->relid);

	RelationRestriction *restriction = palloc0(sizeof(RelationRestriction));
	restriction->plannerInfo = root;
	restriction->relOptInfo = relOptInfo;
	restriction->rte = rte;
	restriction->relationId = rte->relid;
	restriction->index = restrictionIndex;
	restriction->citusTable = isCitusTable;

	/* see comments on GetVarFromAssignedParam() */
	restriction->outerPlanParamsList = OuterPlanParamsList(root);
	restriction->translatedVars = TranslatedVars(root, restriction->index);

	RelationRestrictionContext *relationContext =
		restrictionContext->relationRestrictionContext;

	/*
	 * We're also keeping track of whether all participant
	 * tables are reference tables.
	 */
	if (isCitusTable)
	{
		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);

#if PG_VERSION_NUM == PG_VERSION_15

		/*
		 * Postgres 15.0 had a bug regarding inherited statistics expressions,
		 * which is fixed in 15.1 via Postgres commit
		 * 1f1865e9083625239769c26f68b9c2861b8d4b1c.
		 *
		 * Hence, we only set this value on exactly PG15.0
		 */
		relOptInfo->statlist = NIL;
#endif

		relationContext->allReferenceTables &=
			IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
	}

	relationContext->relationRestrictionList =
		lappend(relationContext->relationRestrictionList, restriction);

	MemoryContextSwitchTo(previousContext);
}
/*
 * multi_get_relation_info_hook removes partitioned indexes from the
 * relation's indexlist when needed, to avoid a crash in PG16 caused by our
 * Citus function AdjustPartitioningForDistributedPlanning().
 *
 * AdjustPartitioningForDistributedPlanning() is a hack that we use to prevent
 * Postgres' standard_planner() from expanding all the partitions for
 * distributed planning when a distributed partitioned table is queried. It is
 * required for both correctness and performance reasons. Although we could
 * eliminate the use of the function for correctness (e.g., by making sure
 * that the rest of the planner can handle partitions), its performance
 * implication is hard to avoid. Certain planning logic of Citus (such as
 * router or query pushdown) relies heavily on the relationRestrictionList.
 * If AdjustPartitioningForDistributedPlanning() is removed, all the
 * partitions show up in the relationRestrictionList, causing high planning
 * times for such queries.
 */
void
multi_get_relation_info_hook(PlannerInfo *root, Oid relationObjectId, bool inhparent,
							 RelOptInfo *rel)
{
	if (!CitusHasBeenLoaded())
	{
		return;
	}

	RangeTblEntry *rte = planner_rt_fetch(rel->relid, root);
	if (!RTEWentThroughAdjustPartitioning(rte))
	{
		/* the rte was not touched by the partitioning hack, nothing to fix */
		return;
	}

	ListCell *indexCell = NULL;
	foreach(indexCell, rel->indexlist)
	{
		IndexOptInfo *indexInfo = (IndexOptInfo *) lfirst(indexCell);
		char indexRelkind = get_rel_relkind(indexInfo->indexoid);

		if (indexRelkind != RELKIND_PARTITIONED_INDEX)
		{
			continue;
		}

		/*
		 * Normally, we should not need this. However, the combination of
		 * Postgres commit 3c569049b7b502bb4952483d19ce622ff0af5fd6 and
		 * Citus function AdjustPartitioningForDistributedPlanning()
		 * forces us to do this. The commit expects partitioned indexes
		 * to belong to relations with the "inh" flag set properly, whereas
		 * the function overrides the "inh" flag. To avoid a crash, we drop
		 * every partitioned index from the list of index infos.
		 * Partitioned indexes were ignored pre PG16 anyway, so we are
		 * essentially not breaking any logic.
		 */
		rel->indexlist = foreach_delete_current(rel->indexlist, indexCell);
	}
}
/*
 * TranslatedVars returns a deep copy of the translated vars that belong to
 * the given relation index. It returns NIL when the planner has no append rel
 * list or when no append rel info matches the relation index.
 */
static List *
TranslatedVars(PlannerInfo *root, int relationIndex)
{
	if (root->append_rel_list == NIL)
	{
		return NIL;
	}

	AppendRelInfo *appendRelInfo = FindTargetAppendRelInfo(root, relationIndex);
	if (appendRelInfo == NULL)
	{
		return NIL;
	}

	/* postgres deletes translated_vars, hence we keep our own deep copies */
	List *copiedVars = NIL;
	ListCell *varCell = NULL;
	foreach(varCell, appendRelInfo->translated_vars)
	{
		Node *translatedVar = (Node *) lfirst(varCell);

		copiedVars = lappend(copiedVars, copyObject(translatedVar));
	}

	return copiedVars;
}
/*
 * FindTargetAppendRelInfo walks over root->append_rel_list and returns the
 * first append rel info whose child_relid equals the given relation rte
 * index, or NULL when there is no such entry.
 */
static AppendRelInfo *
FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex)
{
	/* the entries belong to the queries that are part of UNION ALL subselects */
	ListCell *appendRelCell = NULL;
	foreach(appendRelCell, root->append_rel_list)
	{
		AppendRelInfo *candidate = (AppendRelInfo *) lfirst(appendRelCell);

		/*
		 * Only the child rel that equals the relation under investigation is
		 * of interest. There is no need to account for an offset here since
		 * postgres adds the offset to child_relid and parent_relid only after
		 * calling multi_relation_restriction_hook.
		 */
		if (candidate->child_relid == relationRteIndex)
		{
			return candidate;
		}
	}

	return NULL;
}
/*
 * AdjustReadIntermediateResultCost adjusts the row count and total cost
 * of a read_intermediate_result call based on the file size.
 *
 * The function is a no-op unless the range table entry is a function RTE
 * with exactly one function that contains a read_intermediate_result call
 * whose result id and format arguments are constants and whose result id
 * is not NULL.
 */
static void
AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo)
{
	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
		list_length(rangeTableEntry->functions) != 1)
	{
		/* avoid more expensive checks below for non-functions */
		return;
	}

	if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
	{
		/* read_intermediate_result may not exist */
		return;
	}

	if (!ContainsReadIntermediateResultFunction((Node *) rangeTableEntry->functions))
	{
		return;
	}

	RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial(
		rangeTableEntry->functions);
	FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;

	Const *resultIdConst = (Const *) linitial(funcExpression->args);
	if (!IsA(resultIdConst, Const))
	{
		/* not sure how to interpret non-const */
		return;
	}

	if (resultIdConst->constisnull)
	{
		/*
		 * The datum of a NULL constant is not a valid text pointer, so it
		 * must not be handed to the cost calculation, which converts every
		 * result id to a C string.
		 */
		return;
	}

	Datum resultIdDatum = resultIdConst->constvalue;

	Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
	if (!IsA(resultFormatConst, Const))
	{
		/* not sure how to interpret non-const */
		return;
	}

	AdjustReadIntermediateResultsCostInternal(relOptInfo,
											  rangeTableFunction->funccoltypes,
											  1, &resultIdDatum, resultFormatConst);
}
/*
 * AdjustReadIntermediateResultArrayCost adjusts the row count and total cost
 * of a read_intermediate_results(resultIds, format) call based on the file size.
 *
 * The function is a no-op unless the range table entry is a function RTE
 * with exactly one function that contains a read_intermediate_results call
 * whose result id array and format arguments are constants and whose result
 * id array is not NULL.
 */
static void
AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
									  RelOptInfo *relOptInfo)
{
	Datum *resultIdArray = NULL;
	int resultIdCount = 0;

	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
		list_length(rangeTableEntry->functions) != 1)
	{
		/* avoid more expensive checks below for non-functions */
		return;
	}

	if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
	{
		/* read_intermediate_results may not exist */
		return;
	}

	if (!ContainsReadIntermediateResultArrayFunction((Node *) rangeTableEntry->functions))
	{
		return;
	}

	RangeTblFunction *rangeTableFunction =
		(RangeTblFunction *) linitial(rangeTableEntry->functions);
	FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;

	Const *resultIdConst = (Const *) linitial(funcExpression->args);
	if (!IsA(resultIdConst, Const))
	{
		/* not sure how to interpret non-const */
		return;
	}

	if (resultIdConst->constisnull)
	{
		/*
		 * The datum of a NULL constant is not a valid array pointer, so it
		 * must not be detoasted and deconstructed below.
		 */
		return;
	}

	Datum resultIdArrayDatum = resultIdConst->constvalue;
	deconstruct_array(DatumGetArrayTypeP(resultIdArrayDatum), TEXTOID, -1, false,
					  'i', &resultIdArray, NULL, &resultIdCount);

	Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
	if (!IsA(resultFormatConst, Const))
	{
		/* not sure how to interpret non-const */
		return;
	}

	AdjustReadIntermediateResultsCostInternal(relOptInfo,
											  rangeTableFunction->funccoltypes,
											  resultIdCount, resultIdArray,
											  resultFormatConst);
}
/*
 * AdjustReadIntermediateResultsCostInternal adjusts the row count and total cost
 * of reading intermediate results based on file sizes.
 *
 * relOptInfo is the relation whose first path gets the new rows, startup_cost
 * and total_cost. columnTypes is the list of column type oids that is used to
 * estimate the per-row size and parsing cost. resultIds is an array of
 * resultIdCount text datums naming the intermediate results, and
 * resultFormatConst holds the oid of the copy format.
 *
 * The path is left untouched when any of the results does not exist.
 */
static void
AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, List *columnTypes,
										  int resultIdCount, Datum *resultIds,
										  Const *resultFormatConst)
{
	/* 11-byte signature + 8 byte header + 2-byte footer of a binary file */
	const int64 binaryFormatOverhead = 21;

	PathTarget *reltarget = relOptInfo->reltarget;
	List *pathList = relOptInfo->pathlist;
	double rowCost = 0.;
	double rowSizeEstimate = 0;
	double rowCountEstimate = 0.;
	double ioCost = 0.;
	QualCost funcCost = { 0., 0. };
	int64 totalResultSize = 0;
	ListCell *typeCell = NULL;

	Datum resultFormatDatum = resultFormatConst->constvalue;
	Oid resultFormatId = DatumGetObjectId(resultFormatDatum);
	bool binaryFormat = (resultFormatId == BinaryCopyFormatId());

	for (int index = 0; index < resultIdCount; index++)
	{
		char *resultId = TextDatumGetCString(resultIds[index]);
		int64 resultSize = IntermediateResultSize(resultId);
		if (resultSize < 0)
		{
			/* result does not exist, will probably error out later on */
			return;
		}

		if (binaryFormat)
		{
			/*
			 * Only the payload counts towards the estimates. A file that is
			 * shorter than the fixed overhead contributes nothing instead of
			 * a negative size, which would lead to a negative I/O cost.
			 */
			resultSize = Max(resultSize - binaryFormatOverhead, 0);
		}

		totalResultSize += resultSize;
	}

	/* start with the cost of evaluating quals */
	rowCost += relOptInfo->baserestrictcost.per_tuple;

	/* postgres' estimate for the width of the rows */
	rowSizeEstimate += reltarget->width;

	/* add 2 bytes for column count (binary) or line separator (text) */
	rowSizeEstimate += 2;

	foreach(typeCell, columnTypes)
	{
		Oid columnTypeId = lfirst_oid(typeCell);
		Oid inputFunctionId = InvalidOid;
		Oid typeIOParam = InvalidOid;

		if (binaryFormat)
		{
			getTypeBinaryInputInfo(columnTypeId, &inputFunctionId, &typeIOParam);

			/* binary format: 4 bytes for field size */
			rowSizeEstimate += 4;
		}
		else
		{
			getTypeInputInfo(columnTypeId, &inputFunctionId, &typeIOParam);

			/* text format: 1 byte for tab separator */
			rowSizeEstimate += 1;
		}

		/* add the cost of parsing a column */
		add_function_cost(NULL, inputFunctionId, NULL, &funcCost);
	}

	rowCost += funcCost.per_tuple;

	/* estimate the number of rows based on the file size and estimated row size */
	rowCountEstimate = Max(1, (double) totalResultSize / rowSizeEstimate);

	/* cost of reading the data */
	ioCost = seq_page_cost * totalResultSize / BLCKSZ;

	Assert(pathList != NIL);

	/* tell the planner about the cost and row count of the function */
	Path *path = (Path *) linitial(pathList);
	path->rows = rowCountEstimate;
	path->total_cost = rowCountEstimate * rowCost + ioCost;
	path->startup_cost = funcCost.startup + relOptInfo->baserestrictcost.startup;
}
/*
 * OuterPlanParamsList builds a list of RootPlanParams, one for every outer
 * node of the given root. The list starts with the entry for parent_root and
 * ends with the entry for the outer most node.
 */
static List *
OuterPlanParamsList(PlannerInfo *root)
{
	List *outerParamsList = NIL;
	PlannerInfo *ancestorRoot = root->parent_root;

	while (ancestorRoot != NULL)
	{
		RootPlanParams *ancestorParams = palloc0(sizeof(RootPlanParams));

		ancestorParams->root = ancestorRoot;

		/*
		 * TODO: In SearchPlannerParamList() we are only interested in Var plan
		 * params, consider copying just them here.
		 */
		ancestorParams->plan_params = CopyPlanParamList(ancestorRoot->plan_params);

		outerParamsList = lappend(outerParamsList, ancestorParams);

		/* climb one level up */
		ancestorRoot = ancestorRoot->parent_root;
	}

	return outerParamsList;
}
/*
 * CopyPlanParamList returns a newly allocated list that holds a deep copy of
 * every PlannerParamItem in the input list.
 *
 * copyObject() cannot be applied to the items directly since there is no
 * support for copying PlannerParamItem structs, hence each item is rebuilt
 * by hand and only its "item" node goes through copyObject().
 */
static List *
CopyPlanParamList(List *originalPlanParamList)
{
	List *resultList = NIL;

	PlannerParamItem *sourceItem = NULL;
	foreach_declared_ptr(sourceItem, originalPlanParamList)
	{
		PlannerParamItem *itemCopy = makeNode(PlannerParamItem);

		itemCopy->paramId = sourceItem->paramId;
		itemCopy->item = copyObject(sourceItem->item);

		resultList = lappend(resultList, itemCopy);
	}

	return resultList;
}
/*
* CreateAndPushPlannerRestrictionContext creates a new relation restriction context
* and a new join context, inserts it to the beginning of the
* plannerRestrictionContextList. Finally, the planner restriction context is
* inserted to the beginning of the plannerRestrictionContextList and it is returned.
*/
static PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void)
{
PlannerRestrictionContext *plannerRestrictionContext =
palloc0(sizeof(PlannerRestrictionContext));
plannerRestrictionContext->relationRestrictionContext =
palloc0(sizeof(RelationRestrictionContext));
plannerRestrictionContext->joinRestrictionContext =
palloc0(sizeof(JoinRestrictionContext));
plannerRestrictionContext->fastPathRestrictionContext =
palloc0(sizeof(FastPathRestrictionContext));
plannerRestrictionContext->memoryContext = CurrentMemoryContext;
/* we'll apply logical AND as we add tables */
plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
plannerRestrictionContextList = lcons(plannerRestrictionContext,
plannerRestrictionContextList);
return plannerRestrictionContext;
}
/*
* TranslatedVarsForRteIdentity gets an rteIdentity and returns the
* translatedVars that belong to the range table relation. If no
* translatedVars found, the function returns NIL;
*/
List *
TranslatedVarsForRteIdentity(int rteIdentity)
{
PlannerRestrictionContext *currentPlannerRestrictionContext =
CurrentPlannerRestrictionContext();
List *relationRestrictionList =
currentPlannerRestrictionContext->relationRestrictionContext->
relationRestrictionList;
RelationRestriction *relationRestriction = NULL;
foreach_declared_ptr(relationRestriction, relationRestrictionList)
{
if (GetRTEIdentity(relationRestriction->rte) == rteIdentity)
{
return relationRestriction->translatedVars;
}
}
return NIL;
}
/*
* CurrentRestrictionContext returns the most recently added
* PlannerRestrictionContext from the plannerRestrictionContextList list.
*/
static PlannerRestrictionContext *
CurrentPlannerRestrictionContext(void)
{
Assert(plannerRestrictionContextList != NIL);
PlannerRestrictionContext *plannerRestrictionContext =
(PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
if (plannerRestrictionContext == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("planner restriction context stack was empty"),
errdetail("Please report this to the Citus core team.")));
}
return plannerRestrictionContext;
}
/*
* PopPlannerRestrictionContext removes the most recently added restriction contexts from
* the planner restriction context list. The function assumes the list is not empty.
*/
static void
PopPlannerRestrictionContext(void)
{
plannerRestrictionContextList = list_delete_first(plannerRestrictionContextList);
}
/*
 * ResetPlannerRestrictionContext replaces the relation, join and fast path
 * restriction contexts of the given planner restriction context with freshly
 * allocated, zeroed ones. The memory context of the planner restriction
 * context is left as is.
 */
static void
ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionContext)
{
	RelationRestrictionContext *relationContext =
		palloc0(sizeof(RelationRestrictionContext));
	JoinRestrictionContext *joinContext =
		palloc0(sizeof(JoinRestrictionContext));
	FastPathRestrictionContext *fastPathContext =
		palloc0(sizeof(FastPathRestrictionContext));

	/* we'll apply logical AND as we add tables */
	relationContext->allReferenceTables = true;

	plannerRestrictionContext->relationRestrictionContext = relationContext;
	plannerRestrictionContext->joinRestrictionContext = joinContext;
	plannerRestrictionContext->fastPathRestrictionContext = fastPathContext;
}
/*
 * HasUnresolvedExternParamsWalker returns true if the passed in expression
 * has external parameters that are not contained in boundParams, false
 * otherwise.
 *
 * An external parameter counts as resolved when boundParams is set, the
 * parameter id is within [1, numParams] and the type of the bound parameter
 * is a valid oid. Parameters of any other kind are ignored. Query nodes and
 * all other expressions are traversed recursively.
 */
bool
HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
{
	if (expression == NULL)
	{
		return false;
	}

	if (IsA(expression, Param))
	{
		Param *param = (Param *) expression;
		int paramId = param->paramid;

		/* only care about user supplied parameters */
		if (param->paramkind != PARAM_EXTERN)
		{
			return false;
		}

		/* check whether parameter is available (and valid) */
		if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
		{
			ParamExternData *externParam = NULL;

			/*
			 * Workspace for the paramFetch hook. The hook may return a pointer
			 * to it, so it has to live in the same scope as externParam and
			 * stay valid until the parameter type has been read below.
			 */
			ParamExternData externParamPlaceholder;

			/* give hook a chance in case parameter is dynamic */
			if (boundParams->paramFetch != NULL)
			{
				externParam = (*boundParams->paramFetch)(boundParams, paramId, false,
														 &externParamPlaceholder);
			}
			else
			{
				externParam = &boundParams->params[paramId - 1];
			}

			Oid paramType = externParam->ptype;
			if (OidIsValid(paramType))
			{
				return false;
			}
		}

		return true;
	}

	/* keep traversing */
	if (IsA(expression, Query))
	{
		return query_tree_walker((Query *) expression,
								 HasUnresolvedExternParamsWalker,
								 boundParams,
								 0);
	}
	else
	{
		return expression_tree_walker(expression,
									  HasUnresolvedExternParamsWalker,
									  boundParams);
	}
}
/*
 * ContainsSingleShardTable tells whether the given query references a
 * single-shard table, based on the hasSingleShardDistTable property of the
 * query's RTE list properties.
 */
bool
ContainsSingleShardTable(Query *query)
{
	return GetRTEListPropertiesForQuery(query)->hasSingleShardDistTable;
}
/*
 * GetRTEListPropertiesForQuery extracts the range table entry list of the
 * given query and returns the RTEListProperties that GetRTEListProperties
 * computes for it.
 */
RTEListProperties *
GetRTEListPropertiesForQuery(Query *query)
{
	return GetRTEListProperties(ExtractRangeTableEntryList(query));
}
/*
 * GetRTEListProperties processes the given rangeTableList and returns a
 * newly allocated RTEListProperties struct that records which kinds of
 * relations appear in it. Range table entries that are not relations, as
 * well as regular views, are skipped.
 */
static RTEListProperties *
GetRTEListProperties(List *rangeTableList)
{
	RTEListProperties *properties = palloc0(sizeof(RTEListProperties));

	ListCell *rteCell = NULL;
	foreach(rteCell, rangeTableList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

		/*
		 * Only relations are of interest. Views are skipped as well since
		 * distributed tables within (regular) views are already in
		 * rangeTableList.
		 */
		if (rte->rtekind != RTE_RELATION || rte->relkind == RELKIND_VIEW)
		{
			continue;
		}

		if (rte->relkind == RELKIND_MATVIEW)
		{
			/*
			 * Record materialized views as they are similar to postgres local
			 * tables but it is nice to record them separately.
			 *
			 * Regular tables, partitioned tables or foreign tables can be
			 * local or distributed tables and we can qualify them accurately.
			 *
			 * For regular views, we don't care because their definitions are
			 * already in the same query tree and we can detect what is inside
			 * the view definition.
			 *
			 * Materialized views are just local tables in the queries. But,
			 * when REFRESH MATERIALIZED VIEW is used, they behave similar to
			 * regular views and add the view definition to the query. Hence,
			 * it is useful to record them separately and let the callers
			 * decide on what to do.
			 */
			properties->hasMaterializedView = true;
			continue;
		}

		CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(rte->relid);
		if (tableEntry == NULL)
		{
			/* no cache entry was found, so this is a postgres local table */
			properties->hasPostgresLocalTable = true;
			continue;
		}

		if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE))
		{
			properties->hasReferenceTable = true;
		}
		else if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE))
		{
			properties->hasCitusLocalTable = true;
		}
		else if (IsCitusTableTypeCacheEntry(tableEntry, DISTRIBUTED_TABLE))
		{
			bool hasShardKey = HasDistributionKeyCacheEntry(tableEntry);

			properties->hasDistributedTable = true;

			if (hasShardKey)
			{
				properties->hasDistTableWithShardKey = true;
			}
			else
			{
				properties->hasSingleShardDistTable = true;
			}
		}
		else
		{
			/* it's not expected, but let's do a bug catch here */
			ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
							errmsg("encountered with an unexpected citus "
								   "table type while processing range table "
								   "entries of query")));
		}
	}

	/* any of the citus table kinds makes the list contain a citus table */
	properties->hasCitusTable = (properties->hasDistributedTable ||
								 properties->hasReferenceTable ||
								 properties->hasCitusLocalTable);

	return properties;
}
/*
 * WarnIfListHasForeignDistributedTable iterates the given list and logs a
 * WARNING for the first relation that is a foreign table and a Citus table
 * but not a Citus local table. The warning is remembered in a static flag,
 * so once it has been emitted the function returns without logging again.
 * We do that because now we only support Citus Local Tables for foreign tables.
 */
static void
WarnIfListHasForeignDistributedTable(List *rangeTableList)
{
	static bool DistributedForeignTableWarningPrompted = false;

	if (DistributedForeignTableWarningPrompted)
	{
		/* the warning has been shown before, nothing left to do */
		return;
	}

	ListCell *rteCell = NULL;
	foreach(rteCell, rangeTableList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
		Oid relationId = rte->relid;

		bool isDistributedForeignTable =
			IsForeignTable(relationId) && IsCitusTable(relationId) &&
			!IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
		if (!isDistributedForeignTable)
		{
			continue;
		}

		DistributedForeignTableWarningPrompted = true;
		ereport(WARNING, (errmsg(
							  "support for distributed foreign tables are deprecated, "
							  "please use Citus managed local tables"),
						  (errdetail(
							   "Foreign tables can be added to metadata using UDF: "
							   "citus_add_local_table_to_metadata()"))));

		/* one warning is enough */
		return;
	}
}
/*
 * CheckPostPlanDistribution decides whether a query that was considered
 * distributed still is after it went through the Postgres planner, and
 * returns the possibly updated isDistributedQuery.
 *
 * The incoming value is returned as is, unless the original query had quals,
 * the planned query has none, and the planned query has fewer range table
 * entries than rangeTableList. In that case the result is whether the range
 * table entries of the planned query contain a distributed table. On PG17+,
 * the merge join condition is used in place of the jointree quals for MERGE
 * queries.
 */
static bool
CheckPostPlanDistribution(bool isDistributedQuery,
						  Query *origQuery, List *rangeTableList,
						  Query *plannedQuery)
{
	if (!isDistributedQuery)
	{
		return isDistributedQuery;
	}

	Node *qualsBeforePlanning = origQuery->jointree->quals;
	Node *qualsAfterPlanning = plannedQuery->jointree->quals;

#if PG_VERSION_NUM >= PG_VERSION_17
	if (IsMergeQuery(origQuery))
	{
		qualsBeforePlanning = origQuery->mergeJoinCondition;
		qualsAfterPlanning = plannedQuery->mergeJoinCondition;
	}
#endif

	/*
	 * The WHERE quals have been eliminated by the Postgres planner, possibly by
	 * an OR clause that was simplified to TRUE. In such cases, we need to check
	 * if the planned query still requires distributed planning.
	 */
	bool qualsEliminated = (qualsBeforePlanning != NULL && qualsAfterPlanning == NULL);
	if (!qualsEliminated)
	{
		return isDistributedQuery;
	}

	List *rtesPostPlan = ExtractRangeTableEntryList(plannedQuery);
	if (list_length(rtesPostPlan) >= list_length(rangeTableList))
	{
		/* no range table entry was dropped, keep the original decision */
		return isDistributedQuery;
	}

	return ListContainsDistributedTableRTE(rtesPostPlan, NULL);
}