From 9d98acfb6d4e660e38841a6ef169eff8dd8e0ac5 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 6 Oct 2016 00:17:48 +0200 Subject: [PATCH] Move requiresMasterEvaluation from Task to Job --- src/backend/distributed/executor/multi_router_executor.c | 9 +++++++-- src/backend/distributed/planner/multi_physical_planner.c | 1 + src/backend/distributed/planner/multi_router_planner.c | 5 ++--- src/backend/distributed/utils/citus_outfuncs.c | 2 +- src/backend/distributed/utils/citus_readfuncs.c | 2 +- src/include/distributed/multi_physical_planner.h | 2 +- 6 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9eb22d029..4476e058c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -91,6 +91,7 @@ static HTAB * CreateXactParticipantHash(void); static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool isModificationQuery, + bool requiresMasterModification, bool expectResults); static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, @@ -361,7 +362,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) { PlannedStmt *planStatement = queryDesc->plannedstmt; MultiPlan *multiPlan = GetMultiPlan(planStatement); - List *taskList = multiPlan->workerJob->taskList; + Job *workerJob = multiPlan->workerJob; + List *taskList = workerJob->taskList; Task *task = NULL; EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; @@ -415,6 +417,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) { bool resultsOK = false; bool isModificationQuery = false; + bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation; if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) @@ -429,6 +432,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) resultsOK = ExecuteTaskAndStoreResults(queryDesc, task, isModificationQuery, + requiresMasterEvaluation, sendTuples); if (!resultsOK) { @@ -491,6 +495,7 @@ out: static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool isModificationQuery, + bool requiresMasterEvaluation, bool expectResults) { CmdType commandType = queryDesc->operation; @@ -506,7 +511,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool gotResults = false; char *queryString = task->queryString; - if (isModificationQuery && task->requiresMasterEvaluation) + if (isModificationQuery && requiresMasterEvaluation) { PlannedStmt *planStatement = queryDesc->plannedstmt; MultiPlan *multiPlan = GetMultiPlan(planStatement); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 3251ac714..70e819d6d 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1677,6 +1677,7 @@ BuildJob(Query *jobQuery, List *dependedJobList) job->jobId = UniqueJobId(); job->jobQuery = jobQuery; job->dependedJobList = dependedJobList; + job->requiresMasterEvaluation = false; return job; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 3de008c60..56a7c357f 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -757,7 +757,6 @@ RouterModifyTask(Query *originalQuery, Query *query) StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; bool upsertQuery = false; - bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -789,7 +788,6 @@ RouterModifyTask(Query *originalQuery, Query *query) modifyTask->anchorShardId = shardId; modifyTask->dependedTaskList = NIL; modifyTask->upsertQuery = upsertQuery; - modifyTask->requiresMasterEvaluation = requiresMasterEvaluation; return modifyTask; } @@ -1126,7 +1124,6 @@ RouterSelectTask(Query *originalQuery, Query *query, task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; - task->requiresMasterEvaluation = false; *placementList = workerList; @@ -1477,6 +1474,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList) Job *job = NULL; List *taskList = NIL; TaskType taskType = task->taskType; + bool requiresMasterEvaluation = RequiresMasterEvaluation(query); /* * We send modify task to the first replica, otherwise we choose the target shard @@ -1501,6 +1499,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList) job->subqueryPushdown = false; job->jobQuery = query; job->taskList = taskList; + job->requiresMasterEvaluation = requiresMasterEvaluation; return job; } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 3c4cd213b..49b41db27 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -389,6 +389,7 @@ OutJobFields(StringInfo str, const Job *node) WRITE_NODE_FIELD(taskList); WRITE_NODE_FIELD(dependedJobList); WRITE_BOOL_FIELD(subqueryPushdown); + WRITE_BOOL_FIELD(requiresMasterEvaluation); } @@ -492,7 +493,6 @@ OutTask(OUTFUNC_ARGS) WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); - WRITE_BOOL_FIELD(requiresMasterEvaluation); } #if (PG_VERSION_NUM < 90600) diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 1b85b0b93..256dc20a2 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -161,6 +161,7 @@ readJobInfo(Job *local_node) READ_NODE_FIELD(taskList); READ_NODE_FIELD(dependedJobList); READ_BOOL_FIELD(subqueryPushdown); + READ_BOOL_FIELD(requiresMasterEvaluation); } @@ -288,7 +289,6 @@ ReadTask(READFUNC_ARGS) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); - READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 351ab9171..3d12fa3aa 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -120,6 +120,7 @@ typedef struct Job List *taskList; List *dependedJobList; bool subqueryPushdown; + bool requiresMasterEvaluation; /* only applies to modify jobs */ } Job; @@ -168,7 +169,6 @@ typedef struct Task uint64 shardId; /* only applies to shard fetch tasks */ TaskExecution *taskExecution; /* used by task tracker executor */ bool upsertQuery; /* only applies to modify tasks */ - bool requiresMasterEvaluation; /* only applies to modify tasks */ } Task;