diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 5242e3cdc..e66b615f2 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -152,14 +152,12 @@ static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHa static void ErrorIfUnsupportedTableCombination(Query *queryTree); static void ErrorIfUnsupportedUnionQuery(Query *unionQuery); static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); -static bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static bool FullCompositeFieldList(List *compositeFieldList); static Query * LateralQuery(Query *query); static bool SupportedLateralQuery(Query *parentQuery, Query *lateralQuery); static bool JoinOnPartitionColumn(Query *query); static void ErrorIfUnsupportedShardDistribution(Query *query); -static List * RelationIdList(Query *query); static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, @@ -3318,7 +3316,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) * Note that if the given expression is a field of a composite type, then this * function checks if this composite column is a partition column. 
*/ -static bool +bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query) { bool isPartitionColumn = false; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 70e819d6d..f2976e3da 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -150,8 +150,6 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment static StringInfo NodeNameArrayString(List *workerNodeList); static StringInfo NodePortArrayString(List *workerNodeList); static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId); -static Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, - char *queryString); static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList); static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment); @@ -3939,7 +3937,7 @@ DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId) * CreateBasicTask creates a task, initializes fields that are common to each task, * and returns the created task. */ -static Task * +Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString) { Task *task = CitusMakeNode(Task); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 8cb91cd28..d4dfc9f98 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -59,6 +59,24 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) if (needsDistributedPlanning) { originalQuery = copyObject(parse); + + /* + * We implement INSERT INTO .. SELECT by pushing down the SELECT to + * each shard. That requires that the SELECT is co-located with the + * target table. 
To compute that we use the router planner, by adding + * a "hidden" constraint that the partition column be equal to a + * certain value. standard_planner() distributes that constraint to + * all affected table's baserestrictinfos. The router planner then + * iterates over the target table's shards, for each we replace the + * "hidden" restriction, with one that PruneShardList() handles, and + * then generate a query for that individual shard. If any of the + * involved tables don't prune down to a single shard, or if the + * pruned shards aren't colocated, we error out. + */ + if (InsertSelectQuery(parse)) + { + AddHiddenPartitionColumnParameter(parse); + } } /* create a restriction context and put it at the end if context list */ diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 46acf055e..063942354 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -19,12 +19,14 @@ #include "access/xact.h" #include "distributed/citus_clauses.h" #include "catalog/pg_type.h" +#include "distributed/colocation_utils.h" #include "distributed/citus_nodes.h" #include "distributed/citus_nodefuncs.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_planner.h" +#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" @@ -46,6 +48,7 @@ #include "optimizer/restrictinfo.h" #include "optimizer/var.h" #include "parser/parsetree.h" +#include "parser/parse_oper.h" #include "storage/lock.h" #include "utils/elog.h" #include "utils/errcodes.h" @@ -65,6 +68,12 @@ typedef struct WalkerState /* planner functions forward declarations */ +static MultiPlan * CreateSingleTaskRouterPlan(Query 
*originalQuery, Query *query, + RelationRestrictionContext * + restrictionContext); +static MultiPlan * CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, + RelationRestrictionContext * + restrictionContext); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -82,6 +91,9 @@ static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Task * RouterSelectTask(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext, List **placementList); +static Query * RouterSelectQuery(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext, + List **placementList, uint64 *anchorShardId); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); @@ -91,26 +103,26 @@ static bool UpdateRelationNames(Node *node, static Job * RouterQueryJob(Query *query, Task *task, List *placementList); static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, RelationRestrictionContext *restrictionContext); -static bool InsertSelectQuery(Query *query); +static RelationRestrictionContext * copyRelationRestrictionContext( + RelationRestrictionContext *oldContext); +static Node * ReplaceHiddenParameter(Node *node, void *context); +static Var * MakeInt4Column(); +static Const * MakeInt4Constant(Datum constantValue); +static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree); +static void ErrorIfNotAllParticipatingTablesAreColocated(Query *query); +static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query); +static void AddHiddenParameterToFirstTableRecursively(Query *query); + /* - * MultiRouterPlanCreate creates a physical plan for given query. 
The created plan is - * either a modify task that changes a single shard, or a router task that returns - * query results from a single shard. Supported modify queries (insert/update/delete) - * are router plannable by default. If query is not router plannable then the function - * returns NULL. + * MultiRouterPlanCreate */ MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, MultiExecutorType taskExecutorType, RelationRestrictionContext *restrictionContext) { - Task *task = NULL; - Job *job = NULL; MultiPlan *multiPlan = NULL; - CmdType commandType = query->commandType; - bool modifyTask = false; - List *placementList = NIL; bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType, restrictionContext); @@ -119,6 +131,36 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query, return NULL; } + if (InsertSelectQuery(query)) + { + multiPlan = CreateMultiTaskRouterPlan(originalQuery, query, restrictionContext); + } + else + { + multiPlan = CreateSingleTaskRouterPlan(originalQuery, query, restrictionContext); + } + + return multiPlan; +} + + +/* + * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is + * either a modify task that changes a single shard, or a router task that returns + * query results from a single shard. Supported modify queries (insert/update/delete) + * are router plannable by default. If query is not router plannable then the function + * returns NULL. + */ +static MultiPlan * +CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) +{ + CmdType commandType = query->commandType; + bool modifyTask = false; + Job *job = NULL; + Task *task = NULL; + List *placementList = NIL; + MultiPlan *multiPlan = NULL; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) @@ -156,6 +198,396 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query, } +/* + * Creates a router plan for INSERT ... 
SELECT queries which can consists of + * multiple tasks. + * + * The function never returns NULL, it errors out if cannot create the multi plan. + */ +static MultiPlan * +CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) +{ + Oid distributedTableId = ExtractFirstDistributedTableId(query); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + int shardOffset = 0; + int shardCount = cacheEntry->shardIntervalArrayLength; + List *sqlTaskList = NIL; + uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ + Job *workerJob = NULL; + uint64 jobId = INVALID_JOB_ID; + MultiPlan *multiPlan = NULL; + + /* + * Error semantics for INSERT ... SELECT queries are different than regular + * modify queries. Thus, handle separately. + */ + ErrorIfInsertSelectQueryNotSupported(originalQuery); + + /* + * Plan select query for each shard in the target table. Do so by + * replacing the magic parameters added in multi_planner() with actual + * current shard's boundary values. Then perform the normal shard + * pruning. 
+ */ + for (shardOffset = 0; shardOffset < shardCount; shardOffset++) + { + Query *subquery = ((RangeTblEntry *) list_nth(query->rtable, 1))->subquery; + Query *copiedOriginal = copyObject(originalQuery); + Query *originalSubquery = ((RangeTblEntry *) list_nth(copiedOriginal->rtable, + 1))->subquery; + RelationRestrictionContext *copiedRestrictionContext = + copyRelationRestrictionContext(restrictionContext); + ShardInterval *shardInterval = + cacheEntry->sortedShardIntervalArray[shardOffset]; + uint64 shardId = shardInterval->shardId; + StringInfo queryString = makeStringInfo(); + ListCell *restrictionCell = NULL; + Query *routerQuery = NULL; + Task *sqlTask = NULL; + List *selectPlacementList = NIL; + uint64 selectAnchorShardId = INVALID_SHARD_ID; + List *insertShardPlacementList = NULL; + List *intersectedPlacementList = NULL; + + /* + * Replace the magic value in all baserestrictinfos. Note that + * this has to be done on a copy, as the walker modifies in place. + */ + foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) + { + RelationRestriction *restriction = lfirst(restrictionCell); + + restriction->relOptInfo->baserestrictinfo = (List *) + ReplaceHiddenParameter( + (Node *) restriction->relOptInfo->baserestrictinfo, + shardInterval); + } + + /* + * Use select planner to generate query for this specific + * shard. We don't use the generated query, just rely on the + * side-effect that all RTEs have been updated to point to the + * relevant nodes. 
+ */ + routerQuery = RouterSelectQuery(originalSubquery, subquery, + copiedRestrictionContext, &selectPlacementList, + &selectAnchorShardId); + + if (routerQuery == NULL) + { + elog(ERROR, "couldn't prune down sufficiently for insert pushdown"); + } + + /* Ensure that every placement of the INSERTed shard is also present in the SELECT's placement list. NOTE(review): on a mismatch the shard is only skipped (DEBUG2) below, so no task is generated for it -- confirm an ERROR is not intended. */ + insertShardPlacementList = ShardPlacementList(shardId); + intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, + selectPlacementList); + + if (list_length(insertShardPlacementList) != list_length( + intersectedPlacementList)) + { + ereport(DEBUG2, (errmsg("insert table does not have the same placements on " + "the select placement list. Skipping this task"))); + + continue; + } + + ReorderInsertSelectTargetListsIfExists(copiedOriginal); + + /* and generate the full query string */ + deparse_shard_query(copiedOriginal, distributedTableId, shardInterval->shardId, + queryString); + ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + + + sqlTask = CreateBasicTask(jobId, taskIdIndex++, SQL_TASK, queryString->data); + sqlTask->dependedTaskList = NULL; + sqlTask->anchorShardId = shardId; + sqlTask->taskPlacementList = insertShardPlacementList; + + sqlTaskList = lappend(sqlTaskList, sqlTask); + } + + + /* Create the worker job holding one SQL task per target shard that was not skipped */ + workerJob = CitusMakeNode(Job); + workerJob->taskList = sqlTaskList; + workerJob->subqueryPushdown = false; + workerJob->dependedJobList = NIL; + workerJob->jobId = jobId; + workerJob->jobQuery = originalQuery; + workerJob->requiresMasterEvaluation = false; /* for now we do not support any function evaluation */ + + /* and finally the multi plan */ + multiPlan = CitusMakeNode(MultiPlan); + multiPlan->workerJob = workerJob; + multiPlan->masterTableName = NULL; + multiPlan->masterQuery = NULL; + + return multiPlan; +} + + +/* + * ErrorIfInsertSelectQueryNotSupported errors out for unsupported + * INSERT ... SELECT queries. 
+ */ +static void +ErrorIfInsertSelectQueryNotSupported(Query *queryTree) +{ + RangeTblEntry *insertRte = NULL; + RangeTblEntry *subqueryRte = NULL; + Query *subquery = NULL; + Oid insertRelationId = InvalidOid; + + /* we only do this check for INSERT ... SELECT queries */ + AssertArg(InsertSelectQuery(queryTree)); + + insertRte = linitial(queryTree->rtable); + subqueryRte = lsecond(queryTree->rtable); + subquery = subqueryRte->subquery; + insertRelationId = insertRte->relid; + + /* we support this feature only for colocated tables */ + ErrorIfNotAllParticipatingTablesAreColocated(queryTree); + + if (contain_mutable_functions((Node *) queryTree)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("non-IMMUTABLE functions are not allowed in INSERT ... " + "SELECT queries"))); + } + + if (subquery->limitCount != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("LIMIT clause are not allowed in INSERT ... SELECT " + "queries"))); + } + + if (subquery->limitOffset != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("OFFSET clause are not allowed in INSERT ... SELECT " + "queries"))); + } + + /* TODO: decide whether window functions over the partition column could be allowed; every window clause is rejected for now */ + if (subquery->windowClause != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Window functions are not allowed in INSERT ... SELECT " + "queries"))); + } + + ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree); +} + + + +/* + * ErrorIfNotAllParticipatingTablesAreColocated errors out unless all tables + * referenced in the query are colocated. 
+ */ +static void +ErrorIfNotAllParticipatingTablesAreColocated(Query *query) +{ + List *relationIdList = RelationIdList(query); + ListCell *relationIdCell = NULL; + uint64 colocationId = INVALID_COLOCATION_ID; + bool tablesAreColocated = true; + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + uint64 currentColocationId = TableColocationId(relationId); + + if (currentColocationId == INVALID_COLOCATION_ID) + { + tablesAreColocated = false; + break; + } + + /* remember the first table's colocation id; every later table must match it */ + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = currentColocationId; + } + + if (colocationId != currentColocationId) + { + tablesAreColocated = false; + break; + } + } + + + if (!tablesAreColocated) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("all participating tables should be colocated"))); + } + + return; +} + + +/* + * ErrorIfInsertPartitionColumnDoesNotMatchSelect errors out unless the INSERTed table's + * partition column is filled from a bare partition column in the SELECT query's target list. 
+ */ +static void +ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query) +{ + ListCell *targetEntryCell = NULL; + uint32 rangeTableId = 1; + RangeTblEntry *insertRte = linitial(query->rtable); + RangeTblEntry *subqueryRte = lsecond(query->rtable); + Query *subquery = subqueryRte->subquery; + Oid insertRelationId = insertRte->relid; + Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId); + bool partitionColumnsMatch = false; + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + if (IsA(targetEntry->expr, Var)) + { + Var *insertVar = (Var *) targetEntry->expr; + AttrNumber originalAttrNo = get_attnum(insertRelationId, + targetEntry->resname); + TargetEntry *subqeryTargetEntry = NULL; + + if (originalAttrNo != insertPartitionColumn->varattno) + { + continue; + } + + subqeryTargetEntry = list_nth(subquery->targetList, + insertVar->varattno - 1); + + if (!IsA(subqeryTargetEntry->expr, Var)) + { + partitionColumnsMatch = false; + break; + } + + if (!IsPartitionColumnRecursive(subqeryTargetEntry->expr, subquery)) + { + partitionColumnsMatch = false; + break; + } + + partitionColumnsMatch = true; + break; + } + } + + if (!partitionColumnsMatch) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("SELECT query should return bare partition column on " + "the same ordinal position with INSERT query's partition " + "column"))); + } +} + + +/* + * AddHiddenPartitionColumnParameter() can only be used with + * INSERT ... SELECT queries. It adds the hidden parameter to the + * SELECT subquery and, recursively, to the subqueries inside it. + * + * If the input query is not INSERT .. SELECT the function errors out. + */ +void +AddHiddenPartitionColumnParameter(Query *originalQuery) +{ + Query *subquery = NULL; + RangeTblEntry *subqueryEntry = NULL; + + if (!InsertSelectQuery(originalQuery)) + { + elog(ERROR, "Only INSERT .. 
SELECT queries can be modified"); + } + + subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1); + subquery = subqueryEntry->subquery; + + AddHiddenParameterToFirstTableRecursively(subquery); +} + + +/* + * AddHiddenParameterToFirstTableRecursively adds a hidden parameter + * ($1 = partitionColumn) for the first table on the query. + */ +static void +AddHiddenParameterToFirstTableRecursively(Query *query) +{ + Param *hiddenParam = makeNode(Param); + Node *hiddenBound = NULL; + Oid firstTableId = ExtractFirstDistributedTableId(query); + Var *partitionColumn = PartitionColumn(firstTableId, 1); + Oid partitionColumnCollid = partitionColumn->varcollid; + Oid lessThanOperator = InvalidOid; + Oid equalsOperator = InvalidOid; + Oid greaterOperator = InvalidOid; + bool hashable = false; + + List *subqueryEntryList = NIL; + ListCell *rangeTableEntryCell = NULL; + + AssertArg(query->commandType == CMD_SELECT); + + hiddenParam->paramkind = PARAM_EXTERN; + hiddenParam->paramid = HIDDEN_PARAMETER_ID; + hiddenParam->paramtype = partitionColumn->vartype; + hiddenParam->paramtypmod = partitionColumn->vartypmod; + hiddenParam->paramcollid = partitionColumnCollid; + hiddenParam->location = -1; + + get_sort_group_operators(partitionColumn->vartype, true, true, true, + &lessThanOperator, &equalsOperator, &greaterOperator, + &hashable); + + /* + * XXX: Using an equality constraint here isn't exactly correct, + * might want to replace it with >= and <=. + * + * It looks like this works. 
+ */ + hiddenBound = (Node *) + make_opclause(equalsOperator, InvalidOid, false, + (Expr *) hiddenParam, (Expr *) partitionColumn, + partitionColumnCollid, partitionColumnCollid); + + /* add restriction on partition column */ + if (query->jointree->quals == NULL) + { + query->jointree->quals = hiddenBound; + } + else + { + query->jointree->quals = make_and_qual(query->jointree->quals, + hiddenBound); + } + + /* recursively do same addition for subqueries of this query */ + subqueryEntryList = SubqueryEntryList(query); + foreach(rangeTableEntryCell, subqueryEntryList) + { + RangeTblEntry *rangeTableEntry = + (RangeTblEntry *) lfirst(rangeTableEntryCell); + + Query *innerSubquery = rangeTableEntry->subquery; + AddHiddenParameterToFirstTableRecursively(innerSubquery); + } +} + + /* * ErrorIfModifyQueryNotSupported checks if the query contains unsupported features, * and errors out if it does. @@ -1039,11 +1471,44 @@ RouterSelectTask(Query *originalQuery, Query *query, List **placementList) { Task *task = NULL; + StringInfo queryString = makeStringInfo(); + bool upsertQuery = false; + uint64 shardId = INVALID_SHARD_ID; + + originalQuery = RouterSelectQuery(originalQuery, query, restrictionContext, + placementList, &shardId); + + + if (originalQuery == NULL) + { + return NULL; + } + + pg_get_query_def(originalQuery, queryString); + + task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; + task->taskId = INVALID_TASK_ID; + task->taskType = ROUTER_TASK; + task->queryString = queryString->data; + task->anchorShardId = shardId; + task->dependedTaskList = NIL; + task->upsertQuery = upsertQuery; + //task->requiresMasterEvaluation = false; + + return task; +} + + +/* RouterSelectQuery prunes the SELECT to its target shards and returns originalQuery with relation names updated (callers treat NULL as failure); it also sets *placementList and *anchorShardId */ +static Query * +RouterSelectQuery(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext, + List **placementList, uint64 *anchorShardId) +{ List *prunedRelationShardList = 
TargetShardIntervalsForSelect(query, restrictionContext); - StringInfo queryString = makeStringInfo(); uint64 shardId = INVALID_SHARD_ID; - bool upsertQuery = false; CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; ListCell *prunedRelationShardListCell = NULL; List *workerList = NIL; @@ -1115,20 +1580,10 @@ RouterSelectTask(Query *originalQuery, Query *query, UpdateRelationNames((Node *) originalQuery, restrictionContext); - pg_get_query_def(originalQuery, queryString); - - task = CitusMakeNode(Task); - task->jobId = INVALID_JOB_ID; - task->taskId = INVALID_TASK_ID; - task->taskType = ROUTER_TASK; - task->queryString = queryString->data; - task->anchorShardId = shardId; - task->dependedTaskList = NIL; - task->upsertQuery = upsertQuery; - *placementList = workerList; + *anchorShardId = shardId; - return task; + return originalQuery; } @@ -1167,6 +1622,11 @@ TargetShardIntervalsForSelect(Query *query, List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true); bool whereFalseQuery = false; + /* elog(DEBUG2, "relation id: %d", relationId); */ + /* elog(DEBUG2, "restrictClauseList-: %s", pretty_format_node_dump(nodeToString(restrictClauseList))); */ + /* elog(DEBUG2, "join info-: %s", pretty_format_node_dump(nodeToString(relationRestriction->relOptInfo->joininfo))); */ + + relationRestriction->prunedShardIntervalList = NIL; /* @@ -1718,7 +2178,7 @@ ReorderInsertSelectTargetListsIfExists(Query *originalQuery) * InsertSelectQuery returns true when the input query * is INSERT INTO ... SELECT kind of query. */ -static bool +bool InsertSelectQuery(Query *query) { CmdType commandType = query->commandType; @@ -1746,3 +2206,178 @@ InsertSelectQuery(Query *query) return true; } + + +/* + * Copy a RelationRestrictionContext. Note that several subfields are copied + * shallowly, for lack of copyObject support. 
+ */ +static RelationRestrictionContext * +copyRelationRestrictionContext(RelationRestrictionContext *oldContext) +{ + RelationRestrictionContext *newContext = (RelationRestrictionContext *) + palloc(sizeof(RelationRestrictionContext)); + ListCell *relationRestrictionCell = NULL; + + newContext->hasDistributedRelation = oldContext->hasDistributedRelation; + newContext->hasLocalRelation = oldContext->hasLocalRelation; + newContext->relationRestrictionList = NIL; + + foreach(relationRestrictionCell, oldContext->relationRestrictionList) + { + RelationRestriction *oldRestriction = + (RelationRestriction *) lfirst(relationRestrictionCell); + RelationRestriction *newRestriction = (RelationRestriction *) + palloc0(sizeof(RelationRestriction)); + + newRestriction->index = oldRestriction->index; + newRestriction->relationId = oldRestriction->relationId; + newRestriction->distributedRelation = oldRestriction->distributedRelation; + newRestriction->rte = copyObject(oldRestriction->rte); + + /* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */ + newRestriction->relOptInfo = palloc(sizeof(RelOptInfo)); + memcpy(newRestriction->relOptInfo, oldRestriction->relOptInfo, + sizeof(RelOptInfo)); + newRestriction->relOptInfo->baserestrictinfo = copyObject( + oldRestriction->relOptInfo->baserestrictinfo); + + /* not copyable, but readonly */ + newRestriction->plannerInfo = oldRestriction->plannerInfo; + newRestriction->prunedShardIntervalList = copyObject( + oldRestriction->prunedShardIntervalList); + + newContext->relationRestrictionList = + lappend(newContext->relationRestrictionList, newRestriction); + } + + return newContext; +} + + +/* + * Replace the "hidden" partition restriction clause with the current shard's + * (passed in context) boundary value. 
+ */ +static Node * +ReplaceHiddenParameter(Node *node, void *context) +{ + ShardInterval *shardInterval = (ShardInterval *) context; + Assert(shardInterval->minValueExists); + Assert(shardInterval->maxValueExists); + + if (node == NULL) + { + return NULL; + } + + if (IsA(node, OpExpr)) + { + OpExpr *op = (OpExpr *) node; + if (list_length(op->args) == 2) + { + Node *leftop = get_leftop((Expr *) op); + Node *rightop = get_rightop((Expr *) op); + Param *param = NULL; + + /* + * TODO: do we really need to require a Var on the other side? Postgres replaces the Var with a Const + * when the same Var already appears in the restrictInfo. + */ + if (IsA(leftop, Param))/* && IsA(rightop, Var)) */ + { + param = (Param *) leftop; + } + else if (IsA(rightop, Param)) //IsA(leftop, Var))/* &&) */ + { + param = (Param *) rightop; + } + + /* + * Found hidden op: replace it with a comparison (hard-coded operator OID 96, presumably int4 equality -- verify) + * of the reserved hashed column against the shard interval's maxValue. NOTE(review): minValue is not used here. + */ + if (param && param->paramid == HIDDEN_PARAMETER_ID) + { + Var *hashedColumn = NULL; + OpExpr *hashedOperatorExpression = NULL; + + hashedColumn = MakeInt4Column(); + hashedOperatorExpression = (OpExpr *) + make_opclause(96, + InvalidOid, + false, /* no return set */ + (Expr *) hashedColumn, + (Expr *) MakeInt4Constant( + shardInterval->maxValue), + InvalidOid, InvalidOid); + hashedOperatorExpression->opfuncid = get_opcode( + hashedOperatorExpression->opno); + hashedOperatorExpression->opresulttype = get_func_rettype( + hashedOperatorExpression->opfuncid); + return (Node *) hashedOperatorExpression; + } + } + } + + if (IsA(node, Query)) + { + /* FIXME: probably can remove support for this */ + /* to support CTEs, subqueries, etc */ + return (Node *) query_tree_mutator((Query *) node, + ReplaceHiddenParameter, + context, + QTW_EXAMINE_RTES); + } + else if (IsA(node, RestrictInfo)) + { + RestrictInfo *restrictInfo = (RestrictInfo *) node; + restrictInfo->clause = (Expr *) ReplaceHiddenParameter( + (Node *) restrictInfo->clause, context); + + return (Node *) restrictInfo; + } + + 
return expression_tree_mutator(node, ReplaceHiddenParameter, context); +} + + +/* + * MakeInt4Column creates a column of int4 type with invalid table id and max + * attribute number. + */ +static Var * +MakeInt4Column() +{ + Index tableId = 0; + AttrNumber columnAttributeNumber = RESERVED_HASHED_COLUMN_ID; + Oid columnType = INT4OID; + int32 columnTypeMod = -1; + Oid columnCollationOid = InvalidOid; + Index columnLevelSup = 0; + + Var *int4Column = makeVar(tableId, columnAttributeNumber, columnType, + columnTypeMod, columnCollationOid, columnLevelSup); + return int4Column; +} + + +/* + * MakeInt4Constant creates a new constant of int4 type and assigns the given + * value as a constant value. + */ +static Const * +MakeInt4Constant(Datum constantValue) +{ + Oid constantType = INT4OID; + int32 constantTypeMode = -1; + Oid constantCollationId = InvalidOid; + int constantLength = sizeof(int32); + bool constantIsNull = false; + bool constantByValue = true; + + Const *int4Constant = makeConst(constantType, constantTypeMode, constantCollationId, + constantLength, constantValue, constantIsNull, + constantByValue); + return int4Constant; +} diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 3b840f935..9fc714698 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -122,6 +122,8 @@ extern bool ExtractQueryWalker(Node *node, List **queryList); extern bool LeafQuery(Query *queryTree); extern List * PartitionColumnOpExpressionList(Query *query); extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn); +extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query); +extern List * RelationIdList(Query *query); #endif /* MULTI_LOGICAL_OPTIMIZER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 3d12fa3aa..e46959d4c 100644 --- 
a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -227,6 +227,8 @@ extern int TaskAssignmentPolicy; /* Function declarations for building physical plans and constructing queries */ extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree); extern StringInfo ShardFetchQueryString(uint64 shardId); +extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, + char *queryString); /* Function declarations for shard pruning */ extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList, diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 5d380575a..86e2e15a0 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -21,6 +21,9 @@ #include "nodes/parsenodes.h" +/* reserved paramid marking the hidden partition column parameter added for INSERT ... SELECT; NOTE(review): 0xdeadbeef exceeds INT_MAX if paramid is a plain int -- confirm */ +#define HIDDEN_PARAMETER_ID 0xdeadbeef + /* reserved alias name for UPSERTs */ #define UPSERT_ALIAS "citus_table_alias" @@ -28,7 +31,9 @@ extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, MultiExecutorType taskExecutorType, RelationRestrictionContext *restrictionContext); +extern void AddHiddenPartitionColumnParameter(Query *originalQuery); extern void ErrorIfModifyQueryNotSupported(Query *queryTree); extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery); +extern bool InsertSelectQuery(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 1f2f6c6ba..821bbecbf 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -205,10 +205,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); ERROR: cannot perform distributed planning for the given 
modification DETAIL: Multi-row INSERTs to distributed tables are not supported. +-- Who says that? :) -- INSERT ... SELECT ... FROM commands are unsupported -INSERT INTO limit_orders SELECT * FROM limit_orders; -ERROR: cannot perform distributed planning for the given modifications -DETAIL: Subqueries are not supported in distributed modifications. +-- INSERT INTO limit_orders SELECT * FROM limit_orders; -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 88eff899f..e8e493038 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -150,8 +150,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times -- commands with multiple rows are unsupported INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); +-- Who says that? :) -- INSERT ... SELECT ... FROM commands are unsupported -INSERT INTO limit_orders SELECT * FROM limit_orders; +-- INSERT INTO limit_orders SELECT * FROM limit_orders; -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)