mirror of https://github.com/citusdata/citus.git
Fix a crash when calling a distributed function from PL/pgSQL (#3302)
Fix a crash when calling a distributed function from PL/pgSQLpull/3311/head
commit
f90bbc64f6
|
@ -63,6 +63,9 @@ static List *plannerRestrictionContextList = NIL;
|
||||||
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
|
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
|
||||||
static uint64 NextPlanId = 1;
|
static uint64 NextPlanId = 1;
|
||||||
|
|
||||||
|
/* keep track of planner call stack levels */
|
||||||
|
int PlannerLevel = 0;
|
||||||
|
|
||||||
|
|
||||||
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||||
static bool IsUpdateOrDelete(Query *query);
|
static bool IsUpdateOrDelete(Query *query);
|
||||||
|
@ -188,6 +191,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* We keep track of how many times we've recursed into the planner, primarily
|
||||||
|
* to detect whether we are in a function call. We need to make sure that the
|
||||||
|
* PlannerLevel is decremented exactly once at the end of this PG_TRY block,
|
||||||
|
* both in the happy case and when an error occurs.
|
||||||
|
*/
|
||||||
|
PlannerLevel++;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For trivial queries, we're skipping the standard_planner() in
|
* For trivial queries, we're skipping the standard_planner() in
|
||||||
* order to eliminate its overhead.
|
* order to eliminate its overhead.
|
||||||
|
@ -242,14 +253,20 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
result->planTree->total_cost = FLT_MAX / 100000000;
|
result->planTree->total_cost = FLT_MAX / 100000000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PlannerLevel--;
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
PopPlannerRestrictionContext();
|
PopPlannerRestrictionContext();
|
||||||
|
|
||||||
|
PlannerLevel--;
|
||||||
|
|
||||||
PG_RE_THROW();
|
PG_RE_THROW();
|
||||||
}
|
}
|
||||||
PG_END_TRY();
|
PG_END_TRY();
|
||||||
|
|
||||||
|
|
||||||
/* remove the context from the context list */
|
/* remove the context from the context list */
|
||||||
PopPlannerRestrictionContext();
|
PopPlannerRestrictionContext();
|
||||||
|
|
||||||
|
|
|
@ -229,6 +229,17 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
ereport(DEBUG4, (errmsg("function is distributed")));
|
ereport(DEBUG4, (errmsg("function is distributed")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
|
||||||
|
* doesn't get the planned subquery and gets the actual struct Query, so the planning
|
||||||
|
* for these kinds of queries happens at the execution time.
|
||||||
|
*/
|
||||||
|
if (ExecutingInsertSelect())
|
||||||
|
{
|
||||||
|
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (IsMultiStatementTransaction())
|
if (IsMultiStatementTransaction())
|
||||||
{
|
{
|
||||||
/* cannot delegate function calls in a multi-statement transaction */
|
/* cannot delegate function calls in a multi-statement transaction */
|
||||||
|
@ -288,17 +299,6 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
|
|
||||||
* doesn't get the planned subquery and gets the actual struct Query, so the planning
|
|
||||||
* for these kinds of queries happens at the execution time.
|
|
||||||
*/
|
|
||||||
if (ExecutingInsertSelect())
|
|
||||||
{
|
|
||||||
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This can be called in queries like SELECT ... WHERE EXISTS(SELECT func()), or other
|
* This can be called in queries like SELECT ... WHERE EXISTS(SELECT func()), or other
|
||||||
* forms of CTEs or subqueries. We don't push-down in those cases.
|
* forms of CTEs or subqueries. We don't push-down in those cases.
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "distributed/backend_data.h"
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
#include "distributed/distributed_planner.h"
|
||||||
#include "distributed/hash_helpers.h"
|
#include "distributed/hash_helpers.h"
|
||||||
#include "distributed/intermediate_results.h"
|
#include "distributed/intermediate_results.h"
|
||||||
#include "distributed/local_executor.h"
|
#include "distributed/local_executor.h"
|
||||||
|
@ -291,6 +292,14 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
ExecutorLevel = 0;
|
ExecutorLevel = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Getting here without PlannerLevel 0 is a bug, however it is such a big
|
||||||
|
* problem that will persist between reuse of the backend we still assign 0 in
|
||||||
|
* production deploys, but during development and tests we want to crash.
|
||||||
|
*/
|
||||||
|
Assert(PlannerLevel == 0);
|
||||||
|
PlannerLevel = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We should reset SubPlanLevel in case a transaction is aborted,
|
* We should reset SubPlanLevel in case a transaction is aborted,
|
||||||
* otherwise this variable would stay +ve if the transaction is
|
* otherwise this variable would stay +ve if the transaction is
|
||||||
|
@ -683,9 +692,12 @@ IsMultiStatementTransaction(void)
|
||||||
* MaybeExecutingUDF returns true if we are possibly executing a function call.
|
* MaybeExecutingUDF returns true if we are possibly executing a function call.
|
||||||
* We use nested level of executor to check this, so this can return true for
|
* We use nested level of executor to check this, so this can return true for
|
||||||
* CTEs, etc. which also start nested executors.
|
* CTEs, etc. which also start nested executors.
|
||||||
|
*
|
||||||
|
* If the planner is being called from the executor, then we may also be in
|
||||||
|
* a UDF.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
MaybeExecutingUDF(void)
|
MaybeExecutingUDF(void)
|
||||||
{
|
{
|
||||||
return ExecutorLevel > 1;
|
return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,11 @@
|
||||||
|
|
||||||
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
|
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
|
||||||
|
|
||||||
|
|
||||||
|
/* level of planner calls */
|
||||||
|
extern int PlannerLevel;
|
||||||
|
|
||||||
|
|
||||||
typedef struct RelationRestrictionContext
|
typedef struct RelationRestrictionContext
|
||||||
{
|
{
|
||||||
bool hasDistributedRelation;
|
bool hasDistributedRelation;
|
||||||
|
|
|
@ -519,7 +519,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
|
||||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||||
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
|
||||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
|
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
|
||||||
ERROR: invalid input syntax for type integer: "1a"
|
ERROR: invalid input syntax for integer: "1a"
|
||||||
-- non existing column name
|
-- non existing column name
|
||||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
|
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
|
||||||
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
|
||||||
|
@ -745,6 +745,61 @@ SELECT public.wait_until_metadata_sync();
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE test (id int, name text);
|
||||||
|
SELECT create_distributed_table('test','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test VALUES (3,'three');
|
||||||
|
CREATE OR REPLACE FUNCTION increment(int)
|
||||||
|
RETURNS NUMERIC AS $$
|
||||||
|
DECLARE ret_val NUMERIC;
|
||||||
|
BEGIN
|
||||||
|
SELECT max(id)::numeric+1 INTO ret_val FROM test WHERE id = $1;
|
||||||
|
RETURN ret_val;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
SELECT create_distributed_function('increment(int)', '$1', colocate_with := 'test');
|
||||||
|
create_distributed_function
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- call a distributed function inside a pl/pgsql function
|
||||||
|
CREATE OR REPLACE FUNCTION test_func_calls_dist_func()
|
||||||
|
RETURNS NUMERIC AS $$
|
||||||
|
DECLARE incremented_val NUMERIC;
|
||||||
|
BEGIN
|
||||||
|
SELECT INTO incremented_val increment(1);
|
||||||
|
RETURN incremented_val;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
SELECT test_func_calls_dist_func();
|
||||||
|
test_func_calls_dist_func
|
||||||
|
---------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT test_func_calls_dist_func();
|
||||||
|
test_func_calls_dist_func
|
||||||
|
---------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test an INSERT..SELECT via the coordinator just because it is kind of funky
|
||||||
|
INSERT INTO test SELECT increment(3);
|
||||||
|
SELECT * FROM test ORDER BY id;
|
||||||
|
id | name
|
||||||
|
----+-------
|
||||||
|
3 | three
|
||||||
|
4 |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
DROP TABLE test;
|
||||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||||
DROP SCHEMA function_tests CASCADE;
|
DROP SCHEMA function_tests CASCADE;
|
||||||
DROP SCHEMA function_tests2 CASCADE;
|
DROP SCHEMA function_tests2 CASCADE;
|
||||||
|
|
|
@ -422,6 +422,43 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
||||||
SELECT public.wait_until_metadata_sync();
|
SELECT public.wait_until_metadata_sync();
|
||||||
|
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE test (id int, name text);
|
||||||
|
SELECT create_distributed_table('test','id');
|
||||||
|
INSERT INTO test VALUES (3,'three');
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION increment(int)
|
||||||
|
RETURNS NUMERIC AS $$
|
||||||
|
DECLARE ret_val NUMERIC;
|
||||||
|
BEGIN
|
||||||
|
SELECT max(id)::numeric+1 INTO ret_val FROM test WHERE id = $1;
|
||||||
|
RETURN ret_val;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
SELECT create_distributed_function('increment(int)', '$1', colocate_with := 'test');
|
||||||
|
|
||||||
|
-- call a distributed function inside a pl/pgsql function
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION test_func_calls_dist_func()
|
||||||
|
RETURNS NUMERIC AS $$
|
||||||
|
DECLARE incremented_val NUMERIC;
|
||||||
|
BEGIN
|
||||||
|
SELECT INTO incremented_val increment(1);
|
||||||
|
RETURN incremented_val;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
SELECT test_func_calls_dist_func();
|
||||||
|
SELECT test_func_calls_dist_func();
|
||||||
|
|
||||||
|
-- test an INSERT..SELECT via the coordinator just because it is kind of funky
|
||||||
|
INSERT INTO test SELECT increment(3);
|
||||||
|
SELECT * FROM test ORDER BY id;
|
||||||
|
|
||||||
|
DROP TABLE test;
|
||||||
|
|
||||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||||
DROP SCHEMA function_tests CASCADE;
|
DROP SCHEMA function_tests CASCADE;
|
||||||
DROP SCHEMA function_tests2 CASCADE;
|
DROP SCHEMA function_tests2 CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue