From 4799f3cdf056248729f8b84726b898e31179f5df Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 8 May 2020 12:21:48 +0200 Subject: [PATCH] add command execution --- .../distributed/planner/distributed_planner.c | 38 ++++++++++++++++--- src/include/distributed/distributed_planner.h | 2 + 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 86c65d325..f904fb2cc 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -69,7 +69,8 @@ static Query * ConvertToInsertSelect(Query *parse); static Query * WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList); - +Query * +GetSingleValue(Query *insertSelectQuery); static List *plannerRestrictionContextList = NIL; int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */ @@ -221,8 +222,9 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); if (IsMultiRowInsert(parse)) { - planContext.originalQuery = copyObject(parse); - planContext.originalQuery = ConvertToInsertSelect(parse); + planContext.convertedQuery = ConvertToInsertSelect(parse); + + planContext.originalQuery = GetSingleValue(planContext.convertedQuery); } else { @@ -394,6 +396,31 @@ ConvertToInsertSelect(Query *parse) return insertSelectQuery; } +Query * +GetSingleValue(Query *insertSelectQuery) +{ + RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(insertSelectQuery); + + subqueryRte = linitial(subqueryRte->subquery->rtable); + RangeTblEntry *valuesRte = linitial(subqueryRte->subquery->rtable); + + List *valuesLists = valuesRte->values_lists; + Node *firstValue = linitial(valuesLists); + + valuesRte->values_lists = list_make1(firstValue); + + Query *copyQuery = copyObject(insertSelectQuery); + + valuesRte->values_lists = valuesLists; + + RangeTblEntry *copySubqueryRte = ExtractSelectRangeTableEntry(copyQuery); + + RangeTblEntry *valuesRteInCopy = linitial(copySubqueryRte->subquery->rtable); + valuesRteInCopy->values_lists = list_make1(firstValue); + + return copyQuery; +} + /* * WrapRteRelationIntoSubquery wraps the given relation range table entry * in a newly constructed "(SELECT * FROM table_name as anchor_relation)" query. @@ -423,7 +450,6 @@ WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList) ListCell *collationCell = NULL; int colIndex = 1; -elog(INFO, "colTypes: %s", nodeToString(colTypes)); forthree(typeCell, colTypes, modCell, colTypeMods, collationCell, colCollations) { Oid type = lfirst_oid(typeCell); @@ -882,8 +908,9 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) planContext->plannerRestrictionContext->joinRestrictionContext = RemoveDuplicateJoinRestrictions(joinRestrictionContext); + Query *query = planContext->convertedQuery ? planContext->convertedQuery : planContext->originalQuery; DistributedPlan *distributedPlan = - CreateDistributedPlan(planId, planContext->originalQuery, planContext->query, + CreateDistributedPlan(planId, query, planContext->query, planContext->boundParams, hasUnresolvedParams, planContext->plannerRestrictionContext); @@ -1120,6 +1147,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi return NULL; } + distributedPlan = CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f08ebd094..8657b11f4 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -159,6 +159,8 @@ typedef struct DistributedPlanningContext /* Our custom restriction context */ PlannerRestrictionContext *plannerRestrictionContext; + + Query *convertedQuery; } DistributedPlanningContext;