mirror of https://github.com/citusdata/citus.git
Make sure to go deeper into the functions to search for PARAMs
For example, a PARAM might reside inside a function just because of a casting of a type such as the follows: ``` {FUNCEXPR :funcid 1740 :funcresulttype 1700 :funcretset false :funcvariadic false :funcformat 2 :funccollid 0 :inputcollid 0 :args ( {PARAM :paramkind 0 :paramid 15 :paramtype 23 :paramtypmod -1 :paramcollid 0 :location 356 } ) ``` We should recursively check the expression before bailing out.pull/3454/head
parent
1adc293286
commit
2f274a4fce
|
@ -34,6 +34,8 @@ static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typm
|
|||
static bool CitusIsVolatileFunctionIdChecker(Oid func_id, void *context);
|
||||
static bool CitusIsMutableFunctionIdChecker(Oid func_id, void *context);
|
||||
static bool ShouldEvaluateExpressionType(NodeTag nodeTag);
|
||||
static bool ShouldEvaluateFunctionWithMasterContext(MasterEvaluationContext *
|
||||
evaluationContext);
|
||||
|
||||
/*
|
||||
* RequiresMastereEvaluation returns the executor needs to reparse and
|
||||
|
@ -48,7 +50,7 @@ RequiresMasterEvaluation(Query *query)
|
|||
|
||||
|
||||
/*
|
||||
* ExecuteMasterEvaluableFunctions evaluates expressions and external parameters
|
||||
* ExecuteMasterEvaluableFunctionsAndParameters evaluates expressions and parameters
|
||||
* that can be resolved to a constant.
|
||||
*/
|
||||
void
|
||||
|
@ -57,8 +59,7 @@ ExecuteMasterEvaluableFunctionsAndParameters(Query *query, PlanState *planState)
|
|||
MasterEvaluationContext masterEvaluationContext;
|
||||
|
||||
masterEvaluationContext.planState = planState;
|
||||
masterEvaluationContext.evaluateParams = true;
|
||||
masterEvaluationContext.evaluateFunctions = true;
|
||||
masterEvaluationContext.evaluationMode = EVALUATE_FUNCTIONS_PARAMS;
|
||||
|
||||
PartiallyEvaluateExpression((Node *) query, &masterEvaluationContext);
|
||||
}
|
||||
|
@ -74,8 +75,7 @@ ExecuteMasterEvaluableParameters(Query *query, PlanState *planState)
|
|||
MasterEvaluationContext masterEvaluationContext;
|
||||
|
||||
masterEvaluationContext.planState = planState;
|
||||
masterEvaluationContext.evaluateParams = true;
|
||||
masterEvaluationContext.evaluateFunctions = false;
|
||||
masterEvaluationContext.evaluationMode = EVALUATE_PARAMS;
|
||||
|
||||
PartiallyEvaluateExpression((Node *) query, &masterEvaluationContext);
|
||||
}
|
||||
|
@ -112,7 +112,8 @@ PartiallyEvaluateExpression(Node *expression,
|
|||
exprCollation(expression),
|
||||
masterEvaluationContext);
|
||||
}
|
||||
else if (ShouldEvaluateExpressionType(nodeTag))
|
||||
else if (ShouldEvaluateExpressionType(nodeTag) &&
|
||||
ShouldEvaluateFunctionWithMasterContext(masterEvaluationContext))
|
||||
{
|
||||
if (FindNodeCheck(expression, IsVarNode))
|
||||
{
|
||||
|
@ -145,6 +146,24 @@ PartiallyEvaluateExpression(Node *expression,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldEvaluateFunctionWithMasterContext is a helper function which is used to
|
||||
* decide whether the function/expression should be evaluated with the input
|
||||
* masterEvaluationContext.
|
||||
*/
|
||||
static bool
|
||||
ShouldEvaluateFunctionWithMasterContext(MasterEvaluationContext *evaluationContext)
|
||||
{
|
||||
if (evaluationContext == NULL)
|
||||
{
|
||||
/* if no context provided, evaluate, which is the default behaviour */
|
||||
return true;
|
||||
}
|
||||
|
||||
return evaluationContext->evaluationMode == EVALUATE_FUNCTIONS_PARAMS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldEvaluateExpressionType returns true if Citus should evaluate the
|
||||
* input node on the coordinator.
|
||||
|
@ -213,13 +232,13 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
|
|||
|
||||
if (IsA(expr, Param))
|
||||
{
|
||||
if (!masterEvaluationContext->evaluateParams)
|
||||
if (masterEvaluationContext->evaluationMode == EVALUATE_NONE)
|
||||
{
|
||||
/* bail out, the caller doesn't want params to be evaluated */
|
||||
return expr;
|
||||
}
|
||||
}
|
||||
else if (!masterEvaluationContext->evaluateFunctions)
|
||||
else if (masterEvaluationContext->evaluationMode != EVALUATE_FUNCTIONS_PARAMS)
|
||||
{
|
||||
/* should only get here for node types we should evaluate */
|
||||
Assert(ShouldEvaluateExpressionType(nodeTag(expr)));
|
||||
|
|
|
@ -16,6 +16,22 @@
|
|||
#include "nodes/parsenodes.h"
|
||||
|
||||
|
||||
/*
|
||||
* MasterEvaluationMode is used to signal what expressions in the query
|
||||
* should be evaluated on the coordinator.
|
||||
*/
|
||||
typedef enum MasterEvaluationMode
|
||||
{
|
||||
/* evaluate nothing */
|
||||
EVALUATE_NONE = 0,
|
||||
|
||||
/* evaluate only external parameters */
|
||||
EVALUATE_PARAMS,
|
||||
|
||||
/* evaluate both the functions/expressions and the external paramaters */
|
||||
EVALUATE_FUNCTIONS_PARAMS
|
||||
} MasterEvaluationMode;
|
||||
|
||||
/*
|
||||
* This struct is used to pass information to master
|
||||
* evaluation logic.
|
||||
|
@ -23,8 +39,7 @@
|
|||
typedef struct MasterEvaluationContext
|
||||
{
|
||||
PlanState *planState;
|
||||
bool evaluateParams;
|
||||
bool evaluateFunctions;
|
||||
MasterEvaluationMode evaluationMode;
|
||||
} MasterEvaluationContext;
|
||||
|
||||
|
||||
|
|
|
@ -16,6 +16,19 @@ SELECT create_distributed_function('get_local_node_id_volatile()');
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_local_node_id_volatile_sum_with_param(int)
|
||||
RETURNS INT AS $$
|
||||
DECLARE localGroupId int;
|
||||
BEGIN
|
||||
SELECT groupid + $1 INTO localGroupId FROM pg_dist_local_group;
|
||||
RETURN localGroupId;
|
||||
END; $$ language plpgsql VOLATILE;
|
||||
SELECT create_distributed_function('get_local_node_id_volatile_sum_with_param(int)');
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE master_evaluation_table (key int, value int);
|
||||
SELECT create_distributed_table('master_evaluation_table', 'key');
|
||||
create_distributed_table
|
||||
|
@ -30,8 +43,8 @@ SELECT get_local_node_id_volatile();
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- load data such that we have 1 row per node
|
||||
INSERT INTO master_evaluation_table SELECT i, 0 FROM generate_series(0,100)i;
|
||||
-- load data
|
||||
INSERT INTO master_evaluation_table SELECT i, i FROM generate_series(0,100)i;
|
||||
-- we expect that the function is evaluated on the worker node, so we should get a row
|
||||
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
|
||||
?column?
|
||||
|
@ -105,10 +118,10 @@ SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatil
|
|||
|
||||
-- make sure that we get the results from the workers when the query is sent to workers
|
||||
SET citus.task_assignment_policy TO "round-robin";
|
||||
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() > 0;
|
||||
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() = 0;
|
||||
count | ?column?
|
||||
---------------------------------------------------------------------
|
||||
101 | t
|
||||
101 | f
|
||||
(1 row)
|
||||
|
||||
RESET citus.task_assignment_policy;
|
||||
|
@ -119,11 +132,402 @@ SELECT min(get_local_node_id_volatile()) > 0 FROM master_evaluation_table;
|
|||
t
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM master_evaluation_table WHERE value >= get_local_node_id_volatile();
|
||||
count
|
||||
SELECT count(*) > 0 FROM master_evaluation_table WHERE value >= get_local_node_id_volatile();
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- let's have some tests around expressions
|
||||
-- for modifications, we expect the evaluation to happen on the coordinator
|
||||
-- thus the results should be 0
|
||||
PREPARE insert_with_param_expression(int) AS INSERT INTO master_evaluation_table (key, value) VALUES ($1 + get_local_node_id_volatile(), $1 + get_local_node_id_volatile()) RETURNING key, value;
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
-- for modifications, we expect the evaluation to happen on the coordinator
|
||||
-- thus the results should be 0
|
||||
PREPARE insert_with_param(int) AS INSERT INTO master_evaluation_table (key, value) VALUES ($1, $1) RETURNING key, value;
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 0
|
||||
(1 row)
|
||||
|
||||
PREPARE router_select_with_param_expression(int) AS SELECT value > 0 FROM master_evaluation_table WHERE key = $1 + get_local_node_id_volatile();
|
||||
-- for selects, we expect the evaluation to happen on the workers
|
||||
-- this means that the query should be hitting multiple workers
|
||||
SET client_min_messages TO DEBUG2;
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
PREPARE router_select_with_param(int) AS SELECT DISTINCT value FROM master_evaluation_table WHERE key = $1;
|
||||
-- this time the parameter itself is a function, so should be evaluated
|
||||
-- on the coordinator
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- same calls with functions as the parametres only
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- this time use the parameter inside the function
|
||||
PREPARE router_select_with_parameter_in_function(int) AS SELECT bool_and(get_local_node_id_volatile_sum_with_param($1) > 1) FROM master_evaluation_table WHERE key = get_local_node_id_volatile_sum_with_param($1);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
RESET citus.log_remote_commands;
|
||||
-- numeric has different casting affects, so some tests on that
|
||||
CREATE TABLE master_evaluation_table_2 (key numeric, value numeric);
|
||||
SELECT create_distributed_table('master_evaluation_table_2', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION TEST_RANDOM (INTEGER, INTEGER) RETURNS INTEGER AS $$
|
||||
DECLARE
|
||||
start_int ALIAS FOR $1;
|
||||
end_int ALIAS FOR $2;
|
||||
BEGIN
|
||||
RETURN trunc(random() * (end_int-start_int) + start_int);
|
||||
END;
|
||||
$$ LANGUAGE 'plpgsql' STRICT;
|
||||
CREATE OR REPLACE PROCEDURE master_evaluation.test_procedure(int)
|
||||
LANGUAGE plpgsql
|
||||
AS $procedure$
|
||||
DECLARE filterKey INTEGER;
|
||||
BEGIN
|
||||
filterKey := round(master_evaluation.TEST_RANDOM(1,1)) + $1;
|
||||
PERFORM DISTINCT value FROM master_evaluation_table_2 WHERE key = filterKey;
|
||||
END;
|
||||
$procedure$;
|
||||
-- we couldn't find a meaningful query to write for this
|
||||
-- however this query fails before https://github.com/citusdata/citus/pull/3454
|
||||
SET client_min_messages TO DEBUG2;
|
||||
\set VERBOSITY TERSE
|
||||
CALL test_procedure(100);
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
CALL test_procedure(100);
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
CALL test_procedure(100);
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
CALL test_procedure(100);
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
CALL test_procedure(100);
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
CALL test_procedure(100);
|
||||
DEBUG: Deferred pruning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
CALL test_procedure(100);
|
||||
CREATE OR REPLACE PROCEDURE master_evaluation.test_procedure_2(int)
|
||||
LANGUAGE plpgsql
|
||||
AS $procedure$
|
||||
DECLARE filterKey INTEGER;
|
||||
BEGIN
|
||||
filterKey := round(master_evaluation.TEST_RANDOM(1,1)) + $1;
|
||||
INSERT INTO master_evaluation_table_2 VALUES (filterKey, filterKey);
|
||||
END;
|
||||
$procedure$;
|
||||
RESET citus.log_remote_commands ;
|
||||
RESET client_min_messages;
|
||||
-- these calls would INSERT key = 101, so test if insert succeeded
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
SELECT count(*) FROM master_evaluation_table_2 WHERE key = 101;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
7
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA master_evaluation CASCADE;
|
||||
|
|
|
@ -13,14 +13,24 @@ BEGIN
|
|||
END; $$ language plpgsql VOLATILE;
|
||||
SELECT create_distributed_function('get_local_node_id_volatile()');
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_local_node_id_volatile_sum_with_param(int)
|
||||
RETURNS INT AS $$
|
||||
DECLARE localGroupId int;
|
||||
BEGIN
|
||||
SELECT groupid + $1 INTO localGroupId FROM pg_dist_local_group;
|
||||
RETURN localGroupId;
|
||||
END; $$ language plpgsql VOLATILE;
|
||||
SELECT create_distributed_function('get_local_node_id_volatile_sum_with_param(int)');
|
||||
|
||||
|
||||
CREATE TABLE master_evaluation_table (key int, value int);
|
||||
SELECT create_distributed_table('master_evaluation_table', 'key');
|
||||
|
||||
-- show that local id is 0, we'll use this information
|
||||
SELECT get_local_node_id_volatile();
|
||||
|
||||
-- load data such that we have 1 row per node
|
||||
INSERT INTO master_evaluation_table SELECT i, 0 FROM generate_series(0,100)i;
|
||||
-- load data
|
||||
INSERT INTO master_evaluation_table SELECT i, i FROM generate_series(0,100)i;
|
||||
|
||||
-- we expect that the function is evaluated on the worker node, so we should get a row
|
||||
SELECT get_local_node_id_volatile() > 0 FROM master_evaluation_table WHERE key = 1;
|
||||
|
@ -45,13 +55,143 @@ SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatil
|
|||
|
||||
-- make sure that we get the results from the workers when the query is sent to workers
|
||||
SET citus.task_assignment_policy TO "round-robin";
|
||||
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() > 0;
|
||||
SELECT (SELECT count(*) FROM master_evaluation_table), get_local_node_id_volatile() = 0;
|
||||
|
||||
RESET citus.task_assignment_policy;
|
||||
|
||||
-- for multi-shard SELECTs, we don't try to evaluate on the coordinator
|
||||
SELECT min(get_local_node_id_volatile()) > 0 FROM master_evaluation_table;
|
||||
SELECT count(*) FROM master_evaluation_table WHERE value >= get_local_node_id_volatile();
|
||||
SELECT count(*) > 0 FROM master_evaluation_table WHERE value >= get_local_node_id_volatile();
|
||||
|
||||
-- let's have some tests around expressions
|
||||
|
||||
-- for modifications, we expect the evaluation to happen on the coordinator
|
||||
-- thus the results should be 0
|
||||
PREPARE insert_with_param_expression(int) AS INSERT INTO master_evaluation_table (key, value) VALUES ($1 + get_local_node_id_volatile(), $1 + get_local_node_id_volatile()) RETURNING key, value;
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
EXECUTE insert_with_param_expression(0);
|
||||
|
||||
-- for modifications, we expect the evaluation to happen on the coordinator
|
||||
-- thus the results should be 0
|
||||
PREPARE insert_with_param(int) AS INSERT INTO master_evaluation_table (key, value) VALUES ($1, $1) RETURNING key, value;
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE insert_with_param(0 + get_local_node_id_volatile());
|
||||
|
||||
PREPARE router_select_with_param_expression(int) AS SELECT value > 0 FROM master_evaluation_table WHERE key = $1 + get_local_node_id_volatile();
|
||||
|
||||
-- for selects, we expect the evaluation to happen on the workers
|
||||
-- this means that the query should be hitting multiple workers
|
||||
SET client_min_messages TO DEBUG2;
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
EXECUTE router_select_with_param_expression(0);
|
||||
|
||||
PREPARE router_select_with_param(int) AS SELECT DISTINCT value FROM master_evaluation_table WHERE key = $1;
|
||||
|
||||
-- this time the parameter itself is a function, so should be evaluated
|
||||
-- on the coordinator
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(0 + get_local_node_id_volatile());
|
||||
|
||||
-- same calls with functions as the parametres only
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
EXECUTE router_select_with_param(get_local_node_id_volatile());
|
||||
|
||||
-- this time use the parameter inside the function
|
||||
PREPARE router_select_with_parameter_in_function(int) AS SELECT bool_and(get_local_node_id_volatile_sum_with_param($1) > 1) FROM master_evaluation_table WHERE key = get_local_node_id_volatile_sum_with_param($1);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
EXECUTE router_select_with_parameter_in_function(0);
|
||||
|
||||
RESET client_min_messages;
|
||||
RESET citus.log_remote_commands;
|
||||
|
||||
-- numeric has different casting affects, so some tests on that
|
||||
CREATE TABLE master_evaluation_table_2 (key numeric, value numeric);
|
||||
SELECT create_distributed_table('master_evaluation_table_2', 'key');
|
||||
|
||||
CREATE OR REPLACE FUNCTION TEST_RANDOM (INTEGER, INTEGER) RETURNS INTEGER AS $$
|
||||
DECLARE
|
||||
start_int ALIAS FOR $1;
|
||||
end_int ALIAS FOR $2;
|
||||
BEGIN
|
||||
RETURN trunc(random() * (end_int-start_int) + start_int);
|
||||
END;
|
||||
$$ LANGUAGE 'plpgsql' STRICT;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE master_evaluation.test_procedure(int)
|
||||
LANGUAGE plpgsql
|
||||
AS $procedure$
|
||||
DECLARE filterKey INTEGER;
|
||||
BEGIN
|
||||
filterKey := round(master_evaluation.TEST_RANDOM(1,1)) + $1;
|
||||
PERFORM DISTINCT value FROM master_evaluation_table_2 WHERE key = filterKey;
|
||||
END;
|
||||
$procedure$;
|
||||
|
||||
-- we couldn't find a meaningful query to write for this
|
||||
-- however this query fails before https://github.com/citusdata/citus/pull/3454
|
||||
SET client_min_messages TO DEBUG2;
|
||||
\set VERBOSITY TERSE
|
||||
CALL test_procedure(100);
|
||||
CALL test_procedure(100);
|
||||
CALL test_procedure(100);
|
||||
CALL test_procedure(100);
|
||||
CALL test_procedure(100);
|
||||
CALL test_procedure(100);
|
||||
CALL test_procedure(100);
|
||||
|
||||
CREATE OR REPLACE PROCEDURE master_evaluation.test_procedure_2(int)
|
||||
LANGUAGE plpgsql
|
||||
AS $procedure$
|
||||
DECLARE filterKey INTEGER;
|
||||
BEGIN
|
||||
filterKey := round(master_evaluation.TEST_RANDOM(1,1)) + $1;
|
||||
INSERT INTO master_evaluation_table_2 VALUES (filterKey, filterKey);
|
||||
END;
|
||||
$procedure$;
|
||||
|
||||
RESET citus.log_remote_commands ;
|
||||
RESET client_min_messages;
|
||||
|
||||
-- these calls would INSERT key = 101, so test if insert succeeded
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
CALL test_procedure_2(100);
|
||||
SELECT count(*) FROM master_evaluation_table_2 WHERE key = 101;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA master_evaluation CASCADE;
|
||||
|
|
Loading…
Reference in New Issue