mirror of https://github.com/citusdata/citus.git
Use better way to extract INSERT/SELECT RTEs
parent
de3a16405f
commit
94130b76ef
|
@ -79,6 +79,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
|
||||||
RelationRestrictionContext *
|
RelationRestrictionContext *
|
||||||
restrictionContext,
|
restrictionContext,
|
||||||
uint32 taskIdIndex);
|
uint32 taskIdIndex);
|
||||||
|
static RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||||
|
static RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||||
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
|
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
|
||||||
bool *badCoalesce);
|
bool *badCoalesce);
|
||||||
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
|
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
|
||||||
|
@ -140,7 +142,7 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InsertSelectQuery(query))
|
if (InsertSelectQuery(originalQuery))
|
||||||
{
|
{
|
||||||
multiPlan = CreateMultiTaskRouterPlan(originalQuery, restrictionContext);
|
multiPlan = CreateMultiTaskRouterPlan(originalQuery, restrictionContext);
|
||||||
}
|
}
|
||||||
|
@ -223,8 +225,8 @@ CreateMultiTaskRouterPlan(Query *originalQuery,
|
||||||
Job *workerJob = NULL;
|
Job *workerJob = NULL;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
MultiPlan *multiPlan = NULL;
|
MultiPlan *multiPlan = NULL;
|
||||||
RangeTblEntry *insertRte = linitial(originalQuery->rtable);
|
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery);
|
||||||
RangeTblEntry *subqueryRte = lsecond(originalQuery->rtable);
|
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
||||||
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
||||||
|
@ -295,8 +297,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
uint32 taskIdIndex)
|
uint32 taskIdIndex)
|
||||||
{
|
{
|
||||||
Query *copiedQuery = copyObject(originalQuery);
|
Query *copiedQuery = copyObject(originalQuery);
|
||||||
RangeTblEntry *copiedInsertRte = linitial(copiedQuery->rtable);
|
RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery);
|
||||||
RangeTblEntry *copiedSubqueryRte = lsecond(copiedQuery->rtable);
|
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery);
|
||||||
Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery;
|
Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery;
|
||||||
|
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
@ -397,6 +399,48 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExtractSelectRangeTableEntry returns the range table entry of the subquery.
|
||||||
|
* Note that the function expects and asserts that the input query be
|
||||||
|
* an INSERT...SELECT query.
|
||||||
|
*/
|
||||||
|
static RangeTblEntry *
|
||||||
|
ExtractSelectRangeTableEntry(Query *query)
|
||||||
|
{
|
||||||
|
Assert(InsertSelectQuery(query));
|
||||||
|
|
||||||
|
List *fromList = query->jointree->fromlist;
|
||||||
|
RangeTblRef *reference = NULL;
|
||||||
|
RangeTblEntry *subqueryRte = NULL;
|
||||||
|
|
||||||
|
/* since we already aseerted InsertSelectQuery() it is safe to access list */
|
||||||
|
reference = linitial(fromList);
|
||||||
|
Assert(IsA(reference, RangeTblRef));
|
||||||
|
|
||||||
|
subqueryRte = rt_fetch(reference->rtindex, query->rtable);
|
||||||
|
|
||||||
|
return subqueryRte;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry.
|
||||||
|
* Note that the function expects and asserts that the input query be
|
||||||
|
* an INSERT...SELECT query.
|
||||||
|
*/
|
||||||
|
static RangeTblEntry *
|
||||||
|
ExtractInsertRangeTableEntry(Query *query)
|
||||||
|
{
|
||||||
|
AssertArg(InsertSelectQuery(query));
|
||||||
|
|
||||||
|
int resultRelation = query->resultRelation;
|
||||||
|
List *rangeTableList = query->rtable;
|
||||||
|
RangeTblEntry *insertRTE = rt_fetch(resultRelation, rangeTableList);
|
||||||
|
|
||||||
|
return insertRTE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfInsertSelectQueryNotSupported errors out for unsupported
|
* ErrorIfInsertSelectQueryNotSupported errors out for unsupported
|
||||||
* INSERT ... SELECT queries.
|
* INSERT ... SELECT queries.
|
||||||
|
@ -605,8 +649,7 @@ AddHiddenPartitionColumnEqualityQual(Query *originalQuery)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO: once CTEs are present, this does not work */
|
subqueryEntry = ExtractSelectRangeTableEntry(originalQuery);
|
||||||
subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1);
|
|
||||||
subquery = subqueryEntry->subquery;
|
subquery = subqueryEntry->subquery;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2302,29 +2345,48 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
||||||
/*
|
/*
|
||||||
* InsertSelectQuery returns true when the input query
|
* InsertSelectQuery returns true when the input query
|
||||||
* is INSERT INTO ... SELECT kind of query.
|
* is INSERT INTO ... SELECT kind of query.
|
||||||
|
*
|
||||||
|
* Note that the input query should be the original parsetree of
|
||||||
|
* the query (i.e., not passed trough the standard planner).
|
||||||
|
*
|
||||||
|
* This function is inspired from getInsertSelectQuery() on
|
||||||
|
* rewrite/rewriteManip.c.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
InsertSelectQuery(Query *query)
|
InsertSelectQuery(Query *query)
|
||||||
{
|
{
|
||||||
CmdType commandType = query->commandType;
|
CmdType commandType = query->commandType;
|
||||||
List *rangeTableList = query->rtable;
|
List *fromList = NULL;
|
||||||
|
RangeTblRef *rangeTableReference = NULL;
|
||||||
RangeTblEntry *subqueryRte = NULL;
|
RangeTblEntry *subqueryRte = NULL;
|
||||||
Query *subquery = NULL;
|
Query *selectQuery = NULL;
|
||||||
|
|
||||||
if (commandType != CMD_INSERT)
|
if (commandType != CMD_INSERT)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
rangeTableList = query->rtable;
|
if (query->jointree == NULL || !IsA(query->jointree, FromExpr))
|
||||||
if (list_length(rangeTableList) < 2)
|
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
subqueryRte = lsecond(query->rtable);
|
fromList = query->jointree->fromlist;
|
||||||
subquery = subqueryRte->subquery;
|
if (list_length(fromList) != 1)
|
||||||
if (subquery == NULL)
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
rangeTableReference = linitial(fromList);
|
||||||
|
if (!IsA(rangeTableReference, RangeTblRef))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable);
|
||||||
|
selectQuery = subqueryRte->subquery;
|
||||||
|
if (!(selectQuery && IsA(selectQuery, Query) &&
|
||||||
|
selectQuery->commandType == CMD_SELECT))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue