mirror of https://github.com/citusdata/citus.git
add command execution
parent
092cc7c01a
commit
4799f3cdf0
|
@ -69,7 +69,8 @@ static Query *
|
||||||
ConvertToInsertSelect(Query *parse);
|
ConvertToInsertSelect(Query *parse);
|
||||||
static Query *
|
static Query *
|
||||||
WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList);
|
WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList);
|
||||||
|
Query *
|
||||||
|
GetSingleValue(Query *insertSelectQuery);
|
||||||
|
|
||||||
static List *plannerRestrictionContextList = NIL;
|
static List *plannerRestrictionContextList = NIL;
|
||||||
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
|
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
|
||||||
|
@ -221,8 +222,9 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||||
if (IsMultiRowInsert(parse))
|
if (IsMultiRowInsert(parse))
|
||||||
{
|
{
|
||||||
planContext.originalQuery = copyObject(parse);
|
planContext.convertedQuery = ConvertToInsertSelect(parse);
|
||||||
planContext.originalQuery = ConvertToInsertSelect(parse);
|
|
||||||
|
planContext.originalQuery = GetSingleValue(planContext.convertedQuery);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -394,6 +396,31 @@ ConvertToInsertSelect(Query *parse)
|
||||||
return insertSelectQuery;
|
return insertSelectQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Query *
|
||||||
|
GetSingleValue(Query *insertSelectQuery)
|
||||||
|
{
|
||||||
|
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||||
|
|
||||||
|
subqueryRte = linitial(subqueryRte->subquery->rtable);
|
||||||
|
RangeTblEntry *valuesRte = linitial(subqueryRte->subquery->rtable);
|
||||||
|
|
||||||
|
List *valuesLists = valuesRte->values_lists;
|
||||||
|
Node *firstValue = linitial(valuesLists);
|
||||||
|
|
||||||
|
valuesRte->values_lists = list_make1(firstValue);
|
||||||
|
|
||||||
|
Query *copyQuery = copyObject(insertSelectQuery);
|
||||||
|
|
||||||
|
valuesRte->values_lists = valuesLists;
|
||||||
|
|
||||||
|
RangeTblEntry *copySubqueryRte = ExtractSelectRangeTableEntry(copyQuery);
|
||||||
|
|
||||||
|
RangeTblEntry *valuesRteInCopy = linitial(copySubqueryRte->subquery->rtable);
|
||||||
|
valuesRteInCopy->values_lists = list_make1(firstValue);
|
||||||
|
|
||||||
|
return copyQuery;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WrapRteRelationIntoSubquery wraps the given relation range table entry
|
* WrapRteRelationIntoSubquery wraps the given relation range table entry
|
||||||
* in a newly constructed "(SELECT * FROM table_name as anchor_relation)" query.
|
* in a newly constructed "(SELECT * FROM table_name as anchor_relation)" query.
|
||||||
|
@ -423,7 +450,6 @@ WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList)
|
||||||
ListCell *collationCell = NULL;
|
ListCell *collationCell = NULL;
|
||||||
|
|
||||||
int colIndex = 1;
|
int colIndex = 1;
|
||||||
elog(INFO, "colTypes: %s", nodeToString(colTypes));
|
|
||||||
forthree(typeCell, colTypes, modCell, colTypeMods, collationCell, colCollations)
|
forthree(typeCell, colTypes, modCell, colTypeMods, collationCell, colCollations)
|
||||||
{
|
{
|
||||||
Oid type = lfirst_oid(typeCell);
|
Oid type = lfirst_oid(typeCell);
|
||||||
|
@ -882,8 +908,9 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
||||||
planContext->plannerRestrictionContext->joinRestrictionContext =
|
planContext->plannerRestrictionContext->joinRestrictionContext =
|
||||||
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
||||||
|
|
||||||
|
Query *query = planContext->convertedQuery ? planContext->convertedQuery : planContext->originalQuery;
|
||||||
DistributedPlan *distributedPlan =
|
DistributedPlan *distributedPlan =
|
||||||
CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
|
CreateDistributedPlan(planId, query, planContext->query,
|
||||||
planContext->boundParams,
|
planContext->boundParams,
|
||||||
hasUnresolvedParams,
|
hasUnresolvedParams,
|
||||||
planContext->plannerRestrictionContext);
|
planContext->plannerRestrictionContext);
|
||||||
|
@ -1120,6 +1147,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
distributedPlan =
|
distributedPlan =
|
||||||
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
|
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,8 @@ typedef struct DistributedPlanningContext
|
||||||
|
|
||||||
/* Our custom restriction context */
|
/* Our custom restriction context */
|
||||||
PlannerRestrictionContext *plannerRestrictionContext;
|
PlannerRestrictionContext *plannerRestrictionContext;
|
||||||
|
|
||||||
|
Query *convertedQuery;
|
||||||
} DistributedPlanningContext;
|
} DistributedPlanningContext;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue