Merge pull request #3794 from citusdata/fix_custom_type_select

Explicitly mark queries in physical planner for [not] having parameters
pull/3797/head
Önder Kalacı 2020-04-24 12:58:39 +02:00 committed by GitHub
commit 30a0a955d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 389 additions and 6 deletions

View File

@ -124,7 +124,6 @@ static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext( static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node); static bool QueryIsNotSimpleSelect(Node *node);
static void UpdateReferenceTablesWithShard(List *rangeTableList); static void UpdateReferenceTablesWithShard(List *rangeTableList);
@ -2242,7 +2241,7 @@ ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionCont
* has external parameters that are not contained in boundParams, false * has external parameters that are not contained in boundParams, false
* otherwise. * otherwise.
*/ */
static bool bool
HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams) HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
{ {
if (expression == NULL) if (expression == NULL)

View File

@ -2130,12 +2130,29 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
List *assignedSqlTaskList = AssignTaskList(sqlTaskList); List *assignedSqlTaskList = AssignTaskList(sqlTaskList);
AssignDataFetchDependencies(assignedSqlTaskList); AssignDataFetchDependencies(assignedSqlTaskList);
/* now assign merge task's data fetch dependencies */ /* if the parameters has not been resolved, record it */
job->parametersInJobQueryResolved =
!HasUnresolvedExternParamsWalker((Node *) job->jobQuery, NULL);
/*
* Make final adjustments for the assigned tasks.
*
* First, update SELECT tasks' parameters resolved field.
*
* Second, assign merge task's data fetch dependencies.
*/
foreach(assignedSqlTaskCell, assignedSqlTaskList) foreach(assignedSqlTaskCell, assignedSqlTaskList)
{ {
Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell); Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell);
List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask);
/* we don't support parameters in the physical planner */
if (assignedSqlTask->taskType == SELECT_TASK)
{
assignedSqlTask->parametersInQueryStringResolved =
job->parametersInJobQueryResolved;
}
List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask);
AssignDataFetchDependencies(assignedMergeTaskList); AssignDataFetchDependencies(assignedMergeTaskList);
} }

View File

@ -196,6 +196,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
RelOptInfo *innerrel, RelOptInfo *innerrel,
JoinType jointype, JoinType jointype,
JoinPathExtraData *extra); JoinPathExtraData *extra);
extern bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
extern bool IsModifyCommand(Query *query); extern bool IsModifyCommand(Query *query);
extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern void EnsurePartitionTableNotReplicated(Oid relationId); extern void EnsurePartitionTableNotReplicated(Oid relationId);

View File

