Fix issue in IsMultiStatementTransaction detection

pull/3302/head
Marco Slot 2019-12-13 12:48:38 +01:00
parent 97bfd0bba0
commit 5f656e22db
6 changed files with 139 additions and 13 deletions

View File

@ -63,6 +63,9 @@ static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */
int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);
@ -188,6 +191,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
PG_TRY();
{
/*
* We keep track of how many times we've recursed into the planner, primarily
* to detect whether we are in a function call. We need to make sure that the
* PlannerLevel is decremented exactly once at the end of this PG_TRY block,
* both in the happy case and when an error occurs.
*/
PlannerLevel++;
/*
* For trivial queries, we're skipping the standard_planner() in
* order to eliminate its overhead.
@ -242,14 +253,20 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
result->planTree->total_cost = FLT_MAX / 100000000;
}
}
PlannerLevel--;
}
PG_CATCH();
{
PopPlannerRestrictionContext();
PlannerLevel--;
PG_RE_THROW();
}
PG_END_TRY();
/* remove the context from the context list */
PopPlannerRestrictionContext();

View File

@ -229,6 +229,17 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
ereport(DEBUG4, (errmsg("function is distributed")));
}
/*
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
* doesn't get the planned subquery and gets the actual struct Query, so the planning
* for these kinds of queries happens at the execution time.
*/
if (ExecutingInsertSelect())
{
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
return NULL;
}
if (IsMultiStatementTransaction())
{
/* cannot delegate function calls in a multi-statement transaction */
@ -288,17 +299,6 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
return NULL;
}
/*
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
* doesn't get the planned subquery and gets the actual struct Query, so the planning
* for these kinds of queries happens at the execution time.
*/
if (ExecutingInsertSelect())
{
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
return NULL;
}
/*
* This can be called in queries like SELECT ... WHERE EXISTS(SELECT func()), or other
* forms of CTEs or subqueries. We don't push-down in those cases.

View File

@ -21,6 +21,7 @@
#include "access/xact.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_planner.h"
#include "distributed/hash_helpers.h"
#include "distributed/intermediate_results.h"
#include "distributed/local_executor.h"
@ -291,6 +292,14 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CoordinatedTransactionUses2PC = false;
ExecutorLevel = 0;
/*
* Getting here without PlannerLevel 0 is a bug, however it is such a big
* problem that will persist between reuse of the backend we still assign 0 in
* production deploys, but during development and tests we want to crash.
*/
Assert(PlannerLevel == 0);
PlannerLevel = 0;
/*
* We should reset SubPlanLevel in case a transaction is aborted,
* otherwise this variable would stay +ve if the transaction is
@ -683,9 +692,12 @@ IsMultiStatementTransaction(void)
* MaybeExecutingUDF returns true if we are possibly executing a function call.
* We use nested level of executor to check this, so this can return true for
* CTEs, etc. which also start nested executors.
*
* If the planner is being called from the executor, then we may also be in
* a UDF.
*/
static bool
MaybeExecutingUDF(void)
{
return ExecutorLevel > 1;
return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
}

View File

@ -32,6 +32,11 @@
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
/* level of planner calls */
extern int PlannerLevel;
typedef struct RelationRestrictionContext
{
bool hasDistributedRelation;

View File

@ -519,7 +519,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
ERROR: invalid input syntax for type integer: "1a"
ERROR: invalid input syntax for integer: "1a"
-- non existing column name
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
@ -745,6 +745,61 @@ SELECT public.wait_until_metadata_sync();
(1 row)
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
CREATE TABLE test (id int, name text);
SELECT create_distributed_table('test','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test VALUES (3,'three');
CREATE OR REPLACE FUNCTION increment(int)
RETURNS NUMERIC AS $$
DECLARE ret_val NUMERIC;
BEGIN
SELECT max(id)::numeric+1 INTO ret_val FROM test WHERE id = $1;
RETURN ret_val;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('increment(int)', '$1', colocate_with := 'test');
create_distributed_function
-----------------------------
(1 row)
-- call a distributed function inside a pl/pgsql function
CREATE OR REPLACE FUNCTION test_func_calls_dist_func()
RETURNS NUMERIC AS $$
DECLARE incremented_val NUMERIC;
BEGIN
SELECT INTO incremented_val increment(1);
RETURN incremented_val;
END;
$$ LANGUAGE plpgsql;
SELECT test_func_calls_dist_func();
test_func_calls_dist_func
---------------------------
(1 row)
SELECT test_func_calls_dist_func();
test_func_calls_dist_func
---------------------------
(1 row)
-- test an INSERT..SELECT via the coordinator just because it is kind of funky
INSERT INTO test SELECT increment(3);
SELECT * FROM test ORDER BY id;
id | name
----+-------
3 | three
4 |
(2 rows)
DROP TABLE test;
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;

View File

@ -422,6 +422,43 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT public.wait_until_metadata_sync();
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
CREATE TABLE test (id int, name text);
SELECT create_distributed_table('test','id');
INSERT INTO test VALUES (3,'three');
CREATE OR REPLACE FUNCTION increment(int)
RETURNS NUMERIC AS $$
DECLARE ret_val NUMERIC;
BEGIN
SELECT max(id)::numeric+1 INTO ret_val FROM test WHERE id = $1;
RETURN ret_val;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('increment(int)', '$1', colocate_with := 'test');
-- call a distributed function inside a pl/pgsql function
CREATE OR REPLACE FUNCTION test_func_calls_dist_func()
RETURNS NUMERIC AS $$
DECLARE incremented_val NUMERIC;
BEGIN
SELECT INTO incremented_val increment(1);
RETURN incremented_val;
END;
$$ LANGUAGE plpgsql;
SELECT test_func_calls_dist_func();
SELECT test_func_calls_dist_func();
-- test an INSERT..SELECT via the coordinator just because it is kind of funky
INSERT INTO test SELECT increment(3);
SELECT * FROM test ORDER BY id;
DROP TABLE test;
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;