mirror of https://github.com/citusdata/citus.git
Merge pull request #3059 from citusdata/cte_function_calls
Don't push down queries when in subqueries/ctes
pull/3058/head
commit
1989ff85b5
|
@ -38,6 +38,11 @@
|
|||
#include "utils/snapmgr.h"
|
||||
|
||||
|
||||
/*
* Depth of the current insert/select executor: incremented when
* CoordinatorInsertSelectExecScan starts and decremented when it finishes or
* re-throws an error. ExecutingInsertSelect() reports true while this is
* greater than zero.
*/
|
||||
static int insertSelectExecutorLevel = 0;
|
||||
|
||||
|
||||
static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState *node);
|
||||
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||
Query *selectQuery, EState *executorState);
|
||||
static HTAB * ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
|
||||
|
@ -51,14 +56,41 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
|||
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
||||
|
||||
|
||||
/*
|
||||
* CoordinatorInsertSelectExecScan is a wrapper around
|
||||
* CoordinatorInsertSelectExecScanInternal which also properly increments
|
||||
* or decrements insertSelectExecutorLevel.
|
||||
*
* The level is raised before delegating to the internal function and lowered
* again on both the normal return path and the error path, so that
* ExecutingInsertSelect() only reports true while a call is in progress.
* The slot returned by the internal function is passed back unchanged.
*/
|
||||
TupleTableSlot *
|
||||
CoordinatorInsertSelectExecScan(CustomScanState *node)
|
||||
{
|
||||
TupleTableSlot *result = NULL;
|
||||
/* mark that an INSERT ... SELECT is being executed for the duration of the call */
insertSelectExecutorLevel++;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
result = CoordinatorInsertSelectExecScanInternal(node);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* error path: undo the increment before propagating the error */
insertSelectExecutorLevel--;
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
/* normal path: undo the increment now that the internal scan has returned */
insertSelectExecutorLevel--;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CoordinatorInsertSelectExecScanInternal executes an INSERT INTO distributed_table
|
||||
* SELECT .. query by setting up a DestReceiver that copies tuples into the
|
||||
* distributed table and then executing the SELECT query using that DestReceiver
|
||||
* as the tuple destination.
|
||||
*/
|
||||
TupleTableSlot *
|
||||
CoordinatorInsertSelectExecScan(CustomScanState *node)
|
||||
static TupleTableSlot *
|
||||
CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||
{
|
||||
CitusScanState *scanState = (CitusScanState *) node;
|
||||
TupleTableSlot *resultSlot = NULL;
|
||||
|
@ -343,3 +375,11 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
|
|||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
/*
* ExecutingInsertSelect returns true if we are executing an INSERT ... SELECT
* query, that is, while insertSelectExecutorLevel is greater than zero. The
* level is maintained by CoordinatorInsertSelectExecScan.
*/
|
||||
bool
|
||||
ExecutingInsertSelect(void)
|
||||
{
|
||||
return insertSelectExecutorLevel > 0;
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/function_call_delegation.h"
|
||||
#include "distributed/insert_select_planner.h"
|
||||
#include "distributed/insert_select_executor.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -29,6 +31,7 @@
|
|||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/shard_pruning.h"
|
||||
#include "distributed/recursive_planning.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
|
@ -220,6 +223,28 @@ TryToDelegateFunctionCall(Query *query)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
|
||||
* doesn't get the planned subquery and gets the actual struct Query, so the planning
|
||||
* for these kinds of queries happens at the execution time.
|
||||
*/
|
||||
if (ExecutingInsertSelect())
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* This can be called in queries like SELECT ... WHERE EXISTS(SELECT func()), or other
|
||||
* forms of CTEs or subqueries. We don't push-down in those cases.
|
||||
*/
|
||||
if (GeneratingSubplans())
|
||||
{
|
||||
ereport(DEBUG1, (errmsg(
|
||||
"not pushing down function calls in CTEs or Subqueries")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
partitionValueDatum = partitionValue->constvalue;
|
||||
|
||||
if (partitionValue->consttype != partitionColumn->vartype)
|
||||
|
|
|
@ -87,6 +87,9 @@
|
|||
#include "utils/guc.h"
|
||||
|
||||
|
||||
/*
* Track the depth of the current recursive planner query: incremented at the
* start of GenerateSubplansForSubqueriesAndCTEs and decremented before that
* function returns or raises its deferred error. GeneratingSubplans() reports
* true while this is greater than zero.
*/
|
||||
static int recursivePlanningDepth = 0;
|
||||
|
||||
/*
|
||||
* RecursivePlanningContext is used to recursively plan subqueries
|
||||
* and CTEs, pull results to the coordinator, and push it back into
|
||||
|
@ -186,6 +189,8 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
|||
RecursivePlanningContext context;
|
||||
DeferredErrorMessage *error = NULL;
|
||||
|
||||
recursivePlanningDepth++;
|
||||
|
||||
/*
|
||||
* Plan subqueries and CTEs that cannot be pushed down by recursively
|
||||
* calling the planner and add the resulting plans to subPlanList.
|
||||
|
@ -213,6 +218,7 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
|||
error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context);
|
||||
if (error != NULL)
|
||||
{
|
||||
recursivePlanningDepth--;
|
||||
RaiseDeferredError(error, ERROR);
|
||||
}
|
||||
|
||||
|
@ -227,6 +233,8 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
|||
ApplyLogRedaction(subPlanString->data))));
|
||||
}
|
||||
|
||||
recursivePlanningDepth--;
|
||||
|
||||
return context.subPlanList;
|
||||
}
|
||||
|
||||
|
@ -1734,3 +1742,14 @@ GenerateResultId(uint64 planId, uint32 subPlanId)
|
|||
|
||||
return resultId->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GeneratingSubplans returns true if we are currently in the process of
|
||||
* generating subplans.
|
||||
*
* This is the case while recursivePlanningDepth is greater than zero, i.e.
* while at least one GenerateSubplansForSubqueriesAndCTEs call is in progress.
*
* NOTE(review): the depth appears to be decremented only on the deferred-error
* and normal return paths of GenerateSubplansForSubqueriesAndCTEs; an ERROR
* thrown from deeper inside that function may leave it incremented - confirm.
*/
|
||||
bool
|
||||
GeneratingSubplans(void)
|
||||
{
|
||||
return recursivePlanningDepth > 0;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
|
||||
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
|
||||
extern bool ExecutingInsertSelect(void);
|
||||
|
||||
|
||||
#endif /* INSERT_SELECT_EXECUTOR_H */
|
||||
|
|
|
@ -21,13 +21,12 @@
|
|||
#include "nodes/relation.h"
|
||||
#endif
|
||||
|
||||
|
||||
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
extern char * GenerateResultId(uint64 planId, uint32 subPlanId);
|
||||
extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList,
|
||||
char *resultId);
|
||||
|
||||
extern bool GeneratingSubplans(void);
|
||||
|
||||
#endif /* RECURSIVE_PLANNING_H */
|
||||
|
|
|
@ -371,6 +371,88 @@ DETAIL: WARNING from localhost:57638
|
|||
ERROR: error
|
||||
CONTEXT: while executing command on localhost:57638
|
||||
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE
|
||||
-- Don't push-down when doing INSERT INTO ... SELECT func();
|
||||
SET client_min_messages TO ERROR;
|
||||
CREATE TABLE test (x int primary key);
|
||||
SELECT create_distributed_table('test','x');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION delegated_function(a int)
|
||||
RETURNS int
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO multi_mx_function_call_delegation.test VALUES (a);
|
||||
INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1);
|
||||
RETURN a+2;
|
||||
END;
|
||||
$function$;
|
||||
SELECT create_distributed_function('delegated_function(int)', 'a');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
INSERT INTO test SELECT delegated_function(1);
|
||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: not pushing down function calls in INSERT ... SELECT
|
||||
-- Don't push down in subqueries or CTEs.
|
||||
SELECT * FROM test WHERE not exists(
|
||||
SELECT delegated_function(4)
|
||||
);
|
||||
DEBUG: not pushing down function calls in CTEs or Subqueries
|
||||
DEBUG: generating subplan 30_1 for subquery SELECT multi_mx_function_call_delegation.delegated_function(4) AS delegated_function
|
||||
DEBUG: Plan 30 query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (NOT (EXISTS (SELECT intermediate_result.delegated_function FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer))))
|
||||
x
|
||||
---
|
||||
(0 rows)
|
||||
|
||||
WITH r AS (
|
||||
SELECT delegated_function(7)
|
||||
) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r);
|
||||
DEBUG: generating subplan 33_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(7) AS delegated_function
|
||||
DEBUG: not pushing down function calls in CTEs or Subqueries
|
||||
DEBUG: generating subplan 33_2 for subquery SELECT (count(*) OPERATOR(pg_catalog.=) 0) FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('33_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r
|
||||
DEBUG: Plan 33 query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (SELECT intermediate_result."?column?" FROM read_intermediate_result('33_2'::text, 'binary'::citus_copy_format) intermediate_result("?column?" boolean))
|
||||
x
|
||||
---
|
||||
(0 rows)
|
||||
|
||||
WITH r AS (
|
||||
SELECT delegated_function(10)
|
||||
), t AS (
|
||||
SELECT count(*) c FROM r
|
||||
) SELECT * FROM test, t WHERE t.c=0;
|
||||
DEBUG: generating subplan 37_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(10) AS delegated_function
|
||||
DEBUG: not pushing down function calls in CTEs or Subqueries
|
||||
DEBUG: generating subplan 37_2 for CTE t: SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('37_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r
|
||||
DEBUG: Plan 37 query after replacing subqueries and CTEs: SELECT test.x, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.c FROM read_intermediate_result('37_2'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0)
|
||||
x | c
|
||||
---+---
|
||||
(0 rows)
|
||||
|
||||
WITH r AS (
|
||||
SELECT count(*) FROM test
|
||||
), s AS (
|
||||
SELECT delegated_function(13)
|
||||
), t AS (
|
||||
SELECT count(*) c FROM s
|
||||
) SELECT * FROM test, r, t WHERE t.c=0;
|
||||
DEBUG: generating subplan 41_1 for CTE r: SELECT count(*) AS count FROM multi_mx_function_call_delegation.test
|
||||
DEBUG: generating subplan 41_2 for CTE s: SELECT multi_mx_function_call_delegation.delegated_function(13) AS delegated_function
|
||||
DEBUG: not pushing down function calls in CTEs or Subqueries
|
||||
DEBUG: generating subplan 41_3 for CTE t: SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('41_2'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) s
|
||||
DEBUG: Plan 41 query after replacing subqueries and CTEs: SELECT test.x, r.count, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.count FROM read_intermediate_result('41_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) r, (SELECT intermediate_result.c FROM read_intermediate_result('41_3'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0)
|
||||
x | count | c
|
||||
---+-------+---
|
||||
(0 rows)
|
||||
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
stop_metadata_sync_to_node
|
||||
|
@ -386,10 +468,10 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: the worker node does not have metadata
|
||||
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
DEBUG: generating subplan 46_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
DEBUG: Plan 46 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('46_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
|
@ -536,4 +618,4 @@ DEBUG: pushing down the function call
|
|||
RESET client_min_messages;
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
|
||||
NOTICE: drop cascades to 10 other objects
|
||||
NOTICE: drop cascades to 12 other objects
|
||||
|
|
|
@ -152,6 +152,51 @@ END;$$;
|
|||
select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
|
||||
select mx_call_func_raise(2);
|
||||
|
||||
-- Don't push-down when doing INSERT INTO ... SELECT func();
|
||||
SET client_min_messages TO ERROR;
|
||||
CREATE TABLE test (x int primary key);
|
||||
SELECT create_distributed_table('test','x');
|
||||
|
||||
CREATE OR REPLACE FUNCTION delegated_function(a int)
|
||||
RETURNS int
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO multi_mx_function_call_delegation.test VALUES (a);
|
||||
INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1);
|
||||
RETURN a+2;
|
||||
END;
|
||||
$function$;
|
||||
|
||||
SELECT create_distributed_function('delegated_function(int)', 'a');
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
INSERT INTO test SELECT delegated_function(1);
|
||||
|
||||
-- Don't push down in subqueries or CTEs.
|
||||
SELECT * FROM test WHERE not exists(
|
||||
SELECT delegated_function(4)
|
||||
);
|
||||
|
||||
WITH r AS (
|
||||
SELECT delegated_function(7)
|
||||
) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r);
|
||||
|
||||
WITH r AS (
|
||||
SELECT delegated_function(10)
|
||||
), t AS (
|
||||
SELECT count(*) c FROM r
|
||||
) SELECT * FROM test, t WHERE t.c=0;
|
||||
|
||||
WITH r AS (
|
||||
SELECT count(*) FROM test
|
||||
), s AS (
|
||||
SELECT delegated_function(13)
|
||||
), t AS (
|
||||
SELECT count(*) c FROM s
|
||||
) SELECT * FROM test, r, t WHERE t.c=0;
|
||||
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
|
|
Loading…
Reference in New Issue