Do not evaluate functions on the coordinator for SELECT queries (#3440)

Previously, the logic for evaluting the functions and the parameters
were the same. That ended-up evaluting the functions inaccurately
on the coordinator. Instead, split the function evaluation logic
from parameter evalution logic.
pull/3406/head
Önder Kalacı 2020-01-30 08:47:28 +01:00 committed by GitHub
parent e9c17b71a4
commit 8584cb005b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 348 additions and 52 deletions

View File

@ -245,7 +245,8 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i
/* citus only evaluates functions for modification queries */
bool modifyQueryRequiresMasterEvaluation =
workerJob->requiresMasterEvaluation && jobQuery->commandType != CMD_SELECT;
jobQuery->commandType != CMD_SELECT &&
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
/*
* ExecuteMasterEvaluableFunctions handles both function evalation
@ -253,12 +254,36 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i
* there is a parameter on the distribution key. So, evaluate in both
* cases.
*/
bool shoudEvaluteFunctionsOrParams =
modifyQueryRequiresMasterEvaluation || workerJob->deferredPruning;
if (shoudEvaluteFunctionsOrParams)
if (modifyQueryRequiresMasterEvaluation)
{
/* evaluate functions and parameters */
ExecuteMasterEvaluableFunctions(jobQuery, planState);
/* evaluate functions and parameters for modification queries */
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
}
else if (jobQuery->commandType == CMD_SELECT && !workerJob->deferredPruning)
{
/* we'll use generated strings, no need to have the parameters anymore */
EState *executorState = planState->state;
ResetExecutionParameters(executorState);
/* we're done, we don't want to evaluate functions for SELECT queries */
return;
}
else if (jobQuery->commandType == CMD_SELECT && workerJob->deferredPruning)
{
/*
* Evaluate parameters, because the parameters are only avaliable on the
* coordinator and are required for pruning.
*
* But, we don't want to evaluate functions for read-only queries on the
* coordinator as the volatile functions could yield different
* results per shard (also per row) and could have side-effects.
*
* Note that Citus already errors out for modification queries during
* planning when the query involve any volatile function that might
* diverge the shards as such functions are expected to yield different
* results per shard (also per row).
*/
ExecuteMasterEvaluableParameters(jobQuery, planState);
}
/*

View File

@ -29,10 +29,11 @@
/* private function declarations */
static bool IsVarNode(Node *node);
static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
Oid result_collation, PlanState *planState);
Oid result_collation,
MasterEvaluationContext *masterEvaluationContext);
static bool CitusIsVolatileFunctionIdChecker(Oid func_id, void *context);
static bool CitusIsMutableFunctionIdChecker(Oid func_id, void *context);
static bool ShouldEvaluateExpressionType(NodeTag nodeTag);
/*
* RequiresMastereEvaluation returns the executor needs to reparse and
@ -47,13 +48,36 @@ RequiresMasterEvaluation(Query *query)
/*
* ExecuteMasterEvaluableFunctions evaluates expressions that can be resolved
* to a constant.
* ExecuteMasterEvaluableFunctions evaluates expressions and external parameters
* that can be resolved to a constant.
*/
void
ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
ExecuteMasterEvaluableFunctionsAndParameters(Query *query, PlanState *planState)
{
PartiallyEvaluateExpression((Node *) query, planState);
MasterEvaluationContext masterEvaluationContext;
masterEvaluationContext.planState = planState;
masterEvaluationContext.evaluateParams = true;
masterEvaluationContext.evaluateFunctions = true;
PartiallyEvaluateExpression((Node *) query, &masterEvaluationContext);
}
/*
* ExecuteMasterEvaluableParameters evaluates external paramaters that can be
* resolved to a constant.
*/
void
ExecuteMasterEvaluableParameters(Query *query, PlanState *planState)
{
MasterEvaluationContext masterEvaluationContext;
masterEvaluationContext.planState = planState;
masterEvaluationContext.evaluateParams = true;
masterEvaluationContext.evaluateFunctions = false;
PartiallyEvaluateExpression((Node *) query, &masterEvaluationContext);
}
@ -64,16 +88,16 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
* on the master.
*/
Node *
PartiallyEvaluateExpression(Node *expression, PlanState *planState)
PartiallyEvaluateExpression(Node *expression,
MasterEvaluationContext *masterEvaluationContext)
{
if (expression == NULL || IsA(expression, Const))
{
return expression;
}
switch (nodeTag(expression))
{
case T_Param:
NodeTag nodeTag = nodeTag(expression);
if (nodeTag == T_Param)
{
Param *param = (Param *) expression;
if (param->paramkind == PARAM_SUBLINK)
@ -81,10 +105,55 @@ PartiallyEvaluateExpression(Node *expression, PlanState *planState)
/* ExecInitExpr cannot handle PARAM_SUBLINK */
return expression;
}
return (Node *) citus_evaluate_expr((Expr *) expression,
exprType(expression),
exprTypmod(expression),
exprCollation(expression),
masterEvaluationContext);
}
else if (ShouldEvaluateExpressionType(nodeTag))
{
if (FindNodeCheck(expression, IsVarNode))
{
return (Node *) expression_tree_mutator(expression,
PartiallyEvaluateExpression,
masterEvaluationContext);
}
/* fallthrough */
return (Node *) citus_evaluate_expr((Expr *) expression,
exprType(expression),
exprTypmod(expression),
exprCollation(expression),
masterEvaluationContext);
}
else if (nodeTag == T_Query)
{
return (Node *) query_tree_mutator((Query *) expression,
PartiallyEvaluateExpression,
masterEvaluationContext,
QTW_DONT_COPY_QUERY);
}
else
{
return (Node *) expression_tree_mutator(expression,
PartiallyEvaluateExpression,
masterEvaluationContext);
}
return expression;
}
/*
* ShouldEvaluateExpressionType returns true if Citus should evaluate the
* input node on the coordinator.
*/
static bool
ShouldEvaluateExpressionType(NodeTag nodeTag)
{
switch (nodeTag)
{
case T_FuncExpr:
case T_OpExpr:
case T_DistinctExpr:
@ -97,38 +166,14 @@ PartiallyEvaluateExpression(Node *expression, PlanState *planState)
case T_RelabelType:
case T_CoerceToDomain:
{
if (FindNodeCheck(expression, IsVarNode))
{
return (Node *) expression_tree_mutator(expression,
PartiallyEvaluateExpression,
planState);
}
return (Node *) citus_evaluate_expr((Expr *) expression,
exprType(expression),
exprTypmod(expression),
exprCollation(expression),
planState);
}
case T_Query:
{
return (Node *) query_tree_mutator((Query *) expression,
PartiallyEvaluateExpression,
planState, QTW_DONT_COPY_QUERY);
return true;
}
default:
{
return (Node *) expression_tree_mutator(expression,
PartiallyEvaluateExpression,
planState);
return false;
}
}
return expression;
}
/*
* IsVarNode returns whether a node is a Var (column reference).
@ -150,8 +195,10 @@ IsVarNode(Node *node)
*/
static Expr *
citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
Oid result_collation, PlanState *planState)
Oid result_collation,
MasterEvaluationContext *masterEvaluationContext)
{
PlanState *planState = NULL;
EState *estate;
ExprState *exprstate;
ExprContext *econtext;
@ -160,6 +207,28 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
int16 resultTypLen;
bool resultTypByVal;
if (masterEvaluationContext)
{
planState = masterEvaluationContext->planState;
if (IsA(expr, Param))
{
if (!masterEvaluationContext->evaluateParams)
{
/* bail out, the caller doesn't want params to be evaluated */
return expr;
}
}
else if (!masterEvaluationContext->evaluateFunctions)
{
/* should only get here for node types we should evaluate */
Assert(ShouldEvaluateExpressionType(nodeTag(expr)));
/* bail out, the caller doesn't want functions/expressions to be evaluated */
return expr;
}
}
/*
* To use the executor, we need an EState.
*/

View File

@ -15,9 +15,25 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
/*
* This struct is used to pass information to master
* evaluation logic.
*/
typedef struct MasterEvaluationContext
{
PlanState *planState;
bool evaluateParams;
bool evaluateFunctions;
} MasterEvaluationContext;
extern bool RequiresMasterEvaluation(Query *query);
extern void ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState);
extern Node * PartiallyEvaluateExpression(Node *expression, PlanState *planState);
extern void ExecuteMasterEvaluableFunctionsAndParameters(Query *query,
PlanState *planState);
extern void ExecuteMasterEvaluableParameters(Query *query, PlanState *planState);
extern Node * PartiallyEvaluateExpression(Node *expression,
MasterEvaluationContext *masterEvaluationContext);
extern bool CitusIsVolatileFunction(Node *node);
extern bool CitusIsMutableFunction(Node *node);