@ -1674,7 +1674,7 @@ RESET client_min_messages;
RESET citus.log_local_commands; RESET citus.log_local_commands;
\c - - - :master_port \c - - - :master_port
SET citus.next_shard_id TO 1480000; SET citus.next_shard_id TO 1480000;
-- local execution with custom type -- test both local and remote execution with custom type
SET citus.replication_model TO "streaming"; SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe'); CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe');
@ -1690,6 +1690,273 @@ SELECT create_distributed_table('event_responses', 'event_id');
(1 row) (1 row)
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$
DECLARE
q1Result INT;
q2Result INT;
q3Result INT;
BEGIN
SELECT count(*) INTO q1Result FROM event_responses WHERE response = $1;
SELECT count(*) INTO q2Result FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
SELECT count(*) INTO q3Result FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
RETURN q3Result+q2Result+q1Result;
END;
$$ LANGUAGE plpgsql;
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
SELECT regular_func('yes');
regular_func
---------------------------------------------------------------------
6
(1 row)
CREATE OR REPLACE PROCEDURE regular_procedure(p invite_resp)
AS $$
BEGIN
PERFORM * FROM event_responses WHERE response = $1;
PERFORM * FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
PERFORM * FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
END;
$$ LANGUAGE plpgsql;
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
PREPARE multi_shard_no_dist_key(invite_resp) AS select * from event_responses where response = $1::invite_resp LIMIT 1;
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE multi_shard_no_dist_key('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
PREPARE multi_shard_with_dist_key(int, invite_resp) AS select * from event_responses where event_id > $1 AND response = $2::invite_resp LIMIT 1;
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
EXECUTE multi_shard_with_dist_key(1, 'yes');
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
PREPARE query_pushdown_no_dist_key(invite_resp) AS select * from event_responses e1 LEFT JOIN event_responses e2 USING(event_id) where e1.response = $1::invite_resp LIMIT 1;
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
EXECUTE query_pushdown_no_dist_key('yes');
event_id | user_id | response | user_id | response
---------------------------------------------------------------------
1 | 1 | yes | 1 | yes
(1 row)
PREPARE insert_select_via_coord(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp LIMIT 1 ON CONFLICT (event_id, user_id) DO NOTHING ;
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
PREPARE insert_select_pushdown(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp ON CONFLICT (event_id, user_id) DO NOTHING;
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
PREPARE router_select_with_no_dist_key_filter(invite_resp) AS select * from event_responses where event_id = 1 AND response = $1::invite_resp LIMIT 1;
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
EXECUTE router_select_with_no_dist_key_filter('yes');
event_id | user_id | response
---------------------------------------------------------------------
1 | 1 | yes
(1 row)
-- rest of the tests assume the table is empty
TRUNCATE event_responses;
CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp) CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp)
LANGUAGE plpgsql AS $fn$ LANGUAGE plpgsql AS $fn$
BEGIN BEGIN

View File

@ -846,7 +846,7 @@ RESET citus.log_local_commands;
\c - - - :master_port \c - - - :master_port
SET citus.next_shard_id TO 1480000; SET citus.next_shard_id TO 1480000;
-- local execution with custom type -- test both local and remote execution with custom type
SET citus.replication_model TO "streaming"; SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe'); CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe');
@ -860,6 +860,105 @@ CREATE TABLE event_responses (
SELECT create_distributed_table('event_responses', 'event_id'); SELECT create_distributed_table('event_responses', 'event_id');
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$
DECLARE
q1Result INT;
q2Result INT;
q3Result INT;
BEGIN
SELECT count(*) INTO q1Result FROM event_responses WHERE response = $1;
SELECT count(*) INTO q2Result FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
SELECT count(*) INTO q3Result FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
RETURN q3Result+q2Result+q1Result;
END;
$$ LANGUAGE plpgsql;
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
SELECT regular_func('yes');
CREATE OR REPLACE PROCEDURE regular_procedure(p invite_resp)
AS $$
BEGIN
PERFORM * FROM event_responses WHERE response = $1;
PERFORM * FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1;
PERFORM * FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo;
END;
$$ LANGUAGE plpgsql;
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
CALL regular_procedure('no');
PREPARE multi_shard_no_dist_key(invite_resp) AS select * from event_responses where response = $1::invite_resp LIMIT 1;
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
EXECUTE multi_shard_no_dist_key('yes');
PREPARE multi_shard_with_dist_key(int, invite_resp) AS select * from event_responses where event_id > $1 AND response = $2::invite_resp LIMIT 1;
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
EXECUTE multi_shard_with_dist_key(1, 'yes');
PREPARE query_pushdown_no_dist_key(invite_resp) AS select * from event_responses e1 LEFT JOIN event_responses e2 USING(event_id) where e1.response = $1::invite_resp LIMIT 1;
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
EXECUTE query_pushdown_no_dist_key('yes');
PREPARE insert_select_via_coord(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp LIMIT 1 ON CONFLICT (event_id, user_id) DO NOTHING ;
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
EXECUTE insert_select_via_coord('yes');
PREPARE insert_select_pushdown(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp ON CONFLICT (event_id, user_id) DO NOTHING;
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
EXECUTE insert_select_pushdown('yes');
PREPARE router_select_with_no_dist_key_filter(invite_resp) AS select * from event_responses where event_id = 1 AND response = $1::invite_resp LIMIT 1;
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
EXECUTE router_select_with_no_dist_key_filter('yes');
-- rest of the tests assume the table is empty
TRUNCATE event_responses;
CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp) CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp)
LANGUAGE plpgsql AS $fn$ LANGUAGE plpgsql AS $fn$
BEGIN BEGIN