diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 25f3eb65d..21785f26a 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -325,7 +325,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool gotResults = false; char *queryString = task->queryString; - if (isModificationQuery) + if (isModificationQuery && task->requiresMasterEvaluation) { PlannedStmt *planStatement = queryDesc->plannedstmt; MultiPlan *multiPlan = GetMultiPlan(planStatement); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index b43d94587..cb093744d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -21,6 +21,7 @@ #include "access/skey.h" #endif #include "access/xact.h" +#include "distributed/citus_clauses.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -710,6 +711,7 @@ RouterModifyTask(Query *query) StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; bool upsertQuery = false; + bool requiresMasterEvaluation = RequiresMasterEvaluation(query); /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -772,6 +774,7 @@ RouterModifyTask(Query *query) modifyTask->anchorShardId = shardId; modifyTask->dependedTaskList = NIL; modifyTask->upsertQuery = upsertQuery; + modifyTask->requiresMasterEvaluation = requiresMasterEvaluation; return modifyTask; } @@ -1132,6 +1135,7 @@ RouterSelectTask(Query *query) task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; + task->requiresMasterEvaluation = false; return task; } diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 780702398..9a93c9160 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -28,6 +28,32 @@ static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typm Oid result_collation); +/* + * Whether the executor needs to reparse and try to execute this query. + */ +bool +RequiresMasterEvaluation(Query *query) +{ + ListCell *targetEntryCell = NULL; + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + if (contain_mutable_functions((Node *) targetEntry->expr)) + { + return true; + } + } + + if (query->jointree && query->jointree->quals) + { + return contain_mutable_functions((Node *) query->jointree->quals); + } + + return false; +} + /* * Looks at each TargetEntry of the query and the jointree quals, evaluating * any sub-expressions which don't include Vars. diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 2510cc28c..124185ff4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -458,6 +458,7 @@ _outTask(StringInfo str, const Task *node) WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); + WRITE_BOOL_FIELD(requiresMasterEvaluation); } diff --git a/src/backend/distributed/utils/citus_readfuncs_94.c b/src/backend/distributed/utils/citus_readfuncs_94.c index 4be2d093f..d7cb9d61f 100644 --- a/src/backend/distributed/utils/citus_readfuncs_94.c +++ b/src/backend/distributed/utils/citus_readfuncs_94.c @@ -1418,6 +1418,7 @@ _readTask(void) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c index 89fc16753..caf223c00 100644 --- a/src/backend/distributed/utils/citus_readfuncs_95.c +++ b/src/backend/distributed/utils/citus_readfuncs_95.c @@ -1507,6 +1507,7 @@ _readTask(void) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h index d6f98edfc..acdc3e05a 100644 --- a/src/include/distributed/citus_clauses.h +++ b/src/include/distributed/citus_clauses.h @@ -14,6 +14,7 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" +extern bool RequiresMasterEvaluation(Query *query); extern void ExecuteMasterEvaluableFunctions(Query *query); #endif /* CITUS_NODEFUNCS_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ff769d18e..032b8f49f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -141,6 +141,9 @@ typedef struct MapMergeJob * as compute tasks; and shard fetch, map fetch, and merge fetch tasks are data * fetch tasks. We also forward declare the task execution struct here to avoid * including the executor header files. + * + * NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask + * in citus_readfuncs to correctly (de)serialize this struct. */ typedef struct TaskExecution TaskExecution; @@ -156,12 +159,13 @@ typedef struct Task List *dependedTaskList; /* only applies to compute tasks */ uint32 partitionId; - uint32 upstreamTaskId; /* only applies to data fetch tasks */ - ShardInterval *shardInterval; /* only applies to merge tasks */ - bool assignmentConstrained; /* only applies to merge tasks */ - uint64 shardId; /* only applies to shard fetch tasks */ - TaskExecution *taskExecution; /* used by task tracker executor */ - bool upsertQuery; /* only applies to modify tasks */ + uint32 upstreamTaskId; /* only applies to data fetch tasks */ + ShardInterval *shardInterval; /* only applies to merge tasks */ + bool assignmentConstrained; /* only applies to merge tasks */ + uint64 shardId; /* only applies to shard fetch tasks */ + TaskExecution *taskExecution; /* used by task tracker executor */ + bool upsertQuery; /* only applies to modify tasks */ + bool requiresMasterEvaluation; /* only applies to modify tasks */ } Task;