diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 68b3a17b7..86c65d325 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -65,6 +65,10 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" +static Query * +ConvertToInsertSelect(Query *parse); +static Query * +WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList); static List *plannerRestrictionContextList = NIL; @@ -143,6 +147,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) List *rangeTableList = ExtractRangeTableEntryList(parse); + DistributedPlanningContext planContext = { + .query = parse, + .cursorOptions = cursorOptions, + .boundParams = boundParams, + }; + if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { /* this cursor flag could only be set when Citus has been loaded */ @@ -175,12 +185,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) int rteIdCounter = 1; - DistributedPlanningContext planContext = { - .query = parse, - .cursorOptions = cursorOptions, - .boundParams = boundParams, - }; - if (fastPathRouterQuery) { /* @@ -215,7 +219,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * to be present in the copied query too. */ rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - planContext.originalQuery = copyObject(parse); + if (IsMultiRowInsert(parse)) + { + planContext.originalQuery = copyObject(parse); + planContext.originalQuery = ConvertToInsertSelect(parse); + } + else + { + planContext.originalQuery = copyObject(parse); + } bool setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(rangeTableList, @@ -301,6 +313,168 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) return result; } +#include "parser/parse_node.h" +#include "parser/parse_relation.h" + +static Query * +ConvertToInsertSelect(Query *parse) +{ + + RangeTblEntry *rteValus = ExtractDistributedInsertValuesRTE(parse); + List *rowValuesList = rteValus->values_lists; + List *colnames = NIL; + + ListCell *workerTargetCell = NULL; + foreach(workerTargetCell, parse->targetList) + { + TargetEntry *workerTargetEntry = (TargetEntry *) lfirst(workerTargetCell); + Value *colName = makeString(workerTargetEntry->resname); + + colnames = lappend(colnames, colName); + } + + /* create range table entries */ + ParseState *pstate = make_parsestate(NULL); + Alias *selectAlias = makeAlias("multi_row_insert", colnames); + RangeTblEntry *rteValuesRangeTableEntry = + addRangeTableEntryForValues(pstate, rowValuesList,NIL, NIL, NIL, selectAlias, + false, true); + + rteValuesRangeTableEntry->coltypes = rteValus->coltypes; + rteValuesRangeTableEntry->coltypmods = rteValus->coltypmods; + rteValuesRangeTableEntry->colcollations = rteValus->colcollations; + + Query *subQuery = WrapRteValuesnIntoSubquery(rteValuesRangeTableEntry, parse->targetList); + +// StringInfo buf1 = makeStringInfo(); +// deparse_shard_query(subQuery, 0, 0, buf1); +// elog(DEBUG1, "subQuery:%s", buf1->data); +// elog(DEBUG1, "subQuery:%s", nodeToString(subQuery)); + + Query *insertSelectQuery = makeNode(Query); + insertSelectQuery->commandType = CMD_INSERT; + + insertSelectQuery->targetList = copyObject(parse->targetList); + + + RangeTblEntry *targetRelationRte = makeNode(RangeTblEntry); + targetRelationRte->rtekind = RTE_RELATION; + targetRelationRte->relid = ExtractFirstCitusTableId(parse); + targetRelationRte->relkind = get_rel_relkind(targetRelationRte->relid); + targetRelationRte->alias = selectAlias; + targetRelationRte->eref = selectAlias; + + RangeTblEntry *subqueryRte = makeNode(RangeTblEntry); + subqueryRte->rtekind = RTE_SUBQUERY; + subqueryRte->subquery = subQuery; + subqueryRte->alias = selectAlias; + subqueryRte->eref = selectAlias; + subqueryRte->inFromCl = true; + + RangeTblRef *targetRelationRangeTableRef = makeNode(RangeTblRef); + targetRelationRangeTableRef->rtindex = 1; + + RangeTblRef *subqueryRangeTableRef = makeNode(RangeTblRef); + subqueryRangeTableRef->rtindex = 2; + + insertSelectQuery->jointree = makeFromExpr(list_make1(subqueryRangeTableRef), NULL); + + /* set the FROM expression to the subquery */ + insertSelectQuery->rtable = list_make2(targetRelationRte, subqueryRte); + insertSelectQuery->resultRelation = targetRelationRangeTableRef->rtindex; + + + StringInfo buf = makeStringInfo(); + deparse_shard_query(insertSelectQuery, 0, 0, buf); + + insertSelectQuery->returningList = parse->returningList; + insertSelectQuery->onConflict = parse->onConflict; + + + return insertSelectQuery; +} + +/* + * WrapRteRelationIntoSubquery wraps the given relation range table entry + * in a newly constructed "(SELECT * FROM table_name as anchor_relation)" query. + * + * Note that the query returned by this function does not contain any filters or + * projections. The returned query should be used cautiosly and it is mostly + * designed for generating a stub query. + */ +static Query * +WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList) +{ + Query *subQuery = makeNode(Query); + subQuery->commandType = CMD_SELECT; + + RangeTblRef *valuesRangeTableRef = makeNode(RangeTblRef); + valuesRangeTableRef->rtindex = 1; + + subQuery->jointree = makeFromExpr(list_make1(valuesRangeTableRef), NULL); + subQuery->rtable = list_make1(valuesRTE); + + List *colTypes = valuesRTE->coltypes; + List *colTypeMods = valuesRTE->coltypmods; + List *colCollations = valuesRTE->colcollations; + + ListCell *typeCell = NULL; + ListCell *modCell = NULL; + ListCell *collationCell = NULL; + + int colIndex = 1; +elog(INFO, "colTypes: %s", nodeToString(colTypes)); + forthree(typeCell, colTypes, modCell, colTypeMods, collationCell, colCollations) + { + Oid type = lfirst_oid(typeCell); + Oid typeMod = lfirst_oid(modCell); + Oid collation = lfirst_oid(collationCell); + + Var *targetColumn = makeVar(valuesRangeTableRef->rtindex, colIndex , type, typeMod,collation, 0); + /* create a dummy target entry */ + TargetEntry *newTargetEntry = makeTargetEntry((Expr *) targetColumn, colIndex , "test", + false); + + subQuery->targetList = lappend(subQuery->targetList, newTargetEntry); + colIndex++; + } + +// forboth()(targetCell, valuesRTE->coltypes) +// { +// TargetEntry *targetEntry = (TargetEntry *) lfirst(targetCell); +// +// Var *targetColumn = makeVar(valuesRangeTableRef->rtindex, colIndex , exprType((Node *)targetEntry->expr), exprTypmod((Node *)targetEntry->expr), +// exprCollation((Node *)targetEntry->expr), 0); +// +// /* create a dummy target entry */ +// TargetEntry *newTargetEntry = makeTargetEntry((Expr *) targetColumn, colIndex , targetEntry->resname, +// false); +// subQuery->targetList = lappend(subQuery->targetList, newTargetEntry); +// colIndex++; +// } + + Query *topQuery = makeNode(Query); + topQuery->commandType = CMD_SELECT; + + RangeTblEntry *subqueryRte = makeNode(RangeTblEntry); + subqueryRte->rtekind = RTE_SUBQUERY; + subqueryRte->subquery = subQuery; + subqueryRte->alias = valuesRTE->alias; + subqueryRte->eref = valuesRTE->alias; + subqueryRte->inFromCl = true; + + RangeTblRef *subqueryRangeTableRef = makeNode(RangeTblRef); + subqueryRangeTableRef->rtindex = 1; + + topQuery->jointree = makeFromExpr(list_make1(subqueryRangeTableRef), NULL); + + /* set the FROM expression to the subquery */ + topQuery->rtable = list_make1(subqueryRte); + + topQuery->targetList = copyObject(subQuery->targetList); + + return topQuery; +} /* * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker. @@ -612,9 +786,17 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, fastPathContext->distributionKeyHasParam = true; } + + if (IsMultiRowInsert(planContext->originalQuery)) + { + + + } + planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, planContext->boundParams); + return CreateDistributedPlannedStmt(planContext); } diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 70d813bf5..433ad44e9 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -184,6 +184,10 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) /* we don't support INSERT..SELECT in the fast-path */ return false; } + else if (IsMultiRowInsert(query)) + { + return false; + } else if (query->commandType == CMD_INSERT) { /* we don't need to do any further checks, all INSERTs are fast-path */