Move requiresMasterEvaluation from Task to Job

pull/855/head
Marco Slot 2016-10-06 00:17:48 +02:00
parent 213d8419c6
commit 9d98acfb6d
6 changed files with 13 additions and 8 deletions

View File

@ -91,6 +91,7 @@ static HTAB * CreateXactParticipantHash(void);
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
Task *task, Task *task,
bool isModificationQuery, bool isModificationQuery,
bool requiresMasterModification,
bool expectResults); bool expectResults);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination, DestReceiver *destination,
@ -361,7 +362,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
{ {
PlannedStmt *planStatement = queryDesc->plannedstmt; PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement); MultiPlan *multiPlan = GetMultiPlan(planStatement);
List *taskList = multiPlan->workerJob->taskList; Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
Task *task = NULL; Task *task = NULL;
EState *estate = queryDesc->estate; EState *estate = queryDesc->estate;
CmdType operation = queryDesc->operation; CmdType operation = queryDesc->operation;
@ -415,6 +417,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
{ {
bool resultsOK = false; bool resultsOK = false;
bool isModificationQuery = false; bool isModificationQuery = false;
bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation;
if (operation == CMD_INSERT || operation == CMD_UPDATE || if (operation == CMD_INSERT || operation == CMD_UPDATE ||
operation == CMD_DELETE) operation == CMD_DELETE)
@ -429,6 +432,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
resultsOK = ExecuteTaskAndStoreResults(queryDesc, task, resultsOK = ExecuteTaskAndStoreResults(queryDesc, task,
isModificationQuery, isModificationQuery,
requiresMasterEvaluation,
sendTuples); sendTuples);
if (!resultsOK) if (!resultsOK)
{ {
@ -491,6 +495,7 @@ out:
static bool static bool
ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
bool isModificationQuery, bool isModificationQuery,
bool requiresMasterEvaluation,
bool expectResults) bool expectResults)
{ {
CmdType commandType = queryDesc->operation; CmdType commandType = queryDesc->operation;
@ -506,7 +511,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
bool gotResults = false; bool gotResults = false;
char *queryString = task->queryString; char *queryString = task->queryString;
if (isModificationQuery && task->requiresMasterEvaluation) if (isModificationQuery && requiresMasterEvaluation)
{ {
PlannedStmt *planStatement = queryDesc->plannedstmt; PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement); MultiPlan *multiPlan = GetMultiPlan(planStatement);

View File

@ -1677,6 +1677,7 @@ BuildJob(Query *jobQuery, List *dependedJobList)
job->jobId = UniqueJobId(); job->jobId = UniqueJobId();
job->jobQuery = jobQuery; job->jobQuery = jobQuery;
job->dependedJobList = dependedJobList; job->dependedJobList = dependedJobList;
job->requiresMasterEvaluation = false;
return job; return job;
} }

View File

@ -757,7 +757,6 @@ RouterModifyTask(Query *originalQuery, Query *query)
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL; Task *modifyTask = NULL;
bool upsertQuery = false; bool upsertQuery = false;
bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
/* grab shared metadata lock to stop concurrent placement additions */ /* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock); LockShardDistributionMetadata(shardId, ShareLock);
@ -789,7 +788,6 @@ RouterModifyTask(Query *originalQuery, Query *query)
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->dependedTaskList = NIL; modifyTask->dependedTaskList = NIL;
modifyTask->upsertQuery = upsertQuery; modifyTask->upsertQuery = upsertQuery;
modifyTask->requiresMasterEvaluation = requiresMasterEvaluation;
return modifyTask; return modifyTask;
} }
@ -1126,7 +1124,6 @@ RouterSelectTask(Query *originalQuery, Query *query,
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->dependedTaskList = NIL; task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery; task->upsertQuery = upsertQuery;
task->requiresMasterEvaluation = false;
*placementList = workerList; *placementList = workerList;
@ -1477,6 +1474,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
Job *job = NULL; Job *job = NULL;
List *taskList = NIL; List *taskList = NIL;
TaskType taskType = task->taskType; TaskType taskType = task->taskType;
bool requiresMasterEvaluation = RequiresMasterEvaluation(query);
/* /*
* We send modify task to the first replica, otherwise we choose the target shard * We send modify task to the first replica, otherwise we choose the target shard
@ -1501,6 +1499,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
job->subqueryPushdown = false; job->subqueryPushdown = false;
job->jobQuery = query; job->jobQuery = query;
job->taskList = taskList; job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
return job; return job;
} }

View File

@ -389,6 +389,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_NODE_FIELD(taskList); WRITE_NODE_FIELD(taskList);
WRITE_NODE_FIELD(dependedJobList); WRITE_NODE_FIELD(dependedJobList);
WRITE_BOOL_FIELD(subqueryPushdown); WRITE_BOOL_FIELD(subqueryPushdown);
WRITE_BOOL_FIELD(requiresMasterEvaluation);
} }
@ -492,7 +493,6 @@ OutTask(OUTFUNC_ARGS)
WRITE_BOOL_FIELD(assignmentConstrained); WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution); WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery); WRITE_BOOL_FIELD(upsertQuery);
WRITE_BOOL_FIELD(requiresMasterEvaluation);
} }
#if (PG_VERSION_NUM < 90600) #if (PG_VERSION_NUM < 90600)

View File

@ -161,6 +161,7 @@ readJobInfo(Job *local_node)
READ_NODE_FIELD(taskList); READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList); READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown); READ_BOOL_FIELD(subqueryPushdown);
READ_BOOL_FIELD(requiresMasterEvaluation);
} }
@ -288,7 +289,6 @@ ReadTask(READFUNC_ARGS)
READ_BOOL_FIELD(assignmentConstrained); READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution); READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery); READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE(); READ_DONE();
} }

View File

@ -120,6 +120,7 @@ typedef struct Job
List *taskList; List *taskList;
List *dependedJobList; List *dependedJobList;
bool subqueryPushdown; bool subqueryPushdown;
bool requiresMasterEvaluation; /* only applies to modify jobs */
} Job; } Job;
@ -168,7 +169,6 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */ uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */ bool upsertQuery; /* only applies to modify tasks */
bool requiresMasterEvaluation; /* only applies to modify tasks */
} Task; } Task;