From 8584cb005b0af87cfc6fc895b06726554d564748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Thu, 30 Jan 2020 08:47:28 +0100 Subject: [PATCH] Do not evaluate functions on the coordinator for SELECT queries (#3440) Previously, the logic for evaluating the functions and the parameters was the same. That ended up evaluating the functions inaccurately on the coordinator. Instead, split the function evaluation logic from parameter evaluation logic. --- .../distributed/executor/citus_custom_scan.c | 37 ++++- src/backend/distributed/utils/citus_clauses.c | 155 +++++++++++++----- src/include/distributed/citus_clauses.h | 20 ++- .../regress/expected/master_evaluation.out | 129 +++++++++++++++ src/test/regress/multi_mx_schedule | 2 +- src/test/regress/sql/master_evaluation.sql | 57 +++++++ 6 files changed, 348 insertions(+), 52 deletions(-) create mode 100644 src/test/regress/expected/master_evaluation.out create mode 100644 src/test/regress/sql/master_evaluation.sql diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index d2e5cd51f..075bd8732 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -245,7 +245,8 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i /* citus only evaluates functions for modification queries */ bool modifyQueryRequiresMasterEvaluation = - workerJob->requiresMasterEvaluation && jobQuery->commandType != CMD_SELECT; + jobQuery->commandType != CMD_SELECT && + (workerJob->requiresMasterEvaluation || workerJob->deferredPruning); /* * ExecuteMasterEvaluableFunctions handles both function evalation @@ -253,12 +254,36 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i * there is a parameter on the distribution key. So, evaluate in both * cases. 
*/ - bool shoudEvaluteFunctionsOrParams = - modifyQueryRequiresMasterEvaluation || workerJob->deferredPruning; - if (shoudEvaluteFunctionsOrParams) + if (modifyQueryRequiresMasterEvaluation) { - /* evaluate functions and parameters */ - ExecuteMasterEvaluableFunctions(jobQuery, planState); + /* evaluate functions and parameters for modification queries */ + ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState); + } + else if (jobQuery->commandType == CMD_SELECT && !workerJob->deferredPruning) + { + /* we'll use generated strings, no need to have the parameters anymore */ + EState *executorState = planState->state; + ResetExecutionParameters(executorState); + + /* we're done, we don't want to evaluate functions for SELECT queries */ + return; + } + else if (jobQuery->commandType == CMD_SELECT && workerJob->deferredPruning) + { + /* + * Evaluate parameters, because the parameters are only avaliable on the + * coordinator and are required for pruning. + * + * But, we don't want to evaluate functions for read-only queries on the + * coordinator as the volatile functions could yield different + * results per shard (also per row) and could have side-effects. + * + * Note that Citus already errors out for modification queries during + * planning when the query involve any volatile function that might + * diverge the shards as such functions are expected to yield different + * results per shard (also per row). 
+ */ + ExecuteMasterEvaluableParameters(jobQuery, planState); } /* diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 304a22024..4969d27d6 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -29,10 +29,11 @@ /* private function declarations */ static bool IsVarNode(Node *node); static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, - Oid result_collation, PlanState *planState); + Oid result_collation, + MasterEvaluationContext *masterEvaluationContext); static bool CitusIsVolatileFunctionIdChecker(Oid func_id, void *context); static bool CitusIsMutableFunctionIdChecker(Oid func_id, void *context); - +static bool ShouldEvaluateExpressionType(NodeTag nodeTag); /* * RequiresMastereEvaluation returns the executor needs to reparse and @@ -47,13 +48,36 @@ RequiresMasterEvaluation(Query *query) /* - * ExecuteMasterEvaluableFunctions evaluates expressions that can be resolved - * to a constant. + * ExecuteMasterEvaluableFunctions evaluates expressions and external parameters + * that can be resolved to a constant. */ void -ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) +ExecuteMasterEvaluableFunctionsAndParameters(Query *query, PlanState *planState) { - PartiallyEvaluateExpression((Node *) query, planState); + MasterEvaluationContext masterEvaluationContext; + + masterEvaluationContext.planState = planState; + masterEvaluationContext.evaluateParams = true; + masterEvaluationContext.evaluateFunctions = true; + + PartiallyEvaluateExpression((Node *) query, &masterEvaluationContext); +} + + +/* + * ExecuteMasterEvaluableParameters evaluates external paramaters that can be + * resolved to a constant. 
+ */ +void +ExecuteMasterEvaluableParameters(Query *query, PlanState *planState) +{ + MasterEvaluationContext masterEvaluationContext; + + masterEvaluationContext.planState = planState; + masterEvaluationContext.evaluateParams = true; + masterEvaluationContext.evaluateFunctions = false; + + PartiallyEvaluateExpression((Node *) query, &masterEvaluationContext); } @@ -64,27 +88,72 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) * on the master. */ Node * -PartiallyEvaluateExpression(Node *expression, PlanState *planState) +PartiallyEvaluateExpression(Node *expression, + MasterEvaluationContext *masterEvaluationContext) { if (expression == NULL || IsA(expression, Const)) { return expression; } - switch (nodeTag(expression)) + NodeTag nodeTag = nodeTag(expression); + if (nodeTag == T_Param) { - case T_Param: + Param *param = (Param *) expression; + if (param->paramkind == PARAM_SUBLINK) { - Param *param = (Param *) expression; - if (param->paramkind == PARAM_SUBLINK) - { - /* ExecInitExpr cannot handle PARAM_SUBLINK */ - return expression; - } + /* ExecInitExpr cannot handle PARAM_SUBLINK */ + return expression; } - /* fallthrough */ + return (Node *) citus_evaluate_expr((Expr *) expression, + exprType(expression), + exprTypmod(expression), + exprCollation(expression), + masterEvaluationContext); + } + else if (ShouldEvaluateExpressionType(nodeTag)) + { + if (FindNodeCheck(expression, IsVarNode)) + { + return (Node *) expression_tree_mutator(expression, + PartiallyEvaluateExpression, + masterEvaluationContext); + } + return (Node *) citus_evaluate_expr((Expr *) expression, + exprType(expression), + exprTypmod(expression), + exprCollation(expression), + masterEvaluationContext); + } + else if (nodeTag == T_Query) + { + return (Node *) query_tree_mutator((Query *) expression, + PartiallyEvaluateExpression, + masterEvaluationContext, + QTW_DONT_COPY_QUERY); + } + else + { + return (Node *) expression_tree_mutator(expression, + 
PartiallyEvaluateExpression, + masterEvaluationContext); + } + + return expression; +} + + +/* + * ShouldEvaluateExpressionType returns true if Citus should evaluate the + * input node on the coordinator. + */ +static bool +ShouldEvaluateExpressionType(NodeTag nodeTag) +{ + switch (nodeTag) + { case T_FuncExpr: case T_OpExpr: case T_DistinctExpr: @@ -97,36 +166,12 @@ PartiallyEvaluateExpression(Node *expression, PlanState *planState) case T_RelabelType: case T_CoerceToDomain: { - if (FindNodeCheck(expression, IsVarNode)) - { - return (Node *) expression_tree_mutator(expression, - PartiallyEvaluateExpression, - planState); - } - - return (Node *) citus_evaluate_expr((Expr *) expression, - exprType(expression), - exprTypmod(expression), - exprCollation(expression), - planState); - } - - case T_Query: - { - return (Node *) query_tree_mutator((Query *) expression, - PartiallyEvaluateExpression, - planState, QTW_DONT_COPY_QUERY); + return true; } default: - { - return (Node *) expression_tree_mutator(expression, - PartiallyEvaluateExpression, - planState); - } + return false; } - - return expression; } @@ -150,8 +195,10 @@ IsVarNode(Node *node) */ static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, - Oid result_collation, PlanState *planState) + Oid result_collation, + MasterEvaluationContext *masterEvaluationContext) { + PlanState *planState = NULL; EState *estate; ExprState *exprstate; ExprContext *econtext; @@ -160,6 +207,28 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, int16 resultTypLen; bool resultTypByVal; + if (masterEvaluationContext) + { + planState = masterEvaluationContext->planState; + + if (IsA(expr, Param)) + { + if (!masterEvaluationContext->evaluateParams) + { + /* bail out, the caller doesn't want params to be evaluated */ + return expr; + } + } + else if (!masterEvaluationContext->evaluateFunctions) + { + /* should only get here for node types we should evaluate */ + 
Assert(ShouldEvaluateExpressionType(nodeTag(expr))); + + /* bail out, the caller doesn't want functions/expressions to be evaluated */ + return expr; + } + } + /* * To use the executor, we need an EState. */ diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h index c8d21461e..556dce36f 100644 --- a/src/include/distributed/citus_clauses.h +++ b/src/include/distributed/citus_clauses.h @@ -15,9 +15,25 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" + +/* + * This struct is used to pass information to master + * evaluation logic. + */ +typedef struct MasterEvaluationContext +{ + PlanState *planState; + bool evaluateParams; + bool evaluateFunctions; +} MasterEvaluationContext; + + extern bool RequiresMasterEvaluation(Query *query); -extern void ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState); -extern Node * PartiallyEvaluateExpression(Node *expression, PlanState *planState); +extern void ExecuteMasterEvaluableFunctionsAndParameters(Query *query, + PlanState *planState); +extern void ExecuteMasterEvaluableParameters(Query *query, PlanState *planState); +extern Node * PartiallyEvaluateExpression(Node *expression, + MasterEvaluationContext *masterEvaluationContext); extern bool CitusIsVolatileFunction(Node *node); extern bool CitusIsMutableFunction(Node *node); diff --git a/src/test/regress/expected/master_evaluation.out b/src/test/regress/expected/master_evaluation.out new file mode 100644 index 000000000..f58632695 --- /dev/null +++ b/src/test/regress/expected/master_evaluation.out @@ -0,0 +1,129 @@ +-- This test relies on metadata being synced +-- that's why is should be executed on MX schedule +CREATE SCHEMA master_evaluation; +SET search_path TO master_evaluation; +-- create a volatile function that returns the local node id +CREATE OR REPLACE FUNCTION get_local_node_id_volatile() +RETURNS INT AS $$ +DECLARE localGroupId int; +BEGIN + SELECT groupid INTO localGroupId FROM pg_dist_local_group; + 
RETURN localGroupId; +END; $$ language plpgsql VOLATILE; +SELECT create_distributed_function('get_local_node_id_volatile()'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE master_evaluation_table (key int, value int); +SELECT create_distributed_table('master_evaluation_table', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- show that local id is 0, we'll use this information +SELECT get_local_node_id_volatile(); + get_local_node_id_volatile +--------------------------------------------------------------------- + 0 +(1 row) + +-- load data such that we have 1 row per node +INSERT INTO master_evaluation_table SELECT i, 0 FROM generate_series(0,100)i; +-- we expect that the function is evaluated on the worker node, so we should get a row +SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- make sure that it is also true for fast-path router queries with paramaters +PREPARE p1(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1; +execute p1(1); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +execute p1(2); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +execute p1(3); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +execute p1(4); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +execute p1(5); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +execute p1(6); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +execute p1(7); + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +execute p1(8); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- for multi-shard queries, we still expect the evaluation to happen on the workers +SELECT count(*), max(get_local_node_id_volatile()) != 0, min(get_local_node_id_volatile()) != 0 FROM master_evaluation_table; + count | ?column? | ?column? +--------------------------------------------------------------------- + 101 | t | t +(1 row) + +-- when executed locally, we expect to get the result from the coordinator +SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() = 0; + count | ?column? +--------------------------------------------------------------------- + 101 | t +(1 row) + +-- make sure that we get the results from the workers when the query is sent to workers +SET citus.task_assignment_policy TO "round-robin"; +SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() > 0; + count | ?column? +--------------------------------------------------------------------- + 101 | t +(1 row) + +RESET citus.task_assignment_policy; +-- for multi-shard SELECTs, we don't try to evaluate on the coordinator +SELECT min(get_local_node_id_volatile()) > 0 FROM master_evaluation_table; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM master_evaluation_table WHERE value >= get_local_node_id_volatile(); + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET client_min_messages TO ERROR; +DROP SCHEMA master_evaluation CASCADE; diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 6cdadd3c3..9a7fadb50 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -36,7 +36,7 @@ test: recursive_dml_queries_mx multi_mx_truncate_from_worker test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_metadata -test: multi_mx_call +test: multi_mx_call master_evaluation test: multi_mx_function_call_delegation test: multi_mx_modifications local_shard_execution test: multi_mx_transaction_recovery diff --git a/src/test/regress/sql/master_evaluation.sql b/src/test/regress/sql/master_evaluation.sql new file mode 100644 index 000000000..d1b7199cf --- /dev/null +++ b/src/test/regress/sql/master_evaluation.sql @@ -0,0 +1,57 @@ +-- This test relies on metadata being synced +-- that's why is should be executed on MX schedule +CREATE SCHEMA master_evaluation; +SET search_path TO master_evaluation; + +-- create a volatile function that returns the local node id +CREATE OR REPLACE FUNCTION get_local_node_id_volatile() +RETURNS INT AS $$ +DECLARE localGroupId int; +BEGIN + SELECT groupid INTO localGroupId FROM pg_dist_local_group; + RETURN localGroupId; +END; $$ language plpgsql VOLATILE; +SELECT create_distributed_function('get_local_node_id_volatile()'); + +CREATE TABLE master_evaluation_table (key int, value int); +SELECT create_distributed_table('master_evaluation_table', 'key'); + +-- show that local id is 0, we'll use this information +SELECT 
get_local_node_id_volatile(); + +-- load data such that we have 1 row per node +INSERT INTO master_evaluation_table SELECT i, 0 FROM generate_series(0,100)i; + +-- we expect that the function is evaluated on the worker node, so we should get a row +SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1; + +-- make sure that it is also true for fast-path router queries with paramaters +PREPARE p1(int) AS SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = $1; + +execute p1(1); +execute p1(2); +execute p1(3); +execute p1(4); +execute p1(5); +execute p1(6); +execute p1(7); +execute p1(8); + +-- for multi-shard queries, we still expect the evaluation to happen on the workers +SELECT count(*), max(get_local_node_id_volatile()) != 0, min(get_local_node_id_volatile()) != 0 FROM master_evaluation_table; + +-- when executed locally, we expect to get the result from the coordinator +SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() = 0; + +-- make sure that we get the results from the workers when the query is sent to workers +SET citus.task_assignment_policy TO "round-robin"; +SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() > 0; + +RESET citus.task_assignment_policy; + +-- for multi-shard SELECTs, we don't try to evaluate on the coordinator +SELECT min(get_local_node_id_volatile()) > 0 FROM master_evaluation_table; +SELECT count(*) FROM master_evaluation_table WHERE value >= get_local_node_id_volatile(); + +SET client_min_messages TO ERROR; +DROP SCHEMA master_evaluation CASCADE;