diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index b31f29d3d..68b3a17b7 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -124,7 +124,6 @@ static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void); static void ResetPlannerRestrictionContext( PlannerRestrictionContext *plannerRestrictionContext); -static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams); static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static void UpdateReferenceTablesWithShard(List *rangeTableList); @@ -2242,7 +2241,7 @@ ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionCont * has external parameters that are not contained in boundParams, false * otherwise. */ -static bool +bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams) { if (expression == NULL) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index dc359881c..d8861da18 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2130,12 +2130,29 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction List *assignedSqlTaskList = AssignTaskList(sqlTaskList); AssignDataFetchDependencies(assignedSqlTaskList); - /* now assign merge task's data fetch dependencies */ + /* if the parameters has not been resolved, record it */ + job->parametersInJobQueryResolved = + !HasUnresolvedExternParamsWalker((Node *) job->jobQuery, NULL); + + /* + * Make final adjustments for the assigned tasks. + * + * First, update SELECT tasks' parameters resolved field. + * + * Second, assign merge task's data fetch dependencies. + */ foreach(assignedSqlTaskCell, assignedSqlTaskList) { Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell); - List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask); + /* we don't support parameters in the physical planner */ + if (assignedSqlTask->taskType == SELECT_TASK) + { + assignedSqlTask->parametersInQueryStringResolved = + job->parametersInJobQueryResolved; + } + + List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask); AssignDataFetchDependencies(assignedMergeTaskList); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 578d2bfa1..f08ebd094 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -196,6 +196,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra); +extern bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams); extern bool IsModifyCommand(Query *query); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern void EnsurePartitionTableNotReplicated(Oid relationId); diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 7e1fb892e..9e8639d7d 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1674,7 +1674,7 @@ RESET client_min_messages; RESET citus.log_local_commands; \c - - - :master_port SET citus.next_shard_id TO 1480000; --- local execution with custom type +-- test both local and remote execution with custom type SET citus.replication_model TO "streaming"; SET citus.shard_replication_factor TO 1; CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe'); @@ -1690,6 +1690,273 @@ SELECT create_distributed_table('event_responses', 'event_id'); (1 row) +INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); +CREATE OR REPLACE FUNCTION regular_func(p invite_resp) +RETURNS int AS $$ +DECLARE + q1Result INT; + q2Result INT; + q3Result INT; +BEGIN +SELECT count(*) INTO q1Result FROM event_responses WHERE response = $1; +SELECT count(*) INTO q2Result FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1; +SELECT count(*) INTO q3Result FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo; +RETURN q3Result+q2Result+q1Result; +END; +$$ LANGUAGE plpgsql; +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT regular_func('yes'); + regular_func +--------------------------------------------------------------------- + 6 +(1 row) + +CREATE OR REPLACE PROCEDURE regular_procedure(p invite_resp) +AS $$ +BEGIN +PERFORM * FROM event_responses WHERE response = $1; +PERFORM * FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1; +PERFORM * FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo; +END; +$$ LANGUAGE plpgsql; +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +PREPARE multi_shard_no_dist_key(invite_resp) AS select * from event_responses where response = $1::invite_resp LIMIT 1; +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE multi_shard_no_dist_key('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +PREPARE multi_shard_with_dist_key(int, invite_resp) AS select * from event_responses where event_id > $1 AND response = $2::invite_resp LIMIT 1; +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +EXECUTE multi_shard_with_dist_key(1, 'yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + +PREPARE query_pushdown_no_dist_key(invite_resp) AS select * from event_responses e1 LEFT JOIN event_responses e2 USING(event_id) where e1.response = $1::invite_resp LIMIT 1; +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +EXECUTE query_pushdown_no_dist_key('yes'); + event_id | user_id | response | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes | 1 | yes +(1 row) + +PREPARE insert_select_via_coord(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp LIMIT 1 ON CONFLICT (event_id, user_id) DO NOTHING ; +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +PREPARE insert_select_pushdown(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp ON CONFLICT (event_id, user_id) DO NOTHING; +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +PREPARE router_select_with_no_dist_key_filter(invite_resp) AS select * from event_responses where event_id = 1 AND response = $1::invite_resp LIMIT 1; +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +EXECUTE router_select_with_no_dist_key_filter('yes'); + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 1 | yes +(1 row) + +-- rest of the tests assume the table is empty +TRUNCATE event_responses; CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp) LANGUAGE plpgsql AS $fn$ BEGIN diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 8a102780d..a18ab177a 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -846,7 +846,7 @@ RESET citus.log_local_commands; \c - - - :master_port SET citus.next_shard_id TO 1480000; --- local execution with custom type +-- test both local and remote execution with custom type SET citus.replication_model TO "streaming"; SET citus.shard_replication_factor TO 1; CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe'); @@ -860,6 +860,105 @@ CREATE TABLE event_responses ( SELECT create_distributed_table('event_responses', 'event_id'); +INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); + +CREATE OR REPLACE FUNCTION regular_func(p invite_resp) +RETURNS int AS $$ +DECLARE + q1Result INT; + q2Result INT; + q3Result INT; +BEGIN +SELECT count(*) INTO q1Result FROM event_responses WHERE response = $1; +SELECT count(*) INTO q2Result FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1; +SELECT count(*) INTO q3Result FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo; +RETURN q3Result+q2Result+q1Result; +END; +$$ LANGUAGE plpgsql; + +SELECT regular_func('yes'); +SELECT regular_func('yes'); +SELECT regular_func('yes'); +SELECT regular_func('yes'); +SELECT regular_func('yes'); +SELECT regular_func('yes'); +SELECT regular_func('yes'); +SELECT regular_func('yes'); + +CREATE OR REPLACE PROCEDURE regular_procedure(p invite_resp) +AS $$ +BEGIN +PERFORM * FROM event_responses WHERE response = $1; +PERFORM * FROM event_responses e1 LEFT JOIN event_responses e2 USING (event_id) WHERE e2.response = $1; +PERFORM * FROM (SELECT * FROM event_responses WHERE response = $1 LIMIT 5) as foo; +END; +$$ LANGUAGE plpgsql; + +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); +CALL regular_procedure('no'); + +PREPARE multi_shard_no_dist_key(invite_resp) AS select * from event_responses where response = $1::invite_resp LIMIT 1; +EXECUTE multi_shard_no_dist_key('yes'); +EXECUTE multi_shard_no_dist_key('yes'); +EXECUTE multi_shard_no_dist_key('yes'); +EXECUTE multi_shard_no_dist_key('yes'); +EXECUTE multi_shard_no_dist_key('yes'); +EXECUTE multi_shard_no_dist_key('yes'); +EXECUTE multi_shard_no_dist_key('yes'); + +PREPARE multi_shard_with_dist_key(int, invite_resp) AS select * from event_responses where event_id > $1 AND response = $2::invite_resp LIMIT 1; +EXECUTE multi_shard_with_dist_key(1, 'yes'); +EXECUTE multi_shard_with_dist_key(1, 'yes'); +EXECUTE multi_shard_with_dist_key(1, 'yes'); +EXECUTE multi_shard_with_dist_key(1, 'yes'); +EXECUTE multi_shard_with_dist_key(1, 'yes'); +EXECUTE multi_shard_with_dist_key(1, 'yes'); +EXECUTE multi_shard_with_dist_key(1, 'yes'); + +PREPARE query_pushdown_no_dist_key(invite_resp) AS select * from event_responses e1 LEFT JOIN event_responses e2 USING(event_id) where e1.response = $1::invite_resp LIMIT 1; +EXECUTE query_pushdown_no_dist_key('yes'); +EXECUTE query_pushdown_no_dist_key('yes'); +EXECUTE query_pushdown_no_dist_key('yes'); +EXECUTE query_pushdown_no_dist_key('yes'); +EXECUTE query_pushdown_no_dist_key('yes'); +EXECUTE query_pushdown_no_dist_key('yes'); +EXECUTE query_pushdown_no_dist_key('yes'); + +PREPARE insert_select_via_coord(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp LIMIT 1 ON CONFLICT (event_id, user_id) DO NOTHING ; +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); +EXECUTE insert_select_via_coord('yes'); + +PREPARE insert_select_pushdown(invite_resp) AS INSERT INTO event_responses SELECT * FROM event_responses where response = $1::invite_resp ON CONFLICT (event_id, user_id) DO NOTHING; +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); +EXECUTE insert_select_pushdown('yes'); + +PREPARE router_select_with_no_dist_key_filter(invite_resp) AS select * from event_responses where event_id = 1 AND response = $1::invite_resp LIMIT 1; +EXECUTE router_select_with_no_dist_key_filter('yes'); +EXECUTE router_select_with_no_dist_key_filter('yes'); +EXECUTE router_select_with_no_dist_key_filter('yes'); +EXECUTE router_select_with_no_dist_key_filter('yes'); +EXECUTE router_select_with_no_dist_key_filter('yes'); +EXECUTE router_select_with_no_dist_key_filter('yes'); +EXECUTE router_select_with_no_dist_key_filter('yes'); + +-- rest of the tests assume the table is empty +TRUNCATE event_responses; + CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp) LANGUAGE plpgsql AS $fn$ BEGIN