mirror of https://github.com/citusdata/citus.git
Attempt to force custom plans for prepared statements when trying to delegate function calls
We discern between PARAM_EXEC & PARAM_EXTERN:
d52eaa0948/src/include/nodes/primnodes.h (L211)
According to primnodes.h we should only run into PARAM_EXEC or PARAM_EXTERN
pull/3055/head
parent
29f1ea079b
commit
89d35e9692
|
@ -195,11 +195,21 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse);
|
bool hasExternParam = false;
|
||||||
|
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse,
|
||||||
|
&hasExternParam);
|
||||||
if (delegatePlan != NULL)
|
if (delegatePlan != NULL)
|
||||||
{
|
{
|
||||||
result = FinalizePlan(result, delegatePlan);
|
result = FinalizePlan(result, delegatePlan);
|
||||||
}
|
}
|
||||||
|
else if (hasExternParam)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* As in CreateDistributedPlannedStmt, try dissuade planner when planning
|
||||||
|
* potentially failed due to unresolved prepared statement parameters.
|
||||||
|
*/
|
||||||
|
result->planTree->total_cost = FLT_MAX / 100000000;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
|
|
|
@ -48,16 +48,43 @@
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
struct ParamWalkerContext
|
||||||
|
{
|
||||||
|
bool hasParam;
|
||||||
|
ParamKind paramKind;
|
||||||
|
};
|
||||||
|
|
||||||
static bool contain_param_walker(Node *node, void *context);
|
static bool contain_param_walker(Node *node, void *context);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* contain_param_walker scans node for Param nodes.
|
* contain_param_walker scans node for Param nodes.
|
||||||
* returns whether any such nodes found.
|
* Ignore the return value, instead check context afterwards.
|
||||||
|
*
|
||||||
|
* context is a struct ParamWalkerContext*.
|
||||||
|
* hasParam is set to true if we find a Param node.
|
||||||
|
* paramKind is set to the paramkind of the Param node if any found.
|
||||||
|
* paramKind is set to PARAM_EXEC if both PARAM_EXEC & PARAM_EXTERN are found.
|
||||||
|
*
|
||||||
|
* By time we walk, Param nodes are either PARAM_EXTERN or PARAM_EXEC.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
contain_param_walker(Node *node, void *context)
|
contain_param_walker(Node *node, void *context)
|
||||||
{
|
{
|
||||||
return IsA(node, Param);
|
if (IsA(node, Param))
|
||||||
|
{
|
||||||
|
Param *paramNode = (Param *) node;
|
||||||
|
struct ParamWalkerContext *pwcontext =
|
||||||
|
(struct ParamWalkerContext *) context;
|
||||||
|
|
||||||
|
pwcontext->hasParam = true;
|
||||||
|
pwcontext->paramKind = paramNode->paramkind;
|
||||||
|
if (paramNode->paramkind == PARAM_EXEC)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -69,7 +96,7 @@ contain_param_walker(Node *node, void *context)
|
||||||
* ... Those complex forms are handled in the coordinator.
|
* ... Those complex forms are handled in the coordinator.
|
||||||
*/
|
*/
|
||||||
DistributedPlan *
|
DistributedPlan *
|
||||||
TryToDelegateFunctionCall(Query *query)
|
TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
{
|
{
|
||||||
FromExpr *joinTree = NULL;
|
FromExpr *joinTree = NULL;
|
||||||
List *targetList = NIL;
|
List *targetList = NIL;
|
||||||
|
@ -90,6 +117,10 @@ TryToDelegateFunctionCall(Query *query)
|
||||||
Job *job = NULL;
|
Job *job = NULL;
|
||||||
DistributedPlan *distributedPlan = NULL;
|
DistributedPlan *distributedPlan = NULL;
|
||||||
int32 groupId = 0;
|
int32 groupId = 0;
|
||||||
|
struct ParamWalkerContext walkerParamContext = { 0 };
|
||||||
|
|
||||||
|
/* set hasExternParam now in case of early exit */
|
||||||
|
*hasExternParam = false;
|
||||||
|
|
||||||
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
|
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
|
||||||
{
|
{
|
||||||
|
@ -192,13 +223,6 @@ TryToDelegateFunctionCall(Query *query)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (expression_tree_walker((Node *) funcExpr->args, contain_param_walker, NULL))
|
|
||||||
{
|
|
||||||
ereport(DEBUG1, (errmsg("arguments in a distributed function must "
|
|
||||||
"not contain subqueries")));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
colocatedRelationId = ColocatedTableId(procedure->colocationId);
|
colocatedRelationId = ColocatedTableId(procedure->colocationId);
|
||||||
if (colocatedRelationId == InvalidOid)
|
if (colocatedRelationId == InvalidOid)
|
||||||
{
|
{
|
||||||
|
@ -217,6 +241,19 @@ TryToDelegateFunctionCall(Query *query)
|
||||||
}
|
}
|
||||||
|
|
||||||
partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);
|
partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);
|
||||||
|
|
||||||
|
if (IsA(partitionValue, Param))
|
||||||
|
{
|
||||||
|
Param *partitionParam = (Param *) partitionValue;
|
||||||
|
|
||||||
|
if (partitionParam->paramkind == PARAM_EXTERN)
|
||||||
|
{
|
||||||
|
/* Don't log a message, we should end up here again without a parameter */
|
||||||
|
*hasExternParam = true;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!IsA(partitionValue, Const))
|
if (!IsA(partitionValue, Const))
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("distribution argument value must be a constant")));
|
ereport(DEBUG1, (errmsg("distribution argument value must be a constant")));
|
||||||
|
@ -282,6 +319,23 @@ TryToDelegateFunctionCall(Query *query)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker,
|
||||||
|
&walkerParamContext);
|
||||||
|
if (walkerParamContext.hasParam)
|
||||||
|
{
|
||||||
|
if (walkerParamContext.paramKind == PARAM_EXTERN)
|
||||||
|
{
|
||||||
|
/* Don't log a message, we should end up here again without a parameter */
|
||||||
|
*hasExternParam = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(DEBUG1, (errmsg("arguments in a distributed function must "
|
||||||
|
"not contain subqueries")));
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
ereport(DEBUG1, (errmsg("pushing down the function call")));
|
ereport(DEBUG1, (errmsg("pushing down the function call")));
|
||||||
|
|
||||||
queryString = makeStringInfo();
|
queryString = makeStringInfo();
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
|
|
||||||
|
|
||||||
DistributedPlan * TryToDelegateFunctionCall(Query *query);
|
DistributedPlan * TryToDelegateFunctionCall(Query *query, bool *hasParam);
|
||||||
|
|
||||||
|
|
||||||
#endif /* FUNCTION_CALL_DELEGATION_H */
|
#endif /* FUNCTION_CALL_DELEGATION_H */
|
||||||
|
|
|
@ -606,7 +606,7 @@ SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
|
||||||
41 | 4
|
41 | 4
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Prepared statements
|
-- Prepared statements. Repeat six times to test for generic plans
|
||||||
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
|
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
|
||||||
EXECUTE call_plan(2, 0);
|
EXECUTE call_plan(2, 0);
|
||||||
DEBUG: pushing down the function call
|
DEBUG: pushing down the function call
|
||||||
|
@ -615,6 +615,41 @@ DEBUG: pushing down the function call
|
||||||
28
|
28
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
DEBUG: pushing down the function call
|
||||||
|
mx_call_func
|
||||||
|
--------------
|
||||||
|
28
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
DEBUG: pushing down the function call
|
||||||
|
mx_call_func
|
||||||
|
--------------
|
||||||
|
28
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
DEBUG: pushing down the function call
|
||||||
|
mx_call_func
|
||||||
|
--------------
|
||||||
|
28
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
DEBUG: pushing down the function call
|
||||||
|
mx_call_func
|
||||||
|
--------------
|
||||||
|
28
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
DEBUG: pushing down the function call
|
||||||
|
mx_call_func
|
||||||
|
--------------
|
||||||
|
28
|
||||||
|
(1 row)
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
|
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
|
||||||
|
|
|
@ -234,9 +234,14 @@ select mx_call_func(2, 0), mx_call_func(0, 2);
|
||||||
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
|
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
|
||||||
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
|
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
|
||||||
|
|
||||||
-- Prepared statements
|
-- Prepared statements. Repeat six times to test for generic plans
|
||||||
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
|
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
|
||||||
EXECUTE call_plan(2, 0);
|
EXECUTE call_plan(2, 0);
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
EXECUTE call_plan(2, 0);
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
|
|
Loading…
Reference in New Issue