View File

@ -0,0 +1,129 @@
-- This test relies on metadata being synced
-- that's why is should be executed on MX schedule
CREATE SCHEMA master_evaluation;
SET search_path TO master_evaluation;
-- create a volatile function that returns the local node id
CREATE OR REPLACE FUNCTION get_local_node_id_volatile()
RETURNS INT AS $$
DECLARE localGroupId int;
BEGIN
SELECT groupid INTO localGroupId FROM pg_dist_local_group;
RETURN localGroupId;
END; $$ language plpgsql VOLATILE;
SELECT create_distributed_function('get_local_node_id_volatile()');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE TABLE master_evaluation_table (key int, value int);
SELECT create_distributed_table('master_evaluation_table', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- show that local id is 0, we'll use this information
SELECT get_local_node_id_volatile();
get_local_node_id_volatile
---------------------------------------------------------------------
0
(1 row)
-- load data such that we have 1 row per node
INSERT INTO master_evaluation_table SELECT i, 0 FROM generate_series(0,100)i;
-- we expect that the function is evaluated on the worker node, so we should get a row
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
-- make sure that it is also true for fast-path router queries with paramaters
PREPARE p1(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1;
execute p1(1);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(2);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(3);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(4);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(5);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(6);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(7);
?column?
---------------------------------------------------------------------
t
(1 row)
execute p1(8);
?column?
---------------------------------------------------------------------
t
(1 row)
-- for multi-shard queries, we still expect the evaluation to happen on the workers
SELECT count(*), max(get_local_node_id_volatile()) != 0, min(get_local_node_id_volatile()) != 0 FROM master_evaluation_table;
count | ?column? | ?column?
---------------------------------------------------------------------
101 | t | t
(1 row)
-- when executed locally, we expect to get the result from the coordinator
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() = 0;
count | ?column?
---------------------------------------------------------------------
101 | t
(1 row)
-- make sure that we get the results from the workers when the query is sent to workers
SET citus.task_assignment_policy TO "round-robin";
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() > 0;
count | ?column?
---------------------------------------------------------------------
101 | t
(1 row)
RESET citus.task_assignment_policy;
-- for multi-shard SELECTs, we don't try to evaluate on the coordinator
SELECT min(get_local_node_id_volatile()) > 0 FROM master_evaluation_table;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM master_evaluation_table WHERE value >= get_local_node_id_volatile();
count
---------------------------------------------------------------------
0
(1 row)
SET client_min_messages TO ERROR;
DROP SCHEMA master_evaluation CASCADE;

View File

@ -36,7 +36,7 @@ test: recursive_dml_queries_mx multi_mx_truncate_from_worker
test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: multi_mx_metadata
test: multi_mx_call
test: multi_mx_call master_evaluation
test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution
test: multi_mx_transaction_recovery

View File

@ -0,0 +1,57 @@
-- This test relies on metadata being synced
-- that's why is should be executed on MX schedule
CREATE SCHEMA master_evaluation;
SET search_path TO master_evaluation;
-- create a volatile function that returns the local node id
CREATE OR REPLACE FUNCTION get_local_node_id_volatile()
RETURNS INT AS $$
DECLARE localGroupId int;
BEGIN
SELECT groupid INTO localGroupId FROM pg_dist_local_group;
RETURN localGroupId;
END; $$ language plpgsql VOLATILE;
SELECT create_distributed_function('get_local_node_id_volatile()');
CREATE TABLE master_evaluation_table (key int, value int);
SELECT create_distributed_table('master_evaluation_table', 'key');
-- show that local id is 0, we'll use this information
SELECT get_local_node_id_volatile();
-- load data such that we have 1 row per node
INSERT INTO master_evaluation_table SELECT i, 0 FROM generate_series(0,100)i;
-- we expect that the function is evaluated on the worker node, so we should get a row
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
-- make sure that it is also true for fast-path router queries with paramaters
PREPARE p1(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1;
execute p1(1);
execute p1(2);
execute p1(3);
execute p1(4);
execute p1(5);
execute p1(6);
execute p1(7);
execute p1(8);
-- for multi-shard queries, we still expect the evaluation to happen on the workers
SELECT count(*), max(get_local_node_id_volatile()) != 0, min(get_local_node_id_volatile()) != 0 FROM master_evaluation_table;
-- when executed locally, we expect to get the result from the coordinator
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() = 0;
-- make sure that we get the results from the workers when the query is sent to workers
SET citus.task_assignment_policy TO "round-robin";
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() > 0;
RESET citus.task_assignment_policy;
-- for multi-shard SELECTs, we don't try to evaluate on the coordinator
SELECT min(get_local_node_id_volatile()) > 0 FROM master_evaluation_table;
SELECT count(*) FROM master_evaluation_table WHERE value >= get_local_node_id_volatile();
SET client_min_messages TO ERROR;
DROP SCHEMA master_evaluation CASCADE;