mirror of https://github.com/citusdata/citus.git
add command execution
parent
105de7beb8
commit
092cc7c01a
|
@ -65,6 +65,10 @@
|
|||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/syscache.h"
|
||||
static Query *
|
||||
ConvertToInsertSelect(Query *parse);
|
||||
static Query *
|
||||
WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList);
|
||||
|
||||
|
||||
static List *plannerRestrictionContextList = NIL;
|
||||
|
@ -143,6 +147,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
|
||||
DistributedPlanningContext planContext = {
|
||||
.query = parse,
|
||||
.cursorOptions = cursorOptions,
|
||||
.boundParams = boundParams,
|
||||
};
|
||||
|
||||
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
||||
{
|
||||
/* this cursor flag could only be set when Citus has been loaded */
|
||||
|
@ -175,12 +185,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
|
||||
int rteIdCounter = 1;
|
||||
|
||||
DistributedPlanningContext planContext = {
|
||||
.query = parse,
|
||||
.cursorOptions = cursorOptions,
|
||||
.boundParams = boundParams,
|
||||
};
|
||||
|
||||
if (fastPathRouterQuery)
|
||||
{
|
||||
/*
|
||||
|
@ -215,7 +219,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
* to be present in the copied query too.
|
||||
*/
|
||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||
if (IsMultiRowInsert(parse))
|
||||
{
|
||||
planContext.originalQuery = copyObject(parse);
|
||||
planContext.originalQuery = ConvertToInsertSelect(parse);
|
||||
}
|
||||
else
|
||||
{
|
||||
planContext.originalQuery = copyObject(parse);
|
||||
}
|
||||
|
||||
bool setPartitionedTablesInherited = false;
|
||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||
|
@ -301,6 +313,168 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
return result;
|
||||
}
|
||||
|
||||
#include "parser/parse_node.h"
|
||||
#include "parser/parse_relation.h"
|
||||
|
||||
static Query *
|
||||
ConvertToInsertSelect(Query *parse)
|
||||
{
|
||||
|
||||
RangeTblEntry *rteValus = ExtractDistributedInsertValuesRTE(parse);
|
||||
List *rowValuesList = rteValus->values_lists;
|
||||
List *colnames = NIL;
|
||||
|
||||
ListCell *workerTargetCell = NULL;
|
||||
foreach(workerTargetCell, parse->targetList)
|
||||
{
|
||||
TargetEntry *workerTargetEntry = (TargetEntry *) lfirst(workerTargetCell);
|
||||
Value *colName = makeString(workerTargetEntry->resname);
|
||||
|
||||
colnames = lappend(colnames, colName);
|
||||
}
|
||||
|
||||
/* create range table entries */
|
||||
ParseState *pstate = make_parsestate(NULL);
|
||||
Alias *selectAlias = makeAlias("multi_row_insert", colnames);
|
||||
RangeTblEntry *rteValuesRangeTableEntry =
|
||||
addRangeTableEntryForValues(pstate, rowValuesList,NIL, NIL, NIL, selectAlias,
|
||||
false, true);
|
||||
|
||||
rteValuesRangeTableEntry->coltypes = rteValus->coltypes;
|
||||
rteValuesRangeTableEntry->coltypmods = rteValus->coltypmods;
|
||||
rteValuesRangeTableEntry->colcollations = rteValus->colcollations;
|
||||
|
||||
Query *subQuery = WrapRteValuesnIntoSubquery(rteValuesRangeTableEntry, parse->targetList);
|
||||
|
||||
// StringInfo buf1 = makeStringInfo();
|
||||
// deparse_shard_query(subQuery, 0, 0, buf1);
|
||||
// elog(DEBUG1, "subQuery:%s", buf1->data);
|
||||
// elog(DEBUG1, "subQuery:%s", nodeToString(subQuery));
|
||||
|
||||
Query *insertSelectQuery = makeNode(Query);
|
||||
insertSelectQuery->commandType = CMD_INSERT;
|
||||
|
||||
insertSelectQuery->targetList = copyObject(parse->targetList);
|
||||
|
||||
|
||||
RangeTblEntry *targetRelationRte = makeNode(RangeTblEntry);
|
||||
targetRelationRte->rtekind = RTE_RELATION;
|
||||
targetRelationRte->relid = ExtractFirstCitusTableId(parse);
|
||||
targetRelationRte->relkind = get_rel_relkind(targetRelationRte->relid);
|
||||
targetRelationRte->alias = selectAlias;
|
||||
targetRelationRte->eref = selectAlias;
|
||||
|
||||
RangeTblEntry *subqueryRte = makeNode(RangeTblEntry);
|
||||
subqueryRte->rtekind = RTE_SUBQUERY;
|
||||
subqueryRte->subquery = subQuery;
|
||||
subqueryRte->alias = selectAlias;
|
||||
subqueryRte->eref = selectAlias;
|
||||
subqueryRte->inFromCl = true;
|
||||
|
||||
RangeTblRef *targetRelationRangeTableRef = makeNode(RangeTblRef);
|
||||
targetRelationRangeTableRef->rtindex = 1;
|
||||
|
||||
RangeTblRef *subqueryRangeTableRef = makeNode(RangeTblRef);
|
||||
subqueryRangeTableRef->rtindex = 2;
|
||||
|
||||
insertSelectQuery->jointree = makeFromExpr(list_make1(subqueryRangeTableRef), NULL);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
insertSelectQuery->rtable = list_make2(targetRelationRte, subqueryRte);
|
||||
insertSelectQuery->resultRelation = targetRelationRangeTableRef->rtindex;
|
||||
|
||||
|
||||
StringInfo buf = makeStringInfo();
|
||||
deparse_shard_query(insertSelectQuery, 0, 0, buf);
|
||||
|
||||
insertSelectQuery->returningList = parse->returningList;
|
||||
insertSelectQuery->onConflict = parse->onConflict;
|
||||
|
||||
|
||||
return insertSelectQuery;
|
||||
}
|
||||
|
||||
/*
|
||||
* WrapRteRelationIntoSubquery wraps the given relation range table entry
|
||||
* in a newly constructed "(SELECT * FROM table_name as anchor_relation)" query.
|
||||
*
|
||||
* Note that the query returned by this function does not contain any filters or
|
||||
* projections. The returned query should be used cautiosly and it is mostly
|
||||
* designed for generating a stub query.
|
||||
*/
|
||||
static Query *
|
||||
WrapRteValuesnIntoSubquery(RangeTblEntry *valuesRTE, List *targetList)
|
||||
{
|
||||
Query *subQuery = makeNode(Query);
|
||||
subQuery->commandType = CMD_SELECT;
|
||||
|
||||
RangeTblRef *valuesRangeTableRef = makeNode(RangeTblRef);
|
||||
valuesRangeTableRef->rtindex = 1;
|
||||
|
||||
subQuery->jointree = makeFromExpr(list_make1(valuesRangeTableRef), NULL);
|
||||
subQuery->rtable = list_make1(valuesRTE);
|
||||
|
||||
List *colTypes = valuesRTE->coltypes;
|
||||
List *colTypeMods = valuesRTE->coltypmods;
|
||||
List *colCollations = valuesRTE->colcollations;
|
||||
|
||||
ListCell *typeCell = NULL;
|
||||
ListCell *modCell = NULL;
|
||||
ListCell *collationCell = NULL;
|
||||
|
||||
int colIndex = 1;
|
||||
elog(INFO, "colTypes: %s", nodeToString(colTypes));
|
||||
forthree(typeCell, colTypes, modCell, colTypeMods, collationCell, colCollations)
|
||||
{
|
||||
Oid type = lfirst_oid(typeCell);
|
||||
Oid typeMod = lfirst_oid(modCell);
|
||||
Oid collation = lfirst_oid(collationCell);
|
||||
|
||||
Var *targetColumn = makeVar(valuesRangeTableRef->rtindex, colIndex , type, typeMod,collation, 0);
|
||||
/* create a dummy target entry */
|
||||
TargetEntry *newTargetEntry = makeTargetEntry((Expr *) targetColumn, colIndex , "test",
|
||||
false);
|
||||
|
||||
subQuery->targetList = lappend(subQuery->targetList, newTargetEntry);
|
||||
colIndex++;
|
||||
}
|
||||
|
||||
// forboth()(targetCell, valuesRTE->coltypes)
|
||||
// {
|
||||
// TargetEntry *targetEntry = (TargetEntry *) lfirst(targetCell);
|
||||
//
|
||||
// Var *targetColumn = makeVar(valuesRangeTableRef->rtindex, colIndex , exprType((Node *)targetEntry->expr), exprTypmod((Node *)targetEntry->expr),
|
||||
// exprCollation((Node *)targetEntry->expr), 0);
|
||||
//
|
||||
// /* create a dummy target entry */
|
||||
// TargetEntry *newTargetEntry = makeTargetEntry((Expr *) targetColumn, colIndex , targetEntry->resname,
|
||||
// false);
|
||||
// subQuery->targetList = lappend(subQuery->targetList, newTargetEntry);
|
||||
// colIndex++;
|
||||
// }
|
||||
|
||||
Query *topQuery = makeNode(Query);
|
||||
topQuery->commandType = CMD_SELECT;
|
||||
|
||||
RangeTblEntry *subqueryRte = makeNode(RangeTblEntry);
|
||||
subqueryRte->rtekind = RTE_SUBQUERY;
|
||||
subqueryRte->subquery = subQuery;
|
||||
subqueryRte->alias = valuesRTE->alias;
|
||||
subqueryRte->eref = valuesRTE->alias;
|
||||
subqueryRte->inFromCl = true;
|
||||
|
||||
RangeTblRef *subqueryRangeTableRef = makeNode(RangeTblRef);
|
||||
subqueryRangeTableRef->rtindex = 1;
|
||||
|
||||
topQuery->jointree = makeFromExpr(list_make1(subqueryRangeTableRef), NULL);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
topQuery->rtable = list_make1(subqueryRte);
|
||||
|
||||
topQuery->targetList = copyObject(subQuery->targetList);
|
||||
|
||||
return topQuery;
|
||||
}
|
||||
|
||||
/*
|
||||
* ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
|
||||
|
@ -612,9 +786,17 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
|
|||
fastPathContext->distributionKeyHasParam = true;
|
||||
}
|
||||
|
||||
|
||||
if (IsMultiRowInsert(planContext->originalQuery))
|
||||
{
|
||||
|
||||
|
||||
}
|
||||
|
||||
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
|
||||
planContext->boundParams);
|
||||
|
||||
|
||||
return CreateDistributedPlannedStmt(planContext);
|
||||
}
|
||||
|
||||
|
|
|
@ -184,6 +184,10 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
|||
/* we don't support INSERT..SELECT in the fast-path */
|
||||
return false;
|
||||
}
|
||||
else if (IsMultiRowInsert(query))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else if (query->commandType == CMD_INSERT)
|
||||
{
|
||||
/* we don't need to do any further checks, all INSERTs are fast-path */
|
||||
|
|
Loading…
Reference in New Issue