From 13405e7871eacb9173f5140bd8deb22686fe1429 Mon Sep 17 00:00:00 2001 From: eaydingol Date: Wed, 13 Aug 2025 02:06:34 +0300 Subject: [PATCH] Run update quals code only for queries with outer joins, update planners for error handling, update test cases. --- .../distributed/planner/deparse_shard_query.c | 30 +++++++++++- .../distributed/planner/distributed_planner.c | 14 +----- .../planner/insert_select_planner.c | 3 +- .../distributed/planner/merge_planner.c | 3 +- .../planner/multi_physical_planner.c | 49 ++++++++++++++----- .../planner/multi_router_planner.c | 3 +- .../planner/query_pushdown_planning.c | 38 +++++++++++--- .../distributed/planner/recursive_planning.c | 21 ++++++-- src/include/distributed/deparse_shard_query.h | 1 + src/include/distributed/distributed_planner.h | 12 +++++ .../distributed/query_pushdown_planning.h | 11 +++-- src/include/distributed/recursive_planning.h | 4 +- .../regress/expected/multi_insert_select.out | 41 +++++++++++----- ...lti_subquery_in_where_reference_clause.out | 4 +- .../expected/query_single_shard_table.out | 3 +- .../regress/expected/recurring_outer_join.out | 18 ++++--- src/test/regress/sql/multi_insert_select.sql | 8 +-- ...lti_subquery_in_where_reference_clause.sql | 4 +- 18 files changed, 193 insertions(+), 74 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index abdae7854..2c91df2a1 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -319,6 +319,35 @@ DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtinde } +/* + * UpdateWhereClauseForOuterJoinWalker walks over the query tree and + * updates the WHERE clause for outer joins satisfying feasibility conditions. + */ +bool +UpdateWhereClauseForOuterJoinWalker(Node *node, List *relationShardList) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Query)) + { + UpdateWhereClauseForOuterJoin((Query *) node, relationShardList); + return query_tree_walker((Query *) node, UpdateWhereClauseForOuterJoinWalker, + relationShardList, QTW_EXAMINE_RTES_BEFORE); + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, UpdateWhereClauseForOuterJoinWalker, + relationShardList); + } + + return false; +} + + /* * UpdateWhereClauseForOuterJoin * @@ -442,7 +471,6 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) /* want to look at all RTEs, even in subqueries, CTEs and such */ if (IsA(node, Query)) { - UpdateWhereClauseForOuterJoin((Query *) node, relationShardList); /* TODO, check this again, we might want to skip this for fast path queries */ return query_tree_walker((Query *) node, UpdateRelationToShardNames, relationShardList, QTW_EXAMINE_RTES_BEFORE); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index e22296ec7..098aaeb13 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -75,17 +75,6 @@ #endif -/* RouterPlanType is used to determine the router plan to invoke */ -typedef enum RouterPlanType -{ - INSERT_SELECT_INTO_CITUS_TABLE, - INSERT_SELECT_INTO_LOCAL_TABLE, - DML_QUERY, - SELECT_QUERY, - MERGE_QUERY, - REPLAN_WITH_BOUND_PARAMETERS -} RouterPlanType; - static List *plannerRestrictionContextList = NIL; int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */ static uint64 NextPlanId = 1; @@ -1097,7 +1086,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina * set_plan_references>add_rtes_to_flat_rtable>add_rte_to_flat_rtable. */ List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery, - plannerRestrictionContext); + plannerRestrictionContext, + routerPlan); /* * If subqueries were recursively planned then we need to replan the query diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 3bf0bb327..1cf996b77 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -766,7 +766,8 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, { /* first apply toplevel pushdown checks to SELECT query */ error = - DeferErrorIfUnsupportedSubqueryPushdown(subquery, plannerRestrictionContext); + DeferErrorIfUnsupportedSubqueryPushdown(subquery, plannerRestrictionContext, + true); if (error) { return error; diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 6f3993794..b1c441f92 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -1149,7 +1149,8 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, { deferredError = DeferErrorIfUnsupportedSubqueryPushdown(query, - plannerRestrictionContext); + plannerRestrictionContext, + true); if (deferredError) { ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning"))); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 0a664943d..15d48d821 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -168,13 +168,15 @@ static uint32 HashPartitionCount(void); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, - Bitmapset *distributedTables); + Bitmapset *distributedTables, + bool *outerPartHasDistributedTable); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, TaskType taskType, bool modifyRequiresCoordinatorEvaluation, + bool updateQualsForOuterJoin, DeferredErrorMessage **planningError); static List * SqlTaskList(Job *job); static bool DependsOnHashPartitionJob(Job *job); @@ -2248,6 +2250,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* In the second loop, populate taskRequiredForShardIndex */ + bool updateQualsForOuterJoin = false; + bool outerPartHasDistributedTable = false; forboth_ptr(prunedShardList, prunedRelationShardList, relationRestriction, relationRestrictionContext->relationRestrictionList) { @@ -2270,9 +2274,21 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, * the table is part of the non-outer side of the join and the outer side has a * distributed table. */ - if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex)) + if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex, + &outerPartHasDistributedTable)) { - continue; + if (outerPartHasDistributedTable) + { + /* we can skip the shards from this relation restriction */ + continue; + } + else + { + /* The outer part does not include distributed tables, we can not skip shards. + * Also, we will possibly update the quals of the outer relation for recurring join push down, mark here. + */ + updateQualsForOuterJoin = true; + } } ShardInterval *shardInterval = NULL; @@ -2305,6 +2321,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, taskIdIndex, taskType, modifyRequiresCoordinatorEvaluation, + updateQualsForOuterJoin, planningError); if (*planningError != NULL) { @@ -2337,13 +2354,16 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, * RelationRestriction if the table accessed for this relation is * a) in an outer join * b) on the inner part of said join - * c) the outer part of the join has a distributed table + * + * The function also sets outerPartHasDistributedTable if the outer part + * of the corresponding join has a distributed table. * * The function returns true only if all three conditions above hold true. */ static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, - Bitmapset *distributedTables) + Bitmapset *distributedTables, + bool *outerPartHasDistributedTable) { RestrictInfo *joinInfo = NULL; foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) @@ -2364,14 +2384,12 @@ IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, if (!isInOuter) { /* this table is joined in the inner part of an outer join */ - /* check if the outer part has a distributed relation */ - bool outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, - distributedTables); - if (outerPartHasDistributedTable) - { - /* this is an inner table of an outer join with a distributed table */ - return true; - } + /* set if the outer part has a distributed relation */ + *outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, + distributedTables); + + /* this is an inner table of an outer join */ + return true; } } @@ -2487,6 +2505,7 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, TaskType taskType, bool modifyRequiresCoordinatorEvaluation, + bool updateQualsForOuterJoin, DeferredErrorMessage **planningError) { Query *taskQuery = copyObject(originalQuery); @@ -2579,6 +2598,10 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, */ UpdateRelationToShardNames((Node *) taskQuery, relationShardList); + if (updateQualsForOuterJoin) + { + UpdateWhereClauseForOuterJoinWalker((Node *) taskQuery, relationShardList); + } /* * Ands are made implicit during shard pruning, as predicate comparison and diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 59124c5bf..43f79f30b 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1321,7 +1321,8 @@ MultiShardUpdateDeleteSupported(Query *originalQuery, { errorMessage = DeferErrorIfUnsupportedSubqueryPushdown( originalQuery, - plannerRestrictionContext); + plannerRestrictionContext, + true); } return errorMessage; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index ea22a1292..84f94dbb6 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -88,7 +88,7 @@ static bool WindowPartitionOnDistributionColumn(Query *query); static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree); static RecurringTuplesType FromClauseRecurringTupleType(Query *queryTree); static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin( - PlannerRestrictionContext *plannerRestrictionContext); + PlannerRestrictionContext *plannerRestrictionContext, bool plannerPhase); static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree); static DeferredErrorMessage * DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree, bool lateral, @@ -536,9 +536,16 @@ SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, RaiseDeferredError(unsupportedQueryError, ERROR); } + /* + * We reach here at the third step of the planning, thus we already checked for pushed down + * feasibility of recurring outer joins, at this step the unsupported outer join check should + * only generate an error when there is a lateral subquery. + */ DeferredErrorMessage *subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown( originalQuery, - plannerRestrictionContext); + plannerRestrictionContext, + false); + if (subqueryPushdownError != NULL) { RaiseDeferredError(subqueryPushdownError, ERROR); @@ -561,7 +568,8 @@ SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, PlannerRestrictionContext * - plannerRestrictionContext) + plannerRestrictionContext, + bool plannerPhase) { bool outerMostQueryHasLimit = false; ListCell *subqueryCell = NULL; @@ -613,7 +621,8 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, return error; } - error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext); + error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext, + plannerPhase); if (error) { return error; @@ -771,12 +780,17 @@ FromClauseRecurringTupleType(Query *queryTree) * DeferredErrorIfUnsupportedRecurringTuplesJoin returns a DeferredError if * there exists a join between a recurring rel (such as reference tables * and intermediate_results) and a non-recurring rel (such as distributed tables - * and subqueries that we can push-down to worker nodes) that can return an - * incorrect result set due to recurring tuples coming from the recurring rel. + * and subqueries that we can push-down to worker nodes) when plannerPhase is + * true, so that we try to recursively plan these joins. + * During recursive planning phase, we either replace those with recursive plans + * or leave them if it is safe to push-down. + * During the logical planning phase (plannerPhase is false), we only check if + * such queries have lateral subqueries. */ static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin( - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + bool plannerPhase) { List *joinRestrictionList = plannerRestrictionContext->joinRestrictionContext->joinRestrictionList; @@ -829,6 +843,16 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids)) { + if (plannerPhase) + { + /* + * We have not yet tried to recursively plan this join, we should + * defer an error. + */ + recurType = FetchFirstRecurType(plannerInfo, outerrelRelids); + break; + } + /* * Inner side contains distributed rels but the outer side only * contains recurring rels, might be an unsupported lateral outer diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index a6737353c..1ed22be64 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -110,6 +110,7 @@ struct RecursivePlanningContextInternal List *subPlanList; PlannerRestrictionContext *plannerRestrictionContext; bool restrictionEquivalenceCheck; + bool forceRecursivePlanning; }; /* track depth of current recursive planner query */ @@ -214,7 +215,8 @@ static bool hasPseudoconstantQuals( */ List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + RouterPlanType routerPlan) { RecursivePlanningContext context; @@ -228,6 +230,17 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, context.planId = planId; context.subPlanList = NIL; context.plannerRestrictionContext = plannerRestrictionContext; + context.forceRecursivePlanning = false; + + /* + * Force recursive planning of recurring outer joins for these queries + * since the planning error from the previous step is generated prior to + * the actual planning attempt. + */ + if (routerPlan == DML_QUERY) + { + context.forceRecursivePlanning = true; + } /* * Calculating the distribution key equality upfront is a trade-off for us. @@ -756,7 +769,8 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, /* left join */ if (leftNodeRecurs && !rightNodeRecurs) { - if (chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr, + if (recursivePlanningContext->forceRecursivePlanning || + chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr, query)) { ereport(DEBUG1, (errmsg("recursively planning right side of " @@ -787,7 +801,8 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, /* right join */ if (!leftNodeRecurs && rightNodeRecurs) { - if (chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr, + if (recursivePlanningContext->forceRecursivePlanning || + chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr, query)) { ereport(DEBUG1, (errmsg("recursively planning left side of " diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index fcb83c70a..d5d0ae2d2 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -26,6 +26,7 @@ extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern void UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList); +extern bool UpdateWhereClauseForOuterJoinWalker(Node *node, List *relationShardList); Node * DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int outerRtIndex); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f416aa911..67637cd78 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -33,6 +33,18 @@ extern int PlannerLevel; +/* RouterPlanType is used to determine the router plan to invoke */ +typedef enum RouterPlanType +{ + INSERT_SELECT_INTO_CITUS_TABLE, + INSERT_SELECT_INTO_LOCAL_TABLE, + DML_QUERY, + SELECT_QUERY, + MERGE_QUERY, + REPLAN_WITH_BOUND_PARAMETERS +} RouterPlanType; + + typedef struct RelationRestrictionContext { bool allReferenceTables; diff --git a/src/include/distributed/query_pushdown_planning.h b/src/include/distributed/query_pushdown_planning.h index 7035f5fc2..ba92a0462 100644 --- a/src/include/distributed/query_pushdown_planning.h +++ b/src/include/distributed/query_pushdown_planning.h @@ -37,11 +37,12 @@ extern MultiNode * SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, PlannerRestrictionContext * plannerRestrictionContext); -extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query * - originalQuery, - PlannerRestrictionContext - * - plannerRestrictionContext); +extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown( + Query *originalQuery, + PlannerRestrictionContext + * + plannerRestrictionContext, + bool plannerPhase); extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit); diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index feb6dadd2..83be73a90 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -16,6 +16,7 @@ #include "pg_version_constants.h" +#include "distributed/distributed_planner.h" #include "distributed/errormessage.h" #include "distributed/log_utils.h" #include "distributed/relation_restriction_equivalence.h" @@ -33,7 +34,8 @@ extern PlannerRestrictionContext * GetPlannerRestrictionContext( RecursivePlanningContext *recursivePlanningContext); extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + RouterPlanType routerPlan); extern char * GenerateResultId(uint64 planId, uint32 subPlanId); extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index cbbb84dea..1d854704e 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -760,15 +760,28 @@ DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_133000 DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300001 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL) DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300002 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL) DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (multi_insert_select.raw_events_first_13300003 raw_events_first LEFT JOIN multi_insert_select.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE (raw_events_first.user_id IS NOT NULL) + SET client_min_messages to debug3; INSERT INTO agg_events (user_id) SELECT raw_events_second.user_id FROM reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id; -DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300004 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((reference_table.user_id IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), '-1073741825'::integer) OPERATOR(pg_catalog.<=) 0)))) -DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300005 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((btint4cmp('-1073741824'::integer, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), '-1'::integer) OPERATOR(pg_catalog.<=) 0))) -DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300006 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((btint4cmp(0, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), 1073741823) OPERATOR(pg_catalog.<=) 0))) -DEBUG: distributed statement: INSERT INTO multi_insert_select.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (multi_insert_select.reference_table_13300012 reference_table LEFT JOIN multi_insert_select.raw_events_second_13300007 raw_events_second ON ((reference_table.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id))) WHERE ((raw_events_second.user_id IS NOT NULL) AND ((btint4cmp(1073741824, hashint4(reference_table.user_id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint4(reference_table.user_id), 2147483647) OPERATOR(pg_catalog.<=) 0))) +DEBUG: no shard pruning constraints on raw_events_second found +DEBUG: shard count after pruning for raw_events_second: 4 +DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table +DEBUG: no shard pruning constraints on raw_events_second found +DEBUG: shard count after pruning for raw_events_second: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on raw_events_second found +DEBUG: shard count after pruning for raw_events_second: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'user_id' + SET client_min_messages to debug2; INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -3372,17 +3385,21 @@ $$); Task Count: 1 (4 rows) --- verify that insert select can be pushed down when we have reference table in outside of outer join. -SELECT coordinator_plan($$ +-- verify that insert select cannot be pushed down when we have reference table in outside of outer join in a chained-join. + SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT a.id FROM dist_table_5 a LEFT JOIN ref_table_1 b ON (true) RIGHT JOIN ref_table_1 c ON (true); $$); - coordinator_plan + coordinator_plan --------------------------------------------------------------------- - Custom Scan (Citus Adaptive) - Task Count: 4 -(2 rows) + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(6 rows) --- verify that insert select can be pushed down when we have reference table in outside of left join. +-- verify that insert select can be pushed down when we have reference table in outside of outer join. SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id FROM ref_table_1 LEFT JOIN dist_table_5 USING(id); $$); @@ -3394,7 +3411,7 @@ $$); Task Count: 4 (4 rows) --- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-distribution column. +-- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-partition column. SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT ref_table_1.id FROM ref_table_1 LEFT JOIN dist_table_5 ON ref_table_1.id = dist_table_5.id2; $$); diff --git a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out index d2f9f2986..3f8b637dd 100644 --- a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out +++ b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out @@ -152,9 +152,9 @@ SELECT FROM users_table RIGHT JOIN users_reference_table USING (user_id) WHERE - users_reference_table.value_2 IN + (users_reference_table.value_2, random()*0) IN (SELECT - value_2 + value_2, 0 FROM events_table WHERE diff --git a/src/test/regress/expected/query_single_shard_table.out b/src/test/regress/expected/query_single_shard_table.out index 6eefd21b0..5475e0c63 100644 --- a/src/test/regress/expected/query_single_shard_table.out +++ b/src/test/regress/expected/query_single_shard_table.out @@ -1996,8 +1996,7 @@ WITH cte AS ( ) SELECT (a+5)*2, b FROM cte; DEBUG: CTE cte is going to be inlined via distributed planning -DEBUG: cannot push down this subquery -DETAIL: Distinct on columns without partition column is currently unsupported +DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT reference_table.a, 1 AS b FROM (query_single_shard_table.distributed_table RIGHT JOIN query_single_shard_table.reference_table USING (a)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((a OPERATOR(pg_catalog.+) 5) OPERATOR(pg_catalog.*) 2) AS user_id, b AS value_1 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte DEBUG: Collecting INSERT ... SELECT results on coordinator diff --git a/src/test/regress/expected/recurring_outer_join.out b/src/test/regress/expected/recurring_outer_join.out index 0c479f7e1..b2ba0591f 100644 --- a/src/test/regress/expected/recurring_outer_join.out +++ b/src/test/regress/expected/recurring_outer_join.out @@ -1820,8 +1820,10 @@ BEGIN; ) q WHERE dist_5.a = q.a RETURNING *; -DEBUG: generating subplan XXX_1 for subquery SELECT t1.a, t1.b FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT DISTINCT ON (t2.a) t2.a, t2.b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a))) ORDER BY t2.a, t2.b) t3 USING (a)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM recurring_outer_join.dist_5 USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) q WHERE (dist_5.a OPERATOR(pg_catalog.=) q.a) RETURNING dist_5.a, dist_5.b, q.a, q.b +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel +DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT ON (a) a, b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a))) ORDER BY a, b +DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM recurring_outer_join.dist_5 USING (SELECT t1.a, t1.b FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t3 USING (a))) q WHERE (dist_5.a OPERATOR(pg_catalog.=) q.a) RETURNING dist_5.a, dist_5.b, q.a, q.b a | b | a | b --------------------------------------------------------------------- 1 | 10 | 1 | 11 @@ -1849,8 +1851,10 @@ BEGIN; USING (a) ) RETURNING *; -DEBUG: generating subplan XXX_1 for subquery SELECT t1.a FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT t2.a, t2.b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a)))) t3 USING (a)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recurring_outer_join.dist_5 SET b = 10 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))) RETURNING a, b +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel +DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t2 WHERE (EXISTS (SELECT t4.a, t4.b FROM recurring_outer_join.dist_1 t4 WHERE (t4.a OPERATOR(pg_catalog.=) t2.a))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recurring_outer_join.dist_5 SET b = 10 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT t1.a FROM (recurring_outer_join.ref_1 t1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t3 USING (a)))) RETURNING a, b a | b --------------------------------------------------------------------- 1 | 10 @@ -1871,8 +1875,7 @@ BEGIN; FROM ref_1 t1 LEFT JOIN dist_1 t2 ON (t1.a = t2.a); -DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match -DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table DEBUG: performing repartitioned INSERT ... SELECT ROLLBACK; -- INSERT .. SELECT: pull to coordinator @@ -1883,8 +1886,7 @@ BEGIN; FROM ref_1 t1 LEFT JOIN dist_1 t2 ON (t1.b = t2.b); -DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match -DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: cannot perform a lateral outer join when a distributed subquery references a reference table DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning distributed relation "dist_1" "t2" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "dist_1" "t2" to a subquery diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index e96addd5b..1f0679e34 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -577,12 +577,14 @@ FROM FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id; + SET client_min_messages to debug3; INSERT INTO agg_events (user_id) SELECT raw_events_second.user_id FROM reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id; + SET client_min_messages to debug2; INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -2397,17 +2399,17 @@ SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id, (SELECT id FROM ref_table_1 WHERE id = 1) FROM ref_table_1; $$); --- verify that insert select can be pushed down when we have reference table in outside of outer join. +-- verify that insert select cannot be pushed down when we have reference table in outside of outer join in a chained-join. SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT a.id FROM dist_table_5 a LEFT JOIN ref_table_1 b ON (true) RIGHT JOIN ref_table_1 c ON (true); $$); --- verify that insert select can be pushed down when we have reference table in outside of left join. +-- verify that insert select can be pushed down when we have reference table in outside of outer join. SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT id FROM ref_table_1 LEFT JOIN dist_table_5 USING(id); $$); --- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-distribution column. +-- verify that insert select cannot be pushed down when we have reference table in outside of left join and joined on non-partition column. SELECT coordinator_plan($$ EXPLAIN (COSTS FALSE) INSERT INTO dist_table_5 SELECT ref_table_1.id FROM ref_table_1 LEFT JOIN dist_table_5 ON ref_table_1.id = dist_table_5.id2; $$); diff --git a/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql b/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql index ab9f4a0cc..59c9d263a 100644 --- a/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql +++ b/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql @@ -132,9 +132,9 @@ SELECT FROM users_table RIGHT JOIN users_reference_table USING (user_id) WHERE - users_reference_table.value_2 IN + (users_reference_table.value_2, random()*0) IN (SELECT - value_2 + value_2, 0 FROM events_table WHERE