mirror of https://github.com/citusdata/citus.git
Refactor CreateMultiTaskRouterPlan not pass query
parent
11d0476153
commit
de3a16405f
|
@ -71,10 +71,10 @@ typedef struct WalkerState
|
||||||
static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
RelationRestrictionContext *
|
RelationRestrictionContext *
|
||||||
restrictionContext);
|
restrictionContext);
|
||||||
static MultiPlan * CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
static MultiPlan * CreateMultiTaskRouterPlan(Query *originalQuery,
|
||||||
RelationRestrictionContext *
|
RelationRestrictionContext *
|
||||||
restrictionContext);
|
restrictionContext);
|
||||||
static Task * RouterModifyTaskForShardInterval(Query *originalQuery, Query *query,
|
static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
|
||||||
ShardInterval *shardInterval,
|
ShardInterval *shardInterval,
|
||||||
RelationRestrictionContext *
|
RelationRestrictionContext *
|
||||||
restrictionContext,
|
restrictionContext,
|
||||||
|
@ -142,7 +142,7 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
||||||
|
|
||||||
if (InsertSelectQuery(query))
|
if (InsertSelectQuery(query))
|
||||||
{
|
{
|
||||||
multiPlan = CreateMultiTaskRouterPlan(originalQuery, query, restrictionContext);
|
multiPlan = CreateMultiTaskRouterPlan(originalQuery, restrictionContext);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -214,7 +214,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
* The function never returns NULL, it errors out if cannot create the multi plan.
|
* The function never returns NULL, it errors out if cannot create the multi plan.
|
||||||
*/
|
*/
|
||||||
static MultiPlan *
|
static MultiPlan *
|
||||||
CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
CreateMultiTaskRouterPlan(Query *originalQuery,
|
||||||
RelationRestrictionContext *restrictionContext)
|
RelationRestrictionContext *restrictionContext)
|
||||||
{
|
{
|
||||||
int shardOffset = 0;
|
int shardOffset = 0;
|
||||||
|
@ -223,8 +223,8 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
Job *workerJob = NULL;
|
Job *workerJob = NULL;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
MultiPlan *multiPlan = NULL;
|
MultiPlan *multiPlan = NULL;
|
||||||
RangeTblEntry *insertRte = linitial(query->rtable);
|
RangeTblEntry *insertRte = linitial(originalQuery->rtable);
|
||||||
RangeTblEntry *subqueryRte = lsecond(query->rtable);
|
RangeTblEntry *subqueryRte = lsecond(originalQuery->rtable);
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
||||||
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
||||||
|
@ -247,10 +247,8 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
targetCacheEntry->sortedShardIntervalArray[shardOffset];
|
targetCacheEntry->sortedShardIntervalArray[shardOffset];
|
||||||
Task *modifyTask = NULL;
|
Task *modifyTask = NULL;
|
||||||
|
|
||||||
modifyTask = RouterModifyTaskForShardInterval(originalQuery, query,
|
modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval,
|
||||||
targetShardInterval,
|
restrictionContext, taskIdIndex);
|
||||||
restrictionContext,
|
|
||||||
taskIdIndex);
|
|
||||||
|
|
||||||
/* add the task if it could be created */
|
/* add the task if it could be created */
|
||||||
if (modifyTask != NULL)
|
if (modifyTask != NULL)
|
||||||
|
@ -292,14 +290,10 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
* subqueries with non euqi-joins.).
|
* subqueries with non euqi-joins.).
|
||||||
*/
|
*/
|
||||||
static Task *
|
static Task *
|
||||||
RouterModifyTaskForShardInterval(Query *originalQuery, Query *query,
|
RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval,
|
||||||
ShardInterval *shardInterval,
|
|
||||||
RelationRestrictionContext *restrictionContext,
|
RelationRestrictionContext *restrictionContext,
|
||||||
uint32 taskIdIndex)
|
uint32 taskIdIndex)
|
||||||
{
|
{
|
||||||
RangeTblEntry *subqueryRte = lsecond(query->rtable);
|
|
||||||
Query *subquery = subqueryRte->subquery;
|
|
||||||
|
|
||||||
Query *copiedQuery = copyObject(originalQuery);
|
Query *copiedQuery = copyObject(originalQuery);
|
||||||
RangeTblEntry *copiedInsertRte = linitial(copiedQuery->rtable);
|
RangeTblEntry *copiedInsertRte = linitial(copiedQuery->rtable);
|
||||||
RangeTblEntry *copiedSubqueryRte = lsecond(copiedQuery->rtable);
|
RangeTblEntry *copiedSubqueryRte = lsecond(copiedQuery->rtable);
|
||||||
|
|
Loading…
Reference in New Issue