Explicitly mark queries in physical planner for [not] having parameters

Physical planner doesn't support parameters. If the parameters have already
been resolved when the physical planner handling the queries, mark it.
The reason is that the executor is unaware of this, and sends the parameters
along with the worker queries, which fails for composite types.

(See `DissuadePlannerFromUsingPlan()` for the details of paramater resolving)
pull/3794/head
Onder Kalaci 2020-04-23 10:42:50 +02:00
parent 4372954f31
commit 0cb7ab2d05
5 changed files with 389 additions and 6 deletions

View File

@ -124,7 +124,6 @@ static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node);
static void UpdateReferenceTablesWithShard(List *rangeTableList);
@ -2242,7 +2241,7 @@ ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionCont
* has external parameters that are not contained in boundParams, false
* otherwise.
*/
static bool
bool
HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
{
if (expression == NULL)

View File

@ -2130,12 +2130,29 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
List *assignedSqlTaskList = AssignTaskList(sqlTaskList);
AssignDataFetchDependencies(assignedSqlTaskList);
/* now assign merge task's data fetch dependencies */
/* if the parameters has not been resolved, record it */
job->parametersInJobQueryResolved =
!HasUnresolvedExternParamsWalker((Node *) job->jobQuery, NULL);
/*
* Make final adjustments for the assigned tasks.
*
* First, update SELECT tasks' parameters resolved field.
*
* Second, assign merge task's data fetch dependencies.
*/
foreach(assignedSqlTaskCell, assignedSqlTaskList)
{
Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell);
List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask);
/* we don't support parameters in the physical planner */
if (assignedSqlTask->taskType == SELECT_TASK)
{
assignedSqlTask->parametersInQueryStringResolved =
job->parametersInJobQueryResolved;
}
List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask);
AssignDataFetchDependencies(assignedMergeTaskList);
}

View File

@ -196,6 +196,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
RelOptInfo *innerrel,
JoinType jointype,
JoinPathExtraData *extra);
extern bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
extern bool IsModifyCommand(Query *query);
extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern void EnsurePartitionTableNotReplicated(Oid relationId);

View File

@ -1674,7 +1674,7 @@ RESET client_min_messages;
RESET citus.log_local_commands;
\c - - - :master_port
SET citus.next_shard_id TO 1480000;
-- local execution with custom type
-- test both local and remote execution with custom type
SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1;
CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe');
@ -1690,6 +1690,273 @@ SELECT create_distributed_table('event_responses', 'event_id');
(1 row)
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$
DECLARE
q1Result INT;
q2Result INT;
q3Result INT;
BEGIN
SELECT count(*) INTO q1Result FROM event_responses WHERE response = $1;
SELECT count(*) INTO q2Result FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
SELECT count(*) INTO q3Result FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
RETURN q3Result+q2Result+q1Result;
END;
$$ LANGUAGE plpgsql;
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
CREATE OR REPLACE PROCEDURE regular_procedure(p invite_resp)
AS $$
BEGIN
PERFORM * FROM event_responses WHERE response = $1;
PERFORM * FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
PERFORM * FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
END;
$$ LANGUAGE plpgsql;
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
PREPARE multi_shard_no_dist_key(invite_resp) AS select * from event_responses where response = $1::invite_resp LIMIT 1;
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
PREPARE multi_shard_with_dist_key(int, invite_resp) AS select * from event_responses where event_id > $1 AND response = $2::invite_resp LIMIT 1;
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
PREPARE query_pushdown_no_dist_key(invite_resp) AS select * from event_responses e1 LEFT JOIN event_responses e2 USING(event_id) where e1.response = $1::invite_resp LIMIT 1;
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
PREPARE insert_select_via_coord(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp LIMIT 1 ON CONFLICT (event_id, user_id) DO NOTHING ;
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
PREPARE insert_select_pushdown(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp ON CONFLICT (event_id, user_id) DO NOTHING;
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
PREPARE router_select_with_no_dist_key_filter(invite_resp) AS select * from event_responses where event_id = 1 AND response = $1::invite_resp LIMIT 1;
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
-- rest of the tests assume the table is empty
TRUNCATE event_responses;
CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp)
LANGUAGE plpgsql AS $fn$
BEGIN

View File

@ -846,7 +846,7 @@ RESET citus.log_local_commands;
\c - - - :master_port
SET citus.next_shard_id TO 1480000;
-- local execution with custom type
-- test both local and remote execution with custom type
SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1;
CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe');
@ -860,6 +860,105 @@ CREATE TABLE event_responses (
SELECT create_distributed_table('event_responses', 'event_id');
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$
DECLARE
q1Result INT;
q2Result INT;
q3Result INT;
BEGIN
SELECT count(*) INTO q1Result FROM event_responses WHERE response = $1;
SELECT count(*) INTO q2Result FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
SELECT count(*) INTO q3Result FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
RETURN q3Result+q2Result+q1Result;
END;
$$ LANGUAGE plpgsql;
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
CREATE OR REPLACE PROCEDURE regular_procedure(p invite_resp)
AS $$
BEGIN
PERFORM * FROM event_responses WHERE response = $1;
PERFORM * FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
PERFORM * FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
END;
$$ LANGUAGE plpgsql;
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
PREPARE multi_shard_no_dist_key(invite_resp) AS select * from event_responses where response = $1::invite_resp LIMIT 1;
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
PREPARE multi_shard_with_dist_key(int, invite_resp) AS select * from event_responses where event_id > $1 AND response = $2::invite_resp LIMIT 1;
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
PREPARE query_pushdown_no_dist_key(invite_resp) AS select * from event_responses e1 LEFT JOIN event_responses e2 USING(event_id) where e1.response = $1::invite_resp LIMIT 1;
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
PREPARE insert_select_via_coord(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp LIMIT 1 ON CONFLICT (event_id, user_id) DO NOTHING ;
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
PREPARE insert_select_pushdown(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp ON CONFLICT (event_id, user_id) DO NOTHING;
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
PREPARE router_select_with_no_dist_key_filter(invite_resp) AS select * from event_responses where event_id = 1 AND response = $1::invite_resp LIMIT 1;
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
-- rest of the tests assume the table is empty
TRUNCATE event_responses;
CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp)
LANGUAGE plpgsql AS $fn$
BEGIN