Only reparse queries if the planner flags them for reparsing

pull/488/head
Brian Cloutier 2016-07-11 19:29:13 +03:00 committed by Andres Freund
parent 4820366a6f
commit af9515f669
8 changed files with 45 additions and 7 deletions

View File

@ -325,7 +325,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
bool gotResults = false;
char *queryString = task->queryString;
if (isModificationQuery)
if (isModificationQuery && task->requiresMasterEvaluation)
{
PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement);

View File

@ -21,6 +21,7 @@
#include "access/skey.h"
#endif
#include "access/xact.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
@ -710,6 +711,7 @@ RouterModifyTask(Query *query)
StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL;
bool upsertQuery = false;
bool requiresMasterEvaluation = RequiresMasterEvaluation(query);
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
@ -772,6 +774,7 @@ RouterModifyTask(Query *query)
modifyTask->anchorShardId = shardId;
modifyTask->dependedTaskList = NIL;
modifyTask->upsertQuery = upsertQuery;
modifyTask->requiresMasterEvaluation = requiresMasterEvaluation;
return modifyTask;
}
@ -1132,6 +1135,7 @@ RouterSelectTask(Query *query)
task->anchorShardId = shardId;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
task->requiresMasterEvaluation = false;
return task;
}

View File

@ -28,6 +28,32 @@ static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typm
Oid result_collation);
/*
* Whether the executor needs to reparse and try to execute this query.
*/
bool
RequiresMasterEvaluation(Query *query)
{
ListCell *targetEntryCell = NULL;
foreach(targetEntryCell, query->targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
if (contain_mutable_functions((Node *) targetEntry->expr))
{
return true;
}
}
if (query->jointree && query->jointree->quals)
{
return contain_mutable_functions((Node *) query->jointree->quals);
}
return false;
}
/*
* Looks at each TargetEntry of the query and the jointree quals, evaluating
* any sub-expressions which don't include Vars.

View File

@ -458,6 +458,7 @@ _outTask(StringInfo str, const Task *node)
WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_BOOL_FIELD(requiresMasterEvaluation);
}

View File

@ -1418,6 +1418,7 @@ _readTask(void)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}

View File

@ -1507,6 +1507,7 @@ _readTask(void)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}

View File

@ -14,6 +14,7 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
extern bool RequiresMasterEvaluation(Query *query);
extern void ExecuteMasterEvaluableFunctions(Query *query);
#endif /* CITUS_NODEFUNCS_H */

View File

@ -141,6 +141,9 @@ typedef struct MapMergeJob
* as compute tasks; and shard fetch, map fetch, and merge fetch tasks are data
* fetch tasks. We also forward declare the task execution struct here to avoid
* including the executor header files.
*
* NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask
* in citus_readfuncs to correctly (de)serialize this struct.
*/
typedef struct TaskExecution TaskExecution;
@ -156,12 +159,13 @@ typedef struct Task
List *dependedTaskList; /* only applies to compute tasks */
uint32 partitionId;
uint32 upstreamTaskId; /* only applies to data fetch tasks */
ShardInterval *shardInterval; /* only applies to merge tasks */
bool assignmentConstrained; /* only applies to merge tasks */
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
uint32 upstreamTaskId; /* only applies to data fetch tasks */
ShardInterval *shardInterval; /* only applies to merge tasks */
bool assignmentConstrained; /* only applies to merge tasks */
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
bool requiresMasterEvaluation; /* only applies to modify tasks */
} Task;