From da47a03b182b9b0f8ec98ed69f86b61724b6fc8e Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Jun 2017 16:56:14 +0200 Subject: [PATCH] Move INSERT ... SELECT planning logic into one place --- .../distributed/planner/deparse_shard_query.c | 1 + .../planner/insert_select_planner.c | 1134 ++++++++++++++++- .../planner/multi_logical_planner.c | 1 + .../distributed/planner/multi_planner.c | 12 +- .../planner/multi_router_planner.c | 978 +------------- .../distributed/test/deparse_shard_query.c | 1 + src/backend/distributed/utils/citus_clauses.c | 1 + .../distributed/insert_select_planner.h | 10 +- .../distributed/multi_router_planner.h | 8 +- 9 files changed, 1101 insertions(+), 1045 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 90e070601..318dae558 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -16,6 +16,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" #include "distributed/deparse_shard_query.h" +#include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 14f0ed99c..405c082df 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -10,34 +10,1049 @@ #include "postgres.h" +#include "catalog/pg_class.h" +#include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" +#include "distributed/colocation_utils.h" #include "distributed/errormessage.h" #include "distributed/insert_select_planner.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include 
"distributed/multi_logical_planner.h" +#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/pg_dist_partition.h" +#include "distributed/resource_lock.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" +#include "optimizer/clauses.h" #include "optimizer/planner.h" +#include "optimizer/restrictinfo.h" +#include "optimizer/var.h" #include "parser/parsetree.h" #include "parser/parse_coerce.h" #include "parser/parse_relation.h" #include "utils/lsyscache.h" -static DeferredErrorMessage * DeferErrorIfCoordinatorInsertSelectUnsupported( - Query *insertSelectQuery); +static MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); +static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery); +static Task * RouterModifyTaskForShardInterval(Query *originalQuery, + ShardInterval *shardInterval, + RelationRestrictionContext * + restrictionContext, + uint32 taskIdIndex, + bool allRelationsJoinedOnPartitionKey); +static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, + RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, + bool allReferenceTables); +static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query); +static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, + RangeTblEntry *insertRte, + RangeTblEntry * + subqueryRte, + Oid * + selectPartitionColumnTableId); +static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse); +static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); static Query * WrapSubquery(Query *subquery); +static void CastSelectTargetList(List *selectTargetList, Oid targetRelationId, + List *insertTargetList); + + +/* + * InsertSelectIntoDistributedTable returns true when the 
input query is an + * INSERT INTO ... SELECT kind of query and the target is a distributed + * table. + * + * Note that the input query should be the original parsetree of + * the query (i.e., not passed through the standard planner). + * + * This function is inspired from getInsertSelectQuery() on + * rewrite/rewriteManip.c. + */ +bool +InsertSelectIntoDistributedTable(Query *query) +{ + CmdType commandType = query->commandType; + List *fromList = NULL; + RangeTblRef *rangeTableReference = NULL; + RangeTblEntry *subqueryRte = NULL; + RangeTblEntry *insertRte = NULL; + + if (commandType != CMD_INSERT) + { + return false; + } + + if (query->jointree == NULL || !IsA(query->jointree, FromExpr)) + { + return false; + } + + fromList = query->jointree->fromlist; + if (list_length(fromList) != 1) + { + return false; + } + + rangeTableReference = linitial(fromList); + if (!IsA(rangeTableReference, RangeTblRef)) + { + return false; + } + + subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable); + if (subqueryRte->rtekind != RTE_SUBQUERY) + { + return false; + } + + /* ensure that there is a query */ + Assert(IsA(subqueryRte->subquery, Query)); + + insertRte = ExtractInsertRangeTableEntry(query); + if (!IsDistributedTable(insertRte->relid)) + { + return false; + } + + return true; +} + + +/* + * CreateInsertSelectPlan tries to create a distributed plan for an + * INSERT INTO distributed_table SELECT ... query by pushing down the + * command to the workers and if that is not possible it creates a + * plan for evaluating the SELECT on the coordinator. 
+ */ +MultiPlan * +CreateInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + MultiPlan *distributedPlan = NULL; + + distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, + plannerRestrictionContext); + + if (distributedPlan->planningError != NULL) + { + RaiseDeferredError(distributedPlan->planningError, DEBUG1); + + /* if INSERT..SELECT cannot be distributed, pull to coordinator */ + distributedPlan = CreateCoordinatorInsertSelectPlan(originalQuery); + } + + return distributedPlan; +} + + +/* + * CreateDistributedInsertSelectPlan creates a MultiPlan for distributed + * INSERT ... SELECT queries which could consist of multiple tasks. + * + * The function never returns NULL, it errors out if it cannot create the MultiPlan. + */ +static MultiPlan * +CreateDistributedInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + int shardOffset = 0; + List *sqlTaskList = NIL; + uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ + Job *workerJob = NULL; + uint64 jobId = INVALID_JOB_ID; + MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); + RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); + Oid targetRelationId = insertRte->relid; + DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); + int shardCount = targetCacheEntry->shardIntervalArrayLength; + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + bool allReferenceTables = relationRestrictionContext->allReferenceTables; + bool safeToPushDownSubquery = false; + + multiPlan->operation = originalQuery->commandType; + + /* + * Error semantics for INSERT ... SELECT queries are different than regular + * modify queries. Thus, handle separately. 
+ */ + multiPlan->planningError = DistributedInsertSelectSupported(originalQuery, insertRte, + subqueryRte, + allReferenceTables); + if (multiPlan->planningError) + { + return multiPlan; + } + + safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext, + originalQuery); + + /* + * Plan select query for each shard in the target table. Do so by replacing the + * partitioning qual parameter added in multi_planner() using the current shard's + * actual boundary values. Also, add the current shard's boundary values to the + * top level subquery to ensure that even if the partitioning qual is not distributed + * to all the tables, we never run the queries on the shards that don't match with + * the current shard boundaries. Finally, perform the normal shard pruning to + * decide on whether to push the query to the current shard or not. + */ + for (shardOffset = 0; shardOffset < shardCount; shardOffset++) + { + ShardInterval *targetShardInterval = + targetCacheEntry->sortedShardIntervalArray[shardOffset]; + Task *modifyTask = NULL; + + modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval, + relationRestrictionContext, + taskIdIndex, + safeToPushDownSubquery); + + /* add the task if it could be created */ + if (modifyTask != NULL) + { + modifyTask->insertSelectQuery = true; + + sqlTaskList = lappend(sqlTaskList, modifyTask); + } + + ++taskIdIndex; + } + + if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF && + list_length(sqlTaskList) > 1) + { + ereport(MultiTaskQueryLogLevel, (errmsg("multi-task query about to be executed"), + errhint("Queries are split to multiple tasks " + "if they have to be split into several" + " queries on the workers."))); + } + + /* Create the worker job */ + workerJob = CitusMakeNode(Job); + workerJob->taskList = sqlTaskList; + workerJob->subqueryPushdown = false; + workerJob->dependedJobList = NIL; + workerJob->jobId = jobId; + workerJob->jobQuery = originalQuery; + 
workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); + + /* and finally the multi plan */ + multiPlan->workerJob = workerJob; + multiPlan->masterQuery = NULL; + multiPlan->routerExecutable = true; + multiPlan->hasReturning = false; + + if (list_length(originalQuery->returningList) > 0) + { + multiPlan->hasReturning = true; + } + + return multiPlan; +} + + +/* + * DistributedInsertSelectSupported returns NULL if the INSERT ... SELECT query + * is supported, or a description why not. + */ +static DeferredErrorMessage * +DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, bool allReferenceTables) +{ + Query *subquery = NULL; + Oid selectPartitionColumnTableId = InvalidOid; + Oid targetRelationId = insertRte->relid; + char targetPartitionMethod = PartitionMethod(targetRelationId); + ListCell *rangeTableCell = NULL; + DeferredErrorMessage *error = NULL; + + /* we only do this check for INSERT ... SELECT queries */ + AssertArg(InsertSelectIntoDistributedTable(queryTree)); + + subquery = subqueryRte->subquery; + + if (!NeedsDistributedPlanning(subquery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... SELECT can only select from " + "distributed tables", + NULL, NULL); + } + + if (GetLocalGroupId() != 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... 
SELECT can only be performed from " + "the coordinator", + NULL, NULL); + } + + /* we do not expect to see a view in modify target */ + foreach(rangeTableCell, queryTree->rtable) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + if (rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_VIEW) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot insert into view over distributed table", + NULL, NULL); + } + } + + if (contain_volatile_functions((Node *) queryTree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "volatile functions are not allowed in distributed " + "INSERT ... SELECT queries", + NULL, NULL); + } + + /* we don't support LIMIT, OFFSET and WINDOW functions */ + error = MultiTaskRouterSelectQuerySupported(subquery); + if (error) + { + return error; + } + + /* + * If we're inserting into a reference table, all participating tables + * should be reference tables as well. + */ + if (targetPartitionMethod == DISTRIBUTE_BY_NONE) + { + if (!allReferenceTables) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "only reference tables may be queried when targeting " + "a reference table with distributed INSERT ... SELECT", + NULL, NULL); + } + } + else + { + DeferredErrorMessage *error = NULL; + + /* ensure that INSERT's partition column comes from SELECT's partition column */ + error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte, + &selectPartitionColumnTableId); + if (error) + { + return error; + } + + /* + * We expect partition column values come from colocated tables. Note that we + * skip this check from the reference table case given that all reference tables + * are already (and by default) co-located. 
+ */ + if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT target table and the source relation of the SELECT partition " + "column value must be colocated in distributed INSERT ... SELECT", + NULL, NULL); + } + } + + return NULL; +} + + +/* + * SafeToPushDownSubquery returns true if either + * (i) there exists join in the query and all relations joined on their + * partition keys + * (ii) there exists only union set operations and all relations has + * partition keys in the same ordinal position in the query + */ +static bool +SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery) +{ + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + bool restrictionEquivalenceForPartitionKeys = + RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); + + if (restrictionEquivalenceForPartitionKeys) + { + return true; + } + + if (ContainsUnionSubquery(originalQuery)) + { + return SafeToPushdownUnionSubquery(relationRestrictionContext); + } + + return false; +} + + +/* + * RouterModifyTaskForShardInterval creates a modify task by + * replacing the partitioning qual parameter added in multi_planner() + * with the shardInterval's boundary value. Then perform the normal + * shard pruning on the subquery. Finally, checks if the target shardInterval + * has exactly same placements with the select task's available anchor + * placements. + * + * The function errors out if the subquery is not router select query (i.e., + * subqueries with non equi-joins.). 
+ */ +static Task * +RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, + RelationRestrictionContext *restrictionContext, + uint32 taskIdIndex, + bool safeToPushdownSubquery) +{ + Query *copiedQuery = copyObject(originalQuery); + RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); + RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery); + Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; + + uint64 shardId = shardInterval->shardId; + Oid distributedTableId = shardInterval->relationId; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + + RelationRestrictionContext *copiedRestrictionContext = + CopyRelationRestrictionContext(restrictionContext); + + StringInfo queryString = makeStringInfo(); + ListCell *restrictionCell = NULL; + Task *modifyTask = NULL; + List *selectPlacementList = NIL; + uint64 selectAnchorShardId = INVALID_SHARD_ID; + List *relationShardList = NIL; + uint64 jobId = INVALID_JOB_ID; + List *insertShardPlacementList = NULL; + List *intersectedPlacementList = NULL; + bool routerPlannable = false; + bool upsertQuery = false; + bool replacePrunedQueryWithDummy = false; + bool allReferenceTables = restrictionContext->allReferenceTables; + List *shardOpExpressions = NIL; + RestrictInfo *shardRestrictionList = NULL; + + /* grab shared metadata lock to stop concurrent placement additions */ + LockShardDistributionMetadata(shardId, ShareLock); + + /* + * Replace the partitioning qual parameter value in all baserestrictinfos. + * Note that this has to be done on a copy, as the walker modifies in place. 
+ */ + foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) + { + RelationRestriction *restriction = lfirst(restrictionCell); + List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo; + List *extendedBaseRestrictInfo = originalBaseRestrictInfo; + Index rteIndex = restriction->index; + + if (!safeToPushdownSubquery || allReferenceTables) + { + continue; + } + + shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex); + shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions); + extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo, + shardRestrictionList); + + restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo; + } + + /* + * We also need to add shard interval range to the subquery in case + * the partition qual not distributed all tables such as some + * subqueries in WHERE clause. + * + * Note that we need to add the ranges before the shard pruning to + * prevent shard pruning logic (i.e, namely UpdateRelationNames()) + * modifies range table entries, which makes hard to add the quals. + */ + if (!allReferenceTables) + { + AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval); + } + + /* mark that we don't want the router planner to generate dummy hosts/queries */ + replacePrunedQueryWithDummy = false; + + /* + * Use router select planner to decide on whether we can push down the query + * or not. If we can, we also rely on the side-effects that all RTEs have been + * updated to point to the relevant nodes and selectPlacementList is determined. 
+ */ + routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext, + &selectPlacementList, &selectAnchorShardId, + &relationShardList, replacePrunedQueryWithDummy); + + if (!routerPlannable) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail("Select query cannot be pushed down to the worker."))); + } + + + /* ensure that we do not send queries where select is pruned away completely */ + if (list_length(selectPlacementList) == 0) + { + ereport(DEBUG2, (errmsg("Skipping target shard interval %ld since " + "SELECT query for it pruned away", shardId))); + + return NULL; + } + + /* get the placements for insert target shard and its intersection with select */ + insertShardPlacementList = FinalizedShardPlacementList(shardId); + intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, + selectPlacementList); + + /* + * If insert target does not have exactly the same placements with the select, + * we shouldn't run the query. 
+ */ + if (list_length(insertShardPlacementList) != list_length(intersectedPlacementList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail("Insert query cannot be executed on all placements " + "for shard %ld", shardId))); + } + + + /* this is required for correct deparsing of the query */ + ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); + + /* set the upsert flag */ + if (originalQuery->onConflict != NULL) + { + upsertQuery = true; + } + + /* setting an alias simplifies deparsing of RETURNING */ + if (copiedInsertRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + copiedInsertRte->alias = alias; + } + + /* and generate the full query string */ + deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId, + queryString); + ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); + + modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); + modifyTask->dependedTaskList = NULL; + modifyTask->anchorShardId = shardId; + modifyTask->taskPlacementList = insertShardPlacementList; + modifyTask->upsertQuery = upsertQuery; + modifyTask->relationShardList = relationShardList; + modifyTask->replicationModel = cacheEntry->replicationModel; + + return modifyTask; +} + + +/* + * ReorderInsertSelectTargetLists reorders the target lists of INSERT/SELECT + * query which is required for deparsing purposes. The reordered query is returned. + * + * The necessity for this function comes from the fact that ruleutils.c is not supposed + * to be used on "rewritten" queries (i.e. ones that have been passed through + * QueryRewrite()). Query rewriting is the process in which views and such are expanded, + * and, INSERT/UPDATE targetlists are reordered to match the physical order, + * defaults etc. 
For the details of reordering, see transformInsertRow() and + * rewriteTargetListIU(). + */ +Query * +ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte) +{ + Query *subquery = NULL; + ListCell *insertTargetEntryCell; + List *newSubqueryTargetlist = NIL; + List *newInsertTargetlist = NIL; + int resno = 1; + Index insertTableId = 1; + Oid insertRelationId = InvalidOid; + int subqueryTargetLength = 0; + int targetEntryIndex = 0; + + AssertArg(InsertSelectIntoDistributedTable(originalQuery)); + + subquery = subqueryRte->subquery; + + insertRelationId = insertRte->relid; + + /* + * We implement the following algorithm for the reordering: + * - Iterate over the INSERT target list entries + * - If the target entry includes a Var, find the corresponding + * SELECT target entry on the original query and update resno + * - If the target entry does not include a Var (i.e., defaults + * or constants), create new target entry and add that to + * SELECT target list + * - Create a new INSERT target entry with respect to the new + * SELECT target entry created. + */ + foreach(insertTargetEntryCell, originalQuery->targetList) + { + TargetEntry *oldInsertTargetEntry = lfirst(insertTargetEntryCell); + TargetEntry *newInsertTargetEntry = NULL; + Var *newInsertVar = NULL; + TargetEntry *newSubqueryTargetEntry = NULL; + List *targetVarList = NULL; + int targetVarCount = 0; + AttrNumber originalAttrNo = get_attnum(insertRelationId, + oldInsertTargetEntry->resname); + + /* see transformInsertRow() for the details */ + if (IsA(oldInsertTargetEntry->expr, ArrayRef) || + IsA(oldInsertTargetEntry->expr, FieldStore)) + { + ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg( + "cannot plan distributed INSERT INTO ... 
SELECT query"), + errhint("Do not use array references and field stores " + "on the INSERT target list."))); + } + + /* + * It is safe to pull Var clause and ignore the coercions since that + * are already going to be added on the workers implicitly. + */ + targetVarList = pull_var_clause((Node *) oldInsertTargetEntry->expr, + PVC_RECURSE_AGGREGATES); + + targetVarCount = list_length(targetVarList); + + /* a single INSERT target entry cannot have more than one Var */ + Assert(targetVarCount <= 1); + + if (targetVarCount == 1) + { + Var *oldInsertVar = (Var *) linitial(targetVarList); + TargetEntry *oldSubqueryTle = list_nth(subquery->targetList, + oldInsertVar->varattno - 1); + + newSubqueryTargetEntry = copyObject(oldSubqueryTle); + + newSubqueryTargetEntry->resno = resno; + newSubqueryTargetlist = lappend(newSubqueryTargetlist, + newSubqueryTargetEntry); + } + else + { + newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr, + resno, + oldInsertTargetEntry->resname, + oldInsertTargetEntry->resjunk); + newSubqueryTargetlist = lappend(newSubqueryTargetlist, + newSubqueryTargetEntry); + } + + /* + * The newly created select target entry cannot be a junk entry since junk + * entries are not in the final target list and we're processing the + * final target list entries. + */ + Assert(!newSubqueryTargetEntry->resjunk); + + newInsertVar = makeVar(insertTableId, originalAttrNo, + exprType((Node *) newSubqueryTargetEntry->expr), + exprTypmod((Node *) newSubqueryTargetEntry->expr), + exprCollation((Node *) newSubqueryTargetEntry->expr), + 0); + newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar, originalAttrNo, + oldInsertTargetEntry->resname, + oldInsertTargetEntry->resjunk); + + newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry); + resno++; + } + + /* + * if there are any remaining target list entries (i.e., GROUP BY column not on the + * target list of subquery), update the remaining resnos. 
+ */ + subqueryTargetLength = list_length(subquery->targetList); + for (; targetEntryIndex < subqueryTargetLength; ++targetEntryIndex) + { + TargetEntry *oldSubqueryTle = list_nth(subquery->targetList, + targetEntryIndex); + TargetEntry *newSubqueryTargetEntry = NULL; + + /* + * Skip non-junk entries since we've already processed them above and this + * loop only is intended for junk entries. + */ + if (!oldSubqueryTle->resjunk) + { + continue; + } + + newSubqueryTargetEntry = copyObject(oldSubqueryTle); + + newSubqueryTargetEntry->resno = resno; + newSubqueryTargetlist = lappend(newSubqueryTargetlist, + newSubqueryTargetEntry); + + resno++; + } + + originalQuery->targetList = newInsertTargetlist; + subquery->targetList = newSubqueryTargetlist; + + return NULL; +} + + +/* + * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used + * as the source for an INSERT ... SELECT or returns a description why not. + */ +static DeferredErrorMessage * +MultiTaskRouterSelectQuerySupported(Query *query) +{ + List *queryList = NIL; + ListCell *queryCell = NULL; + + ExtractQueryWalker((Node *) query, &queryList); + foreach(queryCell, queryList) + { + Query *subquery = (Query *) lfirst(queryCell); + + Assert(subquery->commandType == CMD_SELECT); + + /* pushing down rtes without relations yields (shardCount * expectedRows) */ + if (subquery->rtable == NIL) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Subqueries without relations are not allowed in " + "distributed INSERT ... SELECT queries", + NULL, NULL); + } + + /* pushing down limit per shard would yield wrong results */ + if (subquery->limitCount != NULL) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "LIMIT clauses are not allowed in distirbuted INSERT " + "... 
SELECT queries", + NULL, NULL); + } + + /* pushing down limit offset per shard would yield wrong results */ + if (subquery->limitOffset != NULL) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "OFFSET clauses are not allowed in distributed " + "INSERT ... SELECT queries", + NULL, NULL); + } + + /* + * We could potentially support window clauses where the data is partitioned + * over distribution column. For simplicity, we currently do not support window + * clauses at all. + */ + if (subquery->windowClause != NULL) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "window functions are not allowed in distributed " + "INSERT ... SELECT queries", + NULL, NULL); + } + + if (subquery->setOperations != NULL) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Set operations are not allowed in distributed " + "INSERT ... SELECT queries", + NULL, NULL); + } + + /* + * We currently do not support grouping sets since it could generate NULL + * results even after the restrictions are applied to the query. A solution + * would be to add the whole query into a subquery and add the restrictions + * on that subquery. + */ + if (subquery->groupingSets != NULL) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "grouping sets are not allowed in distributed " + "INSERT ... SELECT queries", + NULL, NULL); + } + + /* + * We cannot support DISTINCT ON clauses since it could be on a non-partition column. + * In that case, there is no way that Citus can support this. + */ + if (subquery->hasDistinctOn) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "DISTINCT ON clauses are not allowed in distributed " + "INSERT ... SELECT queries", + NULL, NULL); + } + } + + return NULL; +} + + +/* + * InsertPartitionColumnMatchesSelect returns NULL if the partition column in the + * table targeted by the INSERT matches any of the SELECTed tables' + * partition columns. Returns the error description if there's no match. 
+ * + * On return without error (i.e., if partition columns match), the function + * also sets selectPartitionColumnTableId. + */ +static DeferredErrorMessage * +InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, + Oid *selectPartitionColumnTableId) +{ + ListCell *targetEntryCell = NULL; + uint32 rangeTableId = 1; + Oid insertRelationId = insertRte->relid; + Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId); + Query *subquery = subqueryRte->subquery; + bool targetTableHasPartitionColumn = false; + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry); + Var *insertVar = NULL; + AttrNumber originalAttrNo = InvalidAttrNumber; + TargetEntry *subqueryTargetEntry = NULL; + Expr *selectTargetExpr = NULL; + Oid subqueryPartitionColumnRelationId = InvalidOid; + Var *subqueryPartitionColumn = NULL; + List *parentQueryList = NIL; + + /* + * We only consider target entries that include a single column. Note that this + * is slightly different than directly checking the whether the targetEntry->expr + * is a var since the var could be wrapped into an implicit/explicit casting. + * + * Also note that we skip the target entry if it does not contain a Var, which + * corresponds to columns with DEFAULT values on the target list. 
+ */ + if (list_length(insertTargetEntryColumnList) != 1) + { + continue; + } + + insertVar = (Var *) linitial(insertTargetEntryColumnList); + originalAttrNo = targetEntry->resno; + + /* skip processing of target table non-partition columns */ + if (originalAttrNo != insertPartitionColumn->varattno) + { + continue; + } + + /* INSERT query includes the partition column */ + targetTableHasPartitionColumn = true; + + subqueryTargetEntry = list_nth(subquery->targetList, + insertVar->varattno - 1); + selectTargetExpr = subqueryTargetEntry->expr; + + parentQueryList = list_make2(query, subquery); + FindReferencedTableColumn(selectTargetExpr, + parentQueryList, subquery, + &subqueryPartitionColumnRelationId, + &subqueryPartitionColumn); + + /* + * Corresponding (i.e., in the same ordinal position as the target table's + * partition column) select target entry does not directly belong a table. + * Evaluate its expression type and error out properly. + */ + if (subqueryPartitionColumnRelationId == InvalidOid) + { + char *errorDetailTemplate = "Subquery contains %s in the " + "same position as the target table's " + "partition column."; + + char *exprDescription = ""; + + switch (selectTargetExpr->type) + { + case T_Const: + { + exprDescription = "a constant value"; + break; + } + + case T_OpExpr: + { + exprDescription = "an operator"; + break; + } + + case T_FuncExpr: + { + FuncExpr *subqueryFunctionExpr = (FuncExpr *) selectTargetExpr; + + switch (subqueryFunctionExpr->funcformat) + { + case COERCE_EXPLICIT_CALL: + { + exprDescription = "a function call"; + break; + } + + case COERCE_EXPLICIT_CAST: + { + exprDescription = "an explicit cast"; + break; + } + + case COERCE_IMPLICIT_CAST: + { + exprDescription = "an implicit cast"; + break; + } + + default: + { + exprDescription = "a function call"; + break; + } + } + break; + } + + case T_Aggref: + { + exprDescription = "an aggregation"; + break; + } + + case T_CaseExpr: + { + exprDescription = "a case expression"; + break; + 
} + + case T_CoalesceExpr: + { + exprDescription = "a coalesce expression"; + break; + } + + case T_RowExpr: + { + exprDescription = "a row expression"; + break; + } + + case T_MinMaxExpr: + { + exprDescription = "a min/max expression"; + break; + } + + case T_CoerceViaIO: + { + exprDescription = "an explicit coercion"; + break; + } + + default: + { + exprDescription = + "an expression that is not a simple column reference"; + break; + } + } + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", + psprintf(errorDetailTemplate, exprDescription), + "Ensure the target table's partition column has a " + "corresponding simple column reference to a distributed " + "table's partition column in the subquery."); + } + + /* + * Insert target expression could only be non-var if the select target + * entry does not have the same type (i.e., target column requires casting). + */ + if (!IsA(targetEntry->expr, Var)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", + "The data type of the target table's partition column " + "should exactly match the data type of the " + "corresponding simple column reference in the subquery.", + NULL); + } + + /* finally, check that the select target column is a partition column */ + if (!IsPartitionColumn(selectTargetExpr, subquery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed INSERT INTO ... 
SELECT " + "becuase the partition columns in the source table " + "and subquery do not match", + "The target table's partition column should correspond " + "to a partition column in the subquery.", + NULL); + } + + /* finally, check that the select target column is a partition column */ + /* we can set the select relation id */ + *selectPartitionColumnTableId = subqueryPartitionColumnRelationId; + + break; + } + + if (!targetTableHasPartitionColumn) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", + "the query doesn't include the target table's " + "partition column", + NULL); + } + + return NULL; +} /* * CreatteCoordinatorInsertSelectPlan creates a query plan for a SELECT into a * distributed table. The query plan can also be executed on a worker in MX. */ -MultiPlan * +static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse) { Query *insertSelectQuery = copyObject(parse); @@ -47,14 +1062,11 @@ CreateCoordinatorInsertSelectPlan(Query *parse) RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); Oid targetRelationId = insertRte->relid; - ListCell *selectTargetCell = NULL; - ListCell *insertTargetCell = NULL; - MultiPlan *multiPlan = CitusMakeNode(MultiPlan); multiPlan->operation = CMD_INSERT; multiPlan->planningError = - DeferErrorIfCoordinatorInsertSelectUnsupported(insertSelectQuery); + CoordinatorInsertSelectSupported(insertSelectQuery); if (multiPlan->planningError != NULL) { @@ -89,48 +1101,9 @@ CreateCoordinatorInsertSelectPlan(Query *parse) ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); - /* add casts when the SELECT output does not directly match the table */ - forboth(insertTargetCell, insertSelectQuery->targetList, - selectTargetCell, selectQuery->targetList) - { - TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); - 
TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); - - Var *columnVar = NULL; - Oid columnType = InvalidOid; - int32 columnTypeMod = 0; - Oid selectOutputType = InvalidOid; - - /* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */ - if (!IsA(insertTargetEntry->expr, Var)) - { - ereport(ERROR, (errmsg("can only handle regular columns in the target " - "list"))); - } - - columnVar = (Var *) insertTargetEntry->expr; - columnType = get_atttype(targetRelationId, columnVar->varattno); - columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno); - selectOutputType = columnVar->vartype; - - /* - * If the type in the target list does not match the type of the column, - * we need to cast to the column type. PostgreSQL would do this - * automatically during the insert, but we're passing the SELECT - * output directly to COPY. - */ - if (columnType != selectOutputType) - { - Expr *selectExpression = selectTargetEntry->expr; - Expr *typeCastedSelectExpr = - (Expr *) coerce_to_target_type(NULL, (Node *) selectExpression, - selectOutputType, columnType, - columnTypeMod, COERCION_EXPLICIT, - COERCE_IMPLICIT_CAST, -1); - - selectTargetEntry->expr = typeCastedSelectExpr; - } - } + /* make sure the SELECT returns the right type for copying into the table */ + CastSelectTargetList(selectQuery->targetList, targetRelationId, + insertSelectQuery->targetList); multiPlan->insertSelectSubquery = selectQuery; multiPlan->insertTargetList = insertSelectQuery->targetList; @@ -141,13 +1114,13 @@ CreateCoordinatorInsertSelectPlan(Query *parse) /* - * DeferErrorIfCoordinatorInsertSelectUnsupported returns an error if executing an + * CoordinatorInsertSelectSupported returns an error if executing an * INSERT ... SELECT command by pulling results of the SELECT to the coordinator * is unsupported because it uses RETURNING, ON CONFLICT, or an append-distributed * table. 
*/ static DeferredErrorMessage * -DeferErrorIfCoordinatorInsertSelectUnsupported(Query *insertSelectQuery) +CoordinatorInsertSelectSupported(Query *insertSelectQuery) { RangeTblEntry *insertRte = NULL; RangeTblEntry *subqueryRte = NULL; @@ -252,3 +1225,62 @@ WrapSubquery(Query *subquery) return outerQuery; } + + +/* + * CastSelectTargetList adds casts to the target entries in selectTargetList + * to match the type in insertTargetList. This ensures that the results of + * the SELECT will have the right type when serialised during COPY. For + * example, a float that is inserted into an int column normally has an + * implicit cast, but if we send it through the COPY protocol the serialised + * form would contain decimal notation, which is not valid for int. + */ +static void +CastSelectTargetList(List *selectTargetList, Oid targetRelationId, List *insertTargetList) +{ + ListCell *insertTargetCell = NULL; + ListCell *selectTargetCell = NULL; + + /* add casts when the SELECT output does not directly match the table */ + forboth(insertTargetCell, insertTargetList, + selectTargetCell, selectTargetList) + { + TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); + TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); + + Var *columnVar = NULL; + Oid columnType = InvalidOid; + int32 columnTypeMod = 0; + Oid selectOutputType = InvalidOid; + + /* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */ + if (!IsA(insertTargetEntry->expr, Var)) + { + ereport(ERROR, (errmsg("can only handle regular columns in the target " + "list"))); + } + + columnVar = (Var *) insertTargetEntry->expr; + columnType = get_atttype(targetRelationId, columnVar->varattno); + columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno); + selectOutputType = columnVar->vartype; + + /* + * If the type in the target list does not match the type of the column, + * we need to cast to the column type. 
PostgreSQL would do this + * automatically during the insert, but we're passing the SELECT + * output directly to COPY. + */ + if (columnType != selectOutputType) + { + Expr *selectExpression = selectTargetEntry->expr; + Expr *typeCastedSelectExpr = + (Expr *) coerce_to_target_type(NULL, (Node *) selectExpression, + selectOutputType, columnType, + columnTypeMod, COERCION_EXPLICIT, + COERCE_IMPLICIT_CAST, -1); + + selectTargetEntry->expr = typeCastedSelectExpr; + } + } +} diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index f284df44d..8641af7fd 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -22,6 +22,7 @@ #include "distributed/citus_clauses.h" #include "distributed/colocation_utils.h" #include "distributed/metadata_cache.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index bb3e49101..81d1a547f 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -281,16 +281,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query { if (InsertSelectIntoDistributedTable(originalQuery)) { - distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, - plannerRestrictionContext); - - if (distributedPlan->planningError != NULL) - { - RaiseDeferredError(distributedPlan->planningError, DEBUG1); - - /* if INSERT..SELECT cannot be distributed, pull to coordinator */ - distributedPlan = CreateCoordinatorInsertSelectPlan(originalQuery); - } + distributedPlan = + CreateInsertSelectPlan(originalQuery, plannerRestrictionContext); } else { diff --git 
a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ec0d280d3..87335bdef 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -26,6 +26,7 @@ #include "distributed/deparse_shard_query.h" #include "distributed/distribution_column.h" #include "distributed/errormessage.h" +#include "distributed/insert_select_planner.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -86,14 +87,6 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext * restrictionContext); -static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery); -static Task * RouterModifyTaskForShardInterval(Query *originalQuery, - ShardInterval *shardInterval, - RelationRestrictionContext * - restrictionContext, - uint32 taskIdIndex, - bool allRelationsJoinedOnPartitionKey); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -115,21 +108,9 @@ static bool RelationPrunesToMultipleShards(List *relationShardList); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); -static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); static Job * RouterQueryJob(Query *query, Task *task, List *placementList); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); -static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, - RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, - bool allReferenceTables); -static DeferredErrorMessage * 
MultiTaskRouterSelectQuerySupported(Query *query); -static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, - RangeTblEntry *insertRte, - RangeTblEntry * - subqueryRte, - Oid * - selectPartitionColumnTableId); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); #if (PG_VERSION_NUM >= 100000) static List * get_all_actual_clauses(List *restrictinfo_list); @@ -255,311 +236,6 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, } -/* - * Creates a router plan for INSERT ... SELECT queries which could consists of - * multiple tasks. - * - * The function never returns NULL, it errors out if cannot create the multi plan. - */ -MultiPlan * -CreateDistributedInsertSelectPlan(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) -{ - int shardOffset = 0; - List *sqlTaskList = NIL; - uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ - Job *workerJob = NULL; - uint64 jobId = INVALID_JOB_ID; - MultiPlan *multiPlan = CitusMakeNode(MultiPlan); - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); - RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); - Oid targetRelationId = insertRte->relid; - DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); - int shardCount = targetCacheEntry->shardIntervalArrayLength; - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - bool allReferenceTables = relationRestrictionContext->allReferenceTables; - bool safeToPushDownSubquery = false; - - multiPlan->operation = originalQuery->commandType; - - /* - * Error semantics for INSERT ... SELECT queries are different than regular - * modify queries. Thus, handle separately. 
- */ - multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte, - subqueryRte, - allReferenceTables); - if (multiPlan->planningError) - { - return multiPlan; - } - - safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext, - originalQuery); - - /* - * Plan select query for each shard in the target table. Do so by replacing the - * partitioning qual parameter added in multi_planner() using the current shard's - * actual boundary values. Also, add the current shard's boundary values to the - * top level subquery to ensure that even if the partitioning qual is not distributed - * to all the tables, we never run the queries on the shards that don't match with - * the current shard boundaries. Finally, perform the normal shard pruning to - * decide on whether to push the query to the current shard or not. - */ - for (shardOffset = 0; shardOffset < shardCount; shardOffset++) - { - ShardInterval *targetShardInterval = - targetCacheEntry->sortedShardIntervalArray[shardOffset]; - Task *modifyTask = NULL; - - modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval, - relationRestrictionContext, - taskIdIndex, - safeToPushDownSubquery); - - /* add the task if it could be created */ - if (modifyTask != NULL) - { - modifyTask->insertSelectQuery = true; - - sqlTaskList = lappend(sqlTaskList, modifyTask); - } - - ++taskIdIndex; - } - - if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF && - list_length(sqlTaskList) > 1) - { - ereport(MultiTaskQueryLogLevel, (errmsg("multi-task query about to be executed"), - errhint("Queries are split to multiple tasks " - "if they have to be split into several" - " queries on the workers."))); - } - - /* Create the worker job */ - workerJob = CitusMakeNode(Job); - workerJob->taskList = sqlTaskList; - workerJob->subqueryPushdown = false; - workerJob->dependedJobList = NIL; - workerJob->jobId = jobId; - workerJob->jobQuery = originalQuery; - workerJob->requiresMasterEvaluation 
= RequiresMasterEvaluation(originalQuery); - - /* and finally the multi plan */ - multiPlan->workerJob = workerJob; - multiPlan->masterQuery = NULL; - multiPlan->routerExecutable = true; - multiPlan->hasReturning = false; - - if (list_length(originalQuery->returningList) > 0) - { - multiPlan->hasReturning = true; - } - - return multiPlan; -} - - -/* - * SafeToPushDownSubquery returns true if either - * (i) there exists join in the query and all relations joined on their - * partition keys - * (ii) there exists only union set operations and all relations has - * partition keys in the same ordinal position in the query - */ -static bool -SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery) -{ - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - bool restrictionEquivalenceForPartitionKeys = - RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); - - if (restrictionEquivalenceForPartitionKeys) - { - return true; - } - - if (ContainsUnionSubquery(originalQuery)) - { - return SafeToPushdownUnionSubquery(relationRestrictionContext); - } - - return false; -} - - -/* - * RouterModifyTaskForShardInterval creates a modify task by - * replacing the partitioning qual parameter added in multi_planner() - * with the shardInterval's boundary value. Then perform the normal - * shard pruning on the subquery. Finally, checks if the target shardInterval - * has exactly same placements with the select task's available anchor - * placements. - * - * The function errors out if the subquery is not router select query (i.e., - * subqueries with non equi-joins.). 
- */ -static Task * -RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, - RelationRestrictionContext *restrictionContext, - uint32 taskIdIndex, - bool safeToPushdownSubquery) -{ - Query *copiedQuery = copyObject(originalQuery); - RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); - RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery); - Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; - - uint64 shardId = shardInterval->shardId; - Oid distributedTableId = shardInterval->relationId; - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - - RelationRestrictionContext *copiedRestrictionContext = - CopyRelationRestrictionContext(restrictionContext); - - StringInfo queryString = makeStringInfo(); - ListCell *restrictionCell = NULL; - Task *modifyTask = NULL; - List *selectPlacementList = NIL; - uint64 selectAnchorShardId = INVALID_SHARD_ID; - List *relationShardList = NIL; - uint64 jobId = INVALID_JOB_ID; - List *insertShardPlacementList = NULL; - List *intersectedPlacementList = NULL; - bool routerPlannable = false; - bool upsertQuery = false; - bool replacePrunedQueryWithDummy = false; - bool allReferenceTables = restrictionContext->allReferenceTables; - List *shardOpExpressions = NIL; - RestrictInfo *shardRestrictionList = NULL; - - /* grab shared metadata lock to stop concurrent placement additions */ - LockShardDistributionMetadata(shardId, ShareLock); - - /* - * Replace the partitioning qual parameter value in all baserestrictinfos. - * Note that this has to be done on a copy, as the walker modifies in place. 
- */ - foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) - { - RelationRestriction *restriction = lfirst(restrictionCell); - List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo; - List *extendedBaseRestrictInfo = originalBaseRestrictInfo; - Index rteIndex = restriction->index; - - if (!safeToPushdownSubquery || allReferenceTables) - { - continue; - } - - shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex); - shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions); - extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo, - shardRestrictionList); - - restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo; - } - - /* - * We also need to add shard interval range to the subquery in case - * the partition qual not distributed all tables such as some - * subqueries in WHERE clause. - * - * Note that we need to add the ranges before the shard pruning to - * prevent shard pruning logic (i.e, namely UpdateRelationNames()) - * modifies range table entries, which makes hard to add the quals. - */ - if (!allReferenceTables) - { - AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval); - } - - /* mark that we don't want the router planner to generate dummy hosts/queries */ - replacePrunedQueryWithDummy = false; - - /* - * Use router select planner to decide on whether we can push down the query - * or not. If we can, we also rely on the side-effects that all RTEs have been - * updated to point to the relevant nodes and selectPlacementList is determined. 
- */ - routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext, - &selectPlacementList, &selectAnchorShardId, - &relationShardList, replacePrunedQueryWithDummy); - - if (!routerPlannable) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Select query cannot be pushed down to the worker."))); - } - - - /* ensure that we do not send queries where select is pruned away completely */ - if (list_length(selectPlacementList) == 0) - { - ereport(DEBUG2, (errmsg("Skipping target shard interval %ld since " - "SELECT query for it pruned away", shardId))); - - return NULL; - } - - /* get the placements for insert target shard and its intersection with select */ - insertShardPlacementList = FinalizedShardPlacementList(shardId); - intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, - selectPlacementList); - - /* - * If insert target does not have exactly the same placements with the select, - * we sholdn't run the query. 
- */ - if (list_length(insertShardPlacementList) != list_length(intersectedPlacementList)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Insert query cannot be executed on all placements " - "for shard %ld", shardId))); - } - - - /* this is required for correct deparsing of the query */ - ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); - - /* set the upsert flag */ - if (originalQuery->onConflict != NULL) - { - upsertQuery = true; - } - - /* setting an alias simplifies deparsing of RETURNING */ - if (copiedInsertRte->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - copiedInsertRte->alias = alias; - } - - /* and generate the full query string */ - deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId, - queryString); - ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); - - modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); - modifyTask->dependedTaskList = NULL; - modifyTask->anchorShardId = shardId; - modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->upsertQuery = upsertQuery; - modifyTask->relationShardList = relationShardList; - modifyTask->replicationModel = cacheEntry->replicationModel; - - return modifyTask; -} - - /* * ShardIntervalOpExpressions returns a list of OpExprs with exactly two * items in it. The list consists of shard interval ranges with partition columns @@ -776,441 +452,6 @@ ExtractInsertRangeTableEntry(Query *query) } -/* - * InsertSelectQueryNotSupported returns NULL if the INSERT ... SELECT query - * is supported, or a description why not. 
- */ -static DeferredErrorMessage * -InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, bool allReferenceTables) -{ - Query *subquery = NULL; - Oid selectPartitionColumnTableId = InvalidOid; - Oid targetRelationId = insertRte->relid; - char targetPartitionMethod = PartitionMethod(targetRelationId); - ListCell *rangeTableCell = NULL; - DeferredErrorMessage *error = NULL; - - /* we only do this check for INSERT ... SELECT queries */ - AssertArg(InsertSelectIntoDistributedTable(queryTree)); - - subquery = subqueryRte->subquery; - - if (!NeedsDistributedPlanning(subquery)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "distributed INSERT ... SELECT can only select from " - "distributed tables", - NULL, NULL); - } - - if (GetLocalGroupId() != 0) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "distributed INSERT ... SELECT can only be performed from " - "the coordinator", - NULL, NULL); - } - - /* we do not expect to see a view in modify target */ - foreach(rangeTableCell, queryTree->rtable) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - if (rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_VIEW) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot insert into view over distributed table", - NULL, NULL); - } - } - - if (contain_volatile_functions((Node *) queryTree)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "volatile functions are not allowed in distributed " - "INSERT ... SELECT queries", - NULL, NULL); - } - - /* we don't support LIMIT, OFFSET and WINDOW functions */ - error = MultiTaskRouterSelectQuerySupported(subquery); - if (error) - { - return error; - } - - /* - * If we're inserting into a reference table, all participating tables - * should be reference tables as well. 
- */ - if (targetPartitionMethod == DISTRIBUTE_BY_NONE) - { - if (!allReferenceTables) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "only reference tables may be queried when targeting " - "a reference table with distributed INSERT ... SELECT", - NULL, NULL); - } - } - else - { - DeferredErrorMessage *error = NULL; - - /* ensure that INSERT's partition column comes from SELECT's partition column */ - error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte, - &selectPartitionColumnTableId); - if (error) - { - return error; - } - - /* - * We expect partition column values come from colocated tables. Note that we - * skip this check from the reference table case given that all reference tables - * are already (and by default) co-located. - */ - if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT target table and the source relation of the SELECT partition " - "column value must be colocated in distributed INSERT ... SELECT", - NULL, NULL); - } - } - - return NULL; -} - - -/* - * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used - * as the source for an INSERT ... SELECT or returns a description why not. - */ -static DeferredErrorMessage * -MultiTaskRouterSelectQuerySupported(Query *query) -{ - List *queryList = NIL; - ListCell *queryCell = NULL; - - ExtractQueryWalker((Node *) query, &queryList); - foreach(queryCell, queryList) - { - Query *subquery = (Query *) lfirst(queryCell); - - Assert(subquery->commandType == CMD_SELECT); - - /* pushing down rtes without relations yields (shardCount * expectedRows) */ - if (subquery->rtable == NIL) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "Subqueries without relations are not allowed in " - "distributed INSERT ... 
SELECT queries", - NULL, NULL); - } - - /* pushing down limit per shard would yield wrong results */ - if (subquery->limitCount != NULL) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "LIMIT clauses are not allowed in distirbuted INSERT " - "... SELECT queries", - NULL, NULL); - } - - /* pushing down limit offest per shard would yield wrong results */ - if (subquery->limitOffset != NULL) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "OFFSET clauses are not allowed in distributed " - "INSERT ... SELECT queries", - NULL, NULL); - } - - /* - * We could potentially support window clauses where the data is partitioned - * over distribution column. For simplicity, we currently do not support window - * clauses at all. - */ - if (subquery->windowClause != NULL) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "window functions are not allowed in distributed " - "INSERT ... SELECT queries", - NULL, NULL); - } - - if (subquery->setOperations != NULL) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "Set operations are not allowed in distributed " - "INSERT ... SELECT queries", - NULL, NULL); - } - - /* - * We currently do not support grouping sets since it could generate NULL - * results even after the restrictions are applied to the query. A solution - * would be to add the whole query into a subquery and add the restrictions - * on that subquery. - */ - if (subquery->groupingSets != NULL) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "grouping sets are not allowed in distributed " - "INSERT ... SELECT queries", - NULL, NULL); - } - - /* - * We cannot support DISTINCT ON clauses since it could be on a non-partition column. - * In that case, there is no way that Citus can support this. - */ - if (subquery->hasDistinctOn) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "DISTINCT ON clauses are not allowed in distributed " - "INSERT ... 
SELECT queries", - NULL, NULL); - } - } - - return NULL; -} - - -/* - * InsertPartitionColumnMatchesSelect returns NULL the partition column in the - * table targeted by INSERTed matches with the any of the SELECTed table's - * partition column. Returns the error description if there's no match. - * - * On return without error (i.e., if partition columns match), the function - * also sets selectPartitionColumnTableId. - */ -static DeferredErrorMessage * -InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, - Oid *selectPartitionColumnTableId) -{ - ListCell *targetEntryCell = NULL; - uint32 rangeTableId = 1; - Oid insertRelationId = insertRte->relid; - Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId); - Query *subquery = subqueryRte->subquery; - bool targetTableHasPartitionColumn = false; - - foreach(targetEntryCell, query->targetList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry); - Var *insertVar = NULL; - AttrNumber originalAttrNo = InvalidAttrNumber; - TargetEntry *subqueryTargetEntry = NULL; - Expr *selectTargetExpr = NULL; - Oid subqueryPartitionColumnRelationId = InvalidOid; - Var *subqueryPartitionColumn = NULL; - List *parentQueryList = NIL; - - /* - * We only consider target entries that include a single column. Note that this - * is slightly different than directly checking the whether the targetEntry->expr - * is a var since the var could be wrapped into an implicit/explicit casting. - * - * Also note that we skip the target entry if it does not contain a Var, which - * corresponds to columns with DEFAULT values on the target list. 
- */ - if (list_length(insertTargetEntryColumnList) != 1) - { - continue; - } - - insertVar = (Var *) linitial(insertTargetEntryColumnList); - originalAttrNo = targetEntry->resno; - - /* skip processing of target table non-partition columns */ - if (originalAttrNo != insertPartitionColumn->varattno) - { - continue; - } - - /* INSERT query includes the partition column */ - targetTableHasPartitionColumn = true; - - subqueryTargetEntry = list_nth(subquery->targetList, - insertVar->varattno - 1); - selectTargetExpr = subqueryTargetEntry->expr; - - parentQueryList = list_make2(query, subquery); - FindReferencedTableColumn(selectTargetExpr, - parentQueryList, subquery, - &subqueryPartitionColumnRelationId, - &subqueryPartitionColumn); - - /* - * Corresponding (i.e., in the same ordinal position as the target table's - * partition column) select target entry does not directly belong a table. - * Evaluate its expression type and error out properly. - */ - if (subqueryPartitionColumnRelationId == InvalidOid) - { - char *errorDetailTemplate = "Subquery contains %s in the " - "same position as the target table's " - "partition column."; - - char *exprDescription = ""; - - switch (selectTargetExpr->type) - { - case T_Const: - { - exprDescription = "a constant value"; - break; - } - - case T_OpExpr: - { - exprDescription = "an operator"; - break; - } - - case T_FuncExpr: - { - FuncExpr *subqueryFunctionExpr = (FuncExpr *) selectTargetExpr; - - switch (subqueryFunctionExpr->funcformat) - { - case COERCE_EXPLICIT_CALL: - { - exprDescription = "a function call"; - break; - } - - case COERCE_EXPLICIT_CAST: - { - exprDescription = "an explicit cast"; - break; - } - - case COERCE_IMPLICIT_CAST: - { - exprDescription = "an implicit cast"; - break; - } - - default: - { - exprDescription = "a function call"; - break; - } - } - break; - } - - case T_Aggref: - { - exprDescription = "an aggregation"; - break; - } - - case T_CaseExpr: - { - exprDescription = "a case expression"; - break; - 
} - - case T_CoalesceExpr: - { - exprDescription = "a coalesce expression"; - break; - } - - case T_RowExpr: - { - exprDescription = "a row expression"; - break; - } - - case T_MinMaxExpr: - { - exprDescription = "a min/max expression"; - break; - } - - case T_CoerceViaIO: - { - exprDescription = "an explicit coercion"; - break; - } - - default: - { - exprDescription = - "an expression that is not a simple column reference"; - break; - } - } - - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed INSERT INTO ... SELECT " - "because the partition columns in the source table " - "and subquery do not match", - psprintf(errorDetailTemplate, exprDescription), - "Ensure the target table's partition column has a " - "corresponding simple column reference to a distributed " - "table's partition column in the subquery."); - } - - /* - * Insert target expression could only be non-var if the select target - * entry does not have the same type (i.e., target column requires casting). - */ - if (!IsA(targetEntry->expr, Var)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed INSERT INTO ... SELECT " - "because the partition columns in the source table " - "and subquery do not match", - "The data type of the target table's partition column " - "should exactly match the data type of the " - "corresponding simple column reference in the subquery.", - NULL); - } - - /* finally, check that the select target column is a partition column */ - if (!IsPartitionColumn(selectTargetExpr, subquery)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed INSERT INTO ... 
SELECT " - "becuase the partition columns in the source table " - "and subquery do not match", - "The target table's partition column should correspond " - "to a partition column in the subquery.", - NULL); - } - - /* finally, check that the select target column is a partition column */ - /* we can set the select relation id */ - *selectPartitionColumnTableId = subqueryPartitionColumnRelationId; - - break; - } - - if (!targetTableHasPartitionColumn) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed INSERT INTO ... SELECT " - "because the partition columns in the source table " - "and subquery do not match", - "the query doesn't include the target table's " - "partition column", - NULL); - } - - return NULL; -} - - /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. @@ -2593,7 +1834,7 @@ WorkersContainingAllShards(List *prunedShardIntervalsList) * expect very high replication factor, iterating over a list and making string * comparisons should be sufficient. */ -static List * +List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList) { ListCell *lhsPlacementCell = NULL; @@ -2736,221 +1977,6 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC } -/* - * ReorderInsertSelectTargetLists reorders the target lists of INSERT/SELECT - * query which is required for deparsing purposes. The reordered query is returned. - * - * The necessity for this function comes from the fact that ruleutils.c is not supposed - * to be used on "rewritten" queries (i.e. ones that have been passed through - * QueryRewrite()). Query rewriting is the process in which views and such are expanded, - * and, INSERT/UPDATE targetlists are reordered to match the physical order, - * defaults etc. For the details of reordeing, see transformInsertRow() and - * rewriteTargetListIU(). 
- */ -Query * -ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte) -{ - Query *subquery = NULL; - ListCell *insertTargetEntryCell; - List *newSubqueryTargetlist = NIL; - List *newInsertTargetlist = NIL; - int resno = 1; - Index insertTableId = 1; - Oid insertRelationId = InvalidOid; - int subqueryTargetLength = 0; - int targetEntryIndex = 0; - - AssertArg(InsertSelectIntoDistributedTable(originalQuery)); - - subquery = subqueryRte->subquery; - - insertRelationId = insertRte->relid; - - /* - * We implement the following algorithm for the reoderding: - * - Iterate over the INSERT target list entries - * - If the target entry includes a Var, find the corresponding - * SELECT target entry on the original query and update resno - * - If the target entry does not include a Var (i.e., defaults - * or constants), create new target entry and add that to - * SELECT target list - * - Create a new INSERT target entry with respect to the new - * SELECT target entry created. - */ - foreach(insertTargetEntryCell, originalQuery->targetList) - { - TargetEntry *oldInsertTargetEntry = lfirst(insertTargetEntryCell); - TargetEntry *newInsertTargetEntry = NULL; - Var *newInsertVar = NULL; - TargetEntry *newSubqueryTargetEntry = NULL; - List *targetVarList = NULL; - int targetVarCount = 0; - AttrNumber originalAttrNo = get_attnum(insertRelationId, - oldInsertTargetEntry->resname); - - /* see transformInsertRow() for the details */ - if (IsA(oldInsertTargetEntry->expr, ArrayRef) || - IsA(oldInsertTargetEntry->expr, FieldStore)) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg( - "cannot plan distributed INSERT INTO ... SELECT query"), - errhint("Do not use array references and field stores " - "on the INSERT target list."))); - } - - /* - * It is safe to pull Var clause and ignore the coercions since that - * are already going to be added on the workers implicitly. 
- */ - targetVarList = pull_var_clause((Node *) oldInsertTargetEntry->expr, - PVC_RECURSE_AGGREGATES); - - targetVarCount = list_length(targetVarList); - - /* a single INSERT target entry cannot have more than one Var */ - Assert(targetVarCount <= 1); - - if (targetVarCount == 1) - { - Var *oldInsertVar = (Var *) linitial(targetVarList); - TargetEntry *oldSubqueryTle = list_nth(subquery->targetList, - oldInsertVar->varattno - 1); - - newSubqueryTargetEntry = copyObject(oldSubqueryTle); - - newSubqueryTargetEntry->resno = resno; - newSubqueryTargetlist = lappend(newSubqueryTargetlist, - newSubqueryTargetEntry); - } - else - { - newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr, - resno, - oldInsertTargetEntry->resname, - oldInsertTargetEntry->resjunk); - newSubqueryTargetlist = lappend(newSubqueryTargetlist, - newSubqueryTargetEntry); - } - - /* - * The newly created select target entry cannot be a junk entry since junk - * entries are not in the final target list and we're processing the - * final target list entries. - */ - Assert(!newSubqueryTargetEntry->resjunk); - - newInsertVar = makeVar(insertTableId, originalAttrNo, - exprType((Node *) newSubqueryTargetEntry->expr), - exprTypmod((Node *) newSubqueryTargetEntry->expr), - exprCollation((Node *) newSubqueryTargetEntry->expr), - 0); - newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar, originalAttrNo, - oldInsertTargetEntry->resname, - oldInsertTargetEntry->resjunk); - - newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry); - resno++; - } - - /* - * if there are any remaining target list entries (i.e., GROUP BY column not on the - * target list of subquery), update the remaining resnos. 
- */ - subqueryTargetLength = list_length(subquery->targetList); - for (; targetEntryIndex < subqueryTargetLength; ++targetEntryIndex) - { - TargetEntry *oldSubqueryTle = list_nth(subquery->targetList, - targetEntryIndex); - TargetEntry *newSubqueryTargetEntry = NULL; - - /* - * Skip non-junk entries since we've already processed them above and this - * loop only is intended for junk entries. - */ - if (!oldSubqueryTle->resjunk) - { - continue; - } - - newSubqueryTargetEntry = copyObject(oldSubqueryTle); - - newSubqueryTargetEntry->resno = resno; - newSubqueryTargetlist = lappend(newSubqueryTargetlist, - newSubqueryTargetEntry); - - resno++; - } - - originalQuery->targetList = newInsertTargetlist; - subquery->targetList = newSubqueryTargetlist; - - return NULL; -} - - -/* - * InsertSelectIntoDistributedTable returns true when the input query is an - * INSERT INTO ... SELECT kind of query and the target is a distributed - * table. - * - * Note that the input query should be the original parsetree of - * the query (i.e., not passed trough the standard planner). - * - * This function is inspired from getInsertSelectQuery() on - * rewrite/rewriteManip.c. 
- */ -bool -InsertSelectIntoDistributedTable(Query *query) -{ - CmdType commandType = query->commandType; - List *fromList = NULL; - RangeTblRef *rangeTableReference = NULL; - RangeTblEntry *subqueryRte = NULL; - RangeTblEntry *insertRte = NULL; - - if (commandType != CMD_INSERT) - { - return false; - } - - if (query->jointree == NULL || !IsA(query->jointree, FromExpr)) - { - return false; - } - - fromList = query->jointree->fromlist; - if (list_length(fromList) != 1) - { - return false; - } - - rangeTableReference = linitial(fromList); - if (!IsA(rangeTableReference, RangeTblRef)) - { - return false; - } - - subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable); - if (subqueryRte->rtekind != RTE_SUBQUERY) - { - return false; - } - - /* ensure that there is a query */ - Assert(IsA(subqueryRte->subquery, Query)); - - insertRte = ExtractInsertRangeTableEntry(query); - if (!IsDistributedTable(insertRte->relid)) - { - return false; - } - - return true; -} - - /* * Copy a RelationRestrictionContext. Note that several subfields are copied * shallowly, for lack of copyObject support. 
diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c index 76be876a1..abccca4af 100644 --- a/src/backend/distributed/test/deparse_shard_query.c +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -19,6 +19,7 @@ #include "catalog/pg_type.h" #include "distributed/master_protocol.h" #include "distributed/citus_ruleutils.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/test_helper_functions.h" /* IWYU pragma: keep */ #include "lib/stringinfo.h" diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 8514073a4..af90d0b12 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -9,6 +9,7 @@ #include "postgres.h" #include "distributed/citus_clauses.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_router_planner.h" #include "catalog/pg_type.h" diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 5fccdacde..e89372f81 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -17,13 +17,21 @@ #include "postgres.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "nodes/plannodes.h" -extern MultiPlan * CreateCoordinatorInsertSelectPlan(Query *originalQuery); +extern bool InsertSelectIntoDistributedTable(Query *query); +extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, + RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte); extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); +extern MultiPlan * CreateInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext * + 
plannerRestrictionContext); + #endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 32f54ce3d..80ee4eb5e 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -35,18 +35,12 @@ extern bool RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, List **placementList, uint64 *anchorShardId, List **relationShardList, bool replacePrunedQueryWithDummy); -extern MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, - PlannerRestrictionContext * - plannerRestrictionContext); +extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree); -extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, - RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte); extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex); extern RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); -extern bool InsertSelectIntoDistributedTable(Query *query); extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);