From fd8df25daf4e2e45d33f9e0a3cef824644d19920 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 20 Feb 2017 14:00:25 +0200 Subject: [PATCH 1/5] Disallow INSERT ... SELECT to pushdown joins on non-partition keys With this commit, we disallow JOINs on non-partition keys. Simply, while instantiating the qual we check whether the qual is on the partition key. --- src/backend/distributed/planner/multi_router_planner.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a10c53d68..641864893 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -387,6 +387,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo; InstantiateQualContext instantiateQualWalker; Var *relationPartitionKey = PartitionKey(restriction->relationId); + Var *relationPartitionKey = PartitionKey(restriction->relationId); /* * We haven't added the quals if all participating tables are reference @@ -3047,6 +3048,13 @@ InstantiatePartitionQualWalker(Node *node, void *context) return node; } + /* if the qual is not on the partition column, do not instantiate */ + if (relationPartitionColumn && currentColumn && + currentColumn->varattno != relationPartitionColumn->varattno) + { + return node; + } + /* get the integer >=, <= operators from the catalog */ integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID, INT4OID, From 8effb3073f1c9b0a880698de1b1da5fc55304fc5 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Feb 2017 10:26:00 +0200 Subject: [PATCH 2/5] Give consistent error messages for unsupported JOINs on INSERT ... SELECT Ensure that all tables has the uninstantiated quals before planning of the distributed query starts. If not, give a meaningful error. --- .../planner/multi_router_planner.c | 109 ++++++++++++++- .../regress/expected/multi_insert_select.out | 124 +++++++++--------- src/test/regress/sql/multi_insert_select.sql | 60 ++++++++- 3 files changed, 221 insertions(+), 72 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 641864893..6f8e1f143 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -127,7 +127,11 @@ static Node * InstantiatePartitionQualWalker(Node *node, void *context); static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte, - bool allReferenceTables); + RelationRestrictionContext * + restrictionContext); +static bool AllRelationRestrictionsContainUninstantiatedQual(RelationRestrictionContext + *restrictionContext); +static bool HasUninstantiatedQualWalker(Node *node, void *context); static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query); static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, @@ -270,7 +274,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery, Oid targetRelationId = insertRte->relid; DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); int shardCount = targetCacheEntry->shardIntervalArrayLength; - bool allReferenceTables = restrictionContext->allReferenceTables; /* * Error semantics for INSERT ... SELECT queries are different than regular @@ -278,7 +281,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery, */ multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte, subqueryRte, - allReferenceTables); + restrictionContext); if (multiPlan->planningError) { return multiPlan; @@ -671,7 +674,8 @@ ExtractInsertRangeTableEntry(Query *query) */ static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, bool allReferenceTables) + RangeTblEntry *subqueryRte, + RelationRestrictionContext *restrictionContext) { Query *subquery = NULL; Oid selectPartitionColumnTableId = InvalidOid; @@ -679,6 +683,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, char targetPartitionMethod = PartitionMethod(targetRelationId); ListCell *rangeTableCell = NULL; DeferredErrorMessage *error = NULL; + bool allReferenceTables = restrictionContext->allReferenceTables; /* we only do this check for INSERT ... SELECT queries */ AssertArg(InsertSelectQuery(queryTree)); @@ -755,10 +760,106 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, } } + + if (!AllRelationRestrictionsContainUninstantiatedQual(restrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot plan distributed query since all join conditions in the query " + "need include two distribution keys using an equality operator", + NULL, NULL); + } + return NULL; } +/* + * AllRelationRestrictionsContainUninstantiatedQual iterates over the relation + * restrictions and returns true if the qual is distributed to all relations. + * Otherwise returns false. Reference tables are ignored during the iteration + * given that they wouldn't need to have the qual in any case. + * + * Also, if any relation restriction contains a false clause, the relation is + * ignored since its restrictions are removed by postgres. + */ +static bool +AllRelationRestrictionsContainUninstantiatedQual( + RelationRestrictionContext *restrictionContext) +{ + ListCell *relationRestrictionCell = NULL; + bool allRelationsHaveTheQual = true; + + foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) + { + RelationRestriction *restriction = lfirst(relationRestrictionCell); + + List *baseRestrictInfo = list_copy(restriction->relOptInfo->baserestrictinfo); + List *joinInfo = list_copy(restriction->relOptInfo->joininfo); + List *allRestrictions = list_concat(baseRestrictInfo, joinInfo); + ListCell *restrictionCell = NULL; + bool relationHasRestriction = false; + + if (ContainsFalseClause(extract_actual_clauses(allRestrictions, true))) + { + continue; + } + + /* we don't need to check existince of qual for reference tables */ + if (PartitionMethod(restriction->relationId) == DISTRIBUTE_BY_NONE) + { + continue; + } + + foreach(restrictionCell, allRestrictions) + { + RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictionCell); + + relationHasRestriction = relationHasRestriction || + HasUninstantiatedQualWalker( + (Node *) restrictInfo->clause, + NULL); + + if (relationHasRestriction) + { + break; + } + } + + allRelationsHaveTheQual = allRelationsHaveTheQual && relationHasRestriction; + } + + return allRelationsHaveTheQual; +} + + +/* + * HasUninstantiatedQualWalker returns true if the given expression + * constains a parameter with UNINSTANTIATED_PARAMETER_ID. + */ +static bool +HasUninstantiatedQualWalker(Node *node, void *context) +{ + Param *param = NULL; + + if (node == NULL) + { + return false; + } + + if (IsA(node, Param)) + { + param = (Param *) node; + } + + if (param && param->paramid == UNINSTANTIATED_PARAMETER_ID) + { + return true; + } + + return expression_tree_walker(node, HasUninstantiatedQualWalker, NULL); +} + + /* * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used * as the source for an INSERT ... SELECT or returns a description why not. diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 16126f5cc..5745a6500 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1213,11 +1213,7 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: Plan is router executable --- the following is a very tricky query for Citus --- although we do not support pushing down JOINs on non-partition --- columns here it is safe to push it down given that we're looking for --- a specific value (i.e., value_1 = 12) on the joining column. --- Note that the query always hits the same shard on raw_events_second +-- not supported given that the join is not on the partition column INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -1225,35 +1221,7 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_1 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) -DEBUG: Plan is router executable +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- some unsupported LEFT/INNER JOINs -- JOIN on one table with partition column other is not INSERT INTO agg_events (user_id) @@ -1284,19 +1252,14 @@ SELECT raw_events_second.user_id FROM raw_events_first, raw_events_second WHERE raw_events_first.user_id = raw_events_first.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- both tables joined on non-partition columns INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- same as the above with INNER JOIN INSERT INTO agg_events (user_id) SELECT @@ -1383,11 +1346,7 @@ SELECT raw_events_first.user_id FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- the following is again a very tricky query for Citus -- if the given filter was on value_1 as shown in the above, Citus could -- push it down. But here the query is refused @@ -1398,11 +1357,7 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- foo is not joined on the partition key so the query is not -- pushed down INSERT INTO agg_events @@ -1456,8 +1411,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, raw_events_second WHERE raw_events_first.user_id != raw_events_second.user_id GROUP BY raw_events_second.user_id) AS foo; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- INSERT partition column does not match with SELECT partition column INSERT INTO agg_events (value_4_agg, @@ -1600,8 +1554,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, GROUP BY raw_events_second.value_1 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- cannot pushdown the query since the JOIN is not equi JOIN INSERT INTO agg_events (user_id, value_4_agg) @@ -1630,14 +1583,59 @@ outer_most.id, max(outer_most.value) HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: predicate pruning for shardId 13300007 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator +-- some unsupported LATERAL JOINs +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first WHERE + value_4 = reference_ids.user_id) as averages ON true + GROUP BY averages.user_id; +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.user_id = reference_ids.user_id +JOIN LATERAL + (SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id) + GROUP BY averages.user_id; +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- cannot pushdown since subquery returns another column than partition key INSERT INTO raw_events_second (user_id) diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index af0999101..54e039135 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -551,11 +551,7 @@ FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_second.user_id IN (19, 20, 21); --- the following is a very tricky query for Citus --- although we do not support pushing down JOINs on non-partition --- columns here it is safe to push it down given that we're looking for --- a specific value (i.e., value_1 = 12) on the joining column. --- Note that the query always hits the same shard on raw_events_second +-- not supported given that the join is not on the partition column INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -859,6 +855,60 @@ outer_most.id, max(outer_most.value) ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; +-- some unsupported LATERAL JOINs +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first WHERE + value_4 = reference_ids.user_id) as averages ON true + GROUP BY averages.user_id; + + +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; + + +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.user_id = reference_ids.user_id +JOIN LATERAL + (SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id) + GROUP BY averages.user_id; + -- cannot pushdown since subquery returns another column than partition key INSERT INTO raw_events_second (user_id) From 0c57ea7587056f60f4dc38e55978e754776e6f9a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Feb 2017 12:11:10 +0200 Subject: [PATCH 3/5] Improve shard pruning logic for INSERT ... SELECT logic With this commit we take into account the restriction information that are on the joininfo and hashed by InstantiatePartitionQual(). --- .../planner/multi_logical_planner.c | 25 ++- .../planner/multi_router_planner.c | 79 +++++++++- src/include/distributed/errormessage.h | 3 +- .../distributed/multi_logical_planner.h | 1 + .../regress/expected/multi_insert_select.out | 149 ++++++++++++++++-- src/test/regress/sql/multi_insert_select.sql | 28 +++- 6 files changed, 256 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 1d2e071cd..18c3d1211 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -2108,26 +2108,37 @@ MultiSubqueryPushdownTable(RangeTblEntry *subqueryRangeTableEntry) /* - * OperatorImplementsEquality returns true if the given opno represents an - * equality operator. The function retrieves btree interpretation list for this - * opno and check if BTEqualStrategyNumber strategy is present. + * OperatorImplementsStrategy is a wrapper around OperatorImplementsStrategy using + * BTEqualStrategyNumber as the operatostrategy. */ bool OperatorImplementsEquality(Oid opno) { - bool equalityOperator = false; + return OperatorImplementsStrategy(opno, BTEqualStrategyNumber); +} + + +/* + * OperatorImplementsEquality returns true if the given opno represents the + * operator given strategy. The function retrieves btree interpretation list + * for this opno and check if the strategy is present. + */ +bool +OperatorImplementsStrategy(Oid opno, int strategy) +{ + bool operatorImplementsStrategy = false; List *btreeIntepretationList = get_op_btree_interpretation(opno); ListCell *btreeInterpretationCell = NULL; foreach(btreeInterpretationCell, btreeIntepretationList) { OpBtreeInterpretation *btreeIntepretation = (OpBtreeInterpretation *) lfirst(btreeInterpretationCell); - if (btreeIntepretation->strategy == BTEqualStrategyNumber) + if (btreeIntepretation->strategy == strategy) { - equalityOperator = true; + operatorImplementsStrategy = true; break; } } - return equalityOperator; + return operatorImplementsStrategy; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6f8e1f143..a8a269e86 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -116,6 +116,7 @@ static bool RouterSelectQuery(Query *originalQuery, static bool RelationPrunesToMultipleShards(List *relationShardList); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); +static List * GetHashedJoinInfoRestrictions(RelOptInfo *relInfo); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); static Job * RouterQueryJob(Query *query, Task *task, List *placementList); @@ -407,6 +408,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter originalBaserestrictInfo = (List *) InstantiatePartitionQualWalker((Node *) originalBaserestrictInfo, &instantiateQualWalker); + originalJoinInfo = + (List *) InstantiatePartitionQual((Node *) originalJoinInfo, + instantiateQualWalker); } /* @@ -2511,7 +2515,9 @@ TargetShardIntervalsForSelect(Query *query, DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); int shardCount = cacheEntry->shardIntervalArrayLength; List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; - List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); + List *baseRestrictClauseList = get_all_actual_clauses(baseRestrictionList); + List *joinInfoRestrictionClauseList = + GetHashedJoinInfoRestrictions(relationRestriction->relOptInfo); List *prunedShardList = NIL; int shardIndex = 0; List *joinInfoList = relationRestriction->relOptInfo->joininfo; @@ -2530,6 +2536,7 @@ TargetShardIntervalsForSelect(Query *query, if (!whereFalseQuery && shardCount > 0) { List *shardIntervalList = NIL; + List *allRestrictions = NIL; for (shardIndex = 0; shardIndex < shardCount; shardIndex++) { @@ -2538,8 +2545,11 @@ TargetShardIntervalsForSelect(Query *query, shardIntervalList = lappend(shardIntervalList, shardInterval); } + allRestrictions = list_concat(baseRestrictClauseList, + joinInfoRestrictionClauseList); + prunedShardList = PruneShardList(relationId, tableId, - restrictClauseList, + allRestrictions, shardIntervalList); /* @@ -2562,6 +2572,71 @@ TargetShardIntervalsForSelect(Query *query, } +/* + * GetJoinInfoRestrictions iterates over the joininfo list of the given relInfo + * and returns all the restrictions which includes a hashed column generated by + * InstantiatePartitionQual() function. + */ +static List * +GetHashedJoinInfoRestrictions(RelOptInfo *relInfo) +{ + List *hashedJoinInfoRestrictions = NULL; + ListCell *joinInfoCell = NULL; + List *joinInfoClauses = NIL; + + /* Scan the rel's join clauses and get the necessary ones */ + foreach(joinInfoCell, relInfo->joininfo) + { + RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(joinInfoCell); + OpExpr *restrictionOpExpression = NULL; + Var *hashedColumn = MakeInt4Column(); + Expr *restrictionExpression = NULL; + + /* + * We're looking for the hashedOperatorList that is returned by + * InstantiatePartitionQual() + */ + if (!IsA(restrictInfo->clause, List)) + { + continue; + } + + /* + * Expected expression is in the form of (hashedCol >= val) or + * (hashedCol =< val). So, the following checks aims to filter + * such operator expressions. + */ + restrictionExpression = (Expr *) linitial((List *) restrictInfo->clause); + if (!SimpleOpExpression(restrictionExpression)) + { + continue; + } + + restrictionOpExpression = (OpExpr *) restrictionExpression; + if (!(OperatorImplementsStrategy(restrictionOpExpression->opno, + BTGreaterEqualStrategyNumber) || + OperatorImplementsStrategy(restrictionOpExpression->opno, + BTLessEqualStrategyNumber))) + { + continue; + } + + if (!OpExpressionContainsColumn(restrictionOpExpression, hashedColumn)) + { + continue; + } + + hashedJoinInfoRestrictions = lappend(hashedJoinInfoRestrictions, + restrictInfo); + } + + /* finally get the actual clauses from the restrict infos */ + joinInfoClauses = get_all_actual_clauses(hashedJoinInfoRestrictions); + + return joinInfoClauses; +} + + /* * RelationPrunesToMultipleShards returns true if the given list of * relation-to-shard mappings contains at least two mappings with diff --git a/src/include/distributed/errormessage.h b/src/include/distributed/errormessage.h index 26f8dd63a..f4a38bc06 100644 --- a/src/include/distributed/errormessage.h +++ b/src/include/distributed/errormessage.h @@ -53,7 +53,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons RaiseDeferredErrorInternal(error, elevel); \ if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \ pg_unreachable(); } \ - } while (0) + } \ + while (0) void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 48dba38ca..e7969a9be 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -204,6 +204,7 @@ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern List * pull_var_clause_default(Node *node); extern bool OperatorImplementsEquality(Oid opno); +extern bool OperatorImplementsStrategy(Oid opno, int strategy); #endif /* MULTI_LOGICAL_PLANNER_H */ diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 5745a6500..971754969 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -613,24 +613,30 @@ WHERE user_id IN (SELECT user_id DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300007 +DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300007 +DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300007 +DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 @@ -639,6 +645,90 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) DEBUG: Plan is router executable +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE user_id != 2 AND value_1 = 2000) +ON conflict (user_id, value_1) DO NOTHING; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300005 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: Plan is router executable +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second WHERE false); +DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away +DEBUG: Plan is router executable +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000); +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300005 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) +DEBUG: Plan is router executable -- some UPSERTS INSERT INTO agg_events AS ae ( @@ -1043,6 +1133,7 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -1050,7 +1141,8 @@ DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: Skipping target shard interval 13300009 since SELECT query for it pruned away +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -1058,7 +1150,8 @@ DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: Skipping target shard interval 13300010 since SELECT query for it pruned away +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -1066,7 +1159,7 @@ DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Skipping target shard interval 13300011 since SELECT query for it pruned away DEBUG: Plan is router executable INSERT INTO agg_events (user_id) SELECT @@ -1289,33 +1382,32 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: Plan is router executable -- same as the above with INNER JOIN --- however this time query is not pushed down --- to the worker. This is related to how we process --- restriction infos, which we're considering to --- improve INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -1325,8 +1417,35 @@ WHERE raw_events_first.user_id = 10; DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable -- make things a bit more complicate with IN clauses INSERT INTO agg_events (user_id) SELECT diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 54e039135..3349cee59 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -326,6 +326,30 @@ WHERE user_id IN (SELECT user_id FROM raw_events_second WHERE user_id = 2); +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE user_id != 2 AND value_1 = 2000) +ON conflict (user_id, value_1) DO NOTHING; + +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second WHERE false); + +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000); + -- some UPSERTS INSERT INTO agg_events AS ae ( @@ -610,10 +634,6 @@ WHERE raw_events_first.user_id = 10; -- same as the above with INNER JOIN --- however this time query is not pushed down --- to the worker. This is related to how we process --- restriction infos, which we're considering to --- improve INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id From 4c22ed9ec227bedbfbc0c1d6b5f231ffb16c75d0 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 23 Feb 2017 10:40:08 +0200 Subject: [PATCH 4/5] Fix rebase conflicts --- src/backend/distributed/planner/multi_router_planner.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a8a269e86..c38c00adf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -389,9 +389,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter { RelationRestriction *restriction = lfirst(restrictionCell); List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo; + List *originalJoinInfo = restriction->relOptInfo->joininfo; InstantiateQualContext instantiateQualWalker; Var *relationPartitionKey = PartitionKey(restriction->relationId); - Var *relationPartitionKey = PartitionKey(restriction->relationId); /* * We haven't added the quals if all participating tables are reference @@ -409,8 +409,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter (List *) InstantiatePartitionQualWalker((Node *) originalBaserestrictInfo, &instantiateQualWalker); originalJoinInfo = - (List *) InstantiatePartitionQual((Node *) originalJoinInfo, - instantiateQualWalker); + (List *) InstantiatePartitionQualWalker((Node *) originalJoinInfo, + &instantiateQualWalker); } /* From 56ec4e9b778ecdb06d8fa1534096374673a99443 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 23 Feb 2017 10:50:37 +0200 Subject: [PATCH 5/5] Improve the logic some more --- .../planner/multi_router_planner.c | 70 ++++++++++++++++--- .../regress/expected/multi_insert_select.out | 39 ++--------- 2 files changed, 68 insertions(+), 41 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c38c00adf..cae092141 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -801,6 +801,7 @@ AllRelationRestrictionsContainUninstantiatedQual( List *joinInfo = list_copy(restriction->relOptInfo->joininfo); List *allRestrictions = list_concat(baseRestrictInfo, joinInfo); ListCell *restrictionCell = NULL; + Var *relationPartitionKey = NULL; bool relationHasRestriction = false; if (ContainsFalseClause(extract_actual_clauses(allRestrictions, true))) @@ -814,6 +815,8 @@ AllRelationRestrictionsContainUninstantiatedQual( continue; } + relationPartitionKey = PartitionKey(restriction->relationId); + foreach(restrictionCell, allRestrictions) { RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictionCell); @@ -821,7 +824,7 @@ AllRelationRestrictionsContainUninstantiatedQual( relationHasRestriction = relationHasRestriction || HasUninstantiatedQualWalker( (Node *) restrictInfo->clause, - NULL); + relationPartitionKey); if (relationHasRestriction) { @@ -843,24 +846,75 @@ AllRelationRestrictionsContainUninstantiatedQual( static bool HasUninstantiatedQualWalker(Node *node, void *context) { - Param *param = NULL; + Var *relationPartitionColumn = (Var *) context; if (node == NULL) { return false; } - if (IsA(node, Param)) + if (IsA(node, OpExpr) && list_length(((OpExpr *) node)->args) == 2) { - param = (Param *) node; - } + OpExpr *op = (OpExpr *) node; + Node *leftop = get_leftop((Expr *) op); + Node *rightop = get_rightop((Expr *) op); + Param *param = NULL; + Var *currentColumn = NULL; - if (param && param->paramid == UNINSTANTIATED_PARAMETER_ID) - { + /* look for the Params */ + if (IsA(leftop, Param)) + { + param = (Param *) leftop; + + /* + * Before instantiating the qual, ensure that it is equal to + * the partition key. + */ + if (IsA(rightop, Var)) + { + currentColumn = (Var *) rightop; + } + } + else if (IsA(rightop, Param)) + { + param = (Param *) rightop; + + /* + * Before instantiating the qual, ensure that it is equal to + * the partition key. + */ + if (IsA(leftop, Var)) + { + currentColumn = (Var *) leftop; + } + } + else + { + return expression_tree_walker(node, HasUninstantiatedQualWalker, context); + } + + if (!(param && param->paramid == UNINSTANTIATED_PARAMETER_ID)) + { + return false; + } + + /* ensure that it is the relation's partition column */ + if (relationPartitionColumn && currentColumn && + currentColumn->varattno != relationPartitionColumn->varattno) + { + return false; + } + + /* + * We still return true here given that finding the parameter is the + * actual goal of the walker. We only hit here once the query includes + * (partitionColumn = Const) on the query and we artificially added + * the uninstantiated parameter to the query. + */ return true; } - return expression_tree_walker(node, HasUninstantiatedQualWalker, NULL); + return expression_tree_walker(node, HasUninstantiatedQualWalker, context); } diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 971754969..deb8273cb 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1322,22 +1322,14 @@ SELECT raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- same as the above with INNER JOIN INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- a not meaningful query INSERT INTO agg_events (user_id) @@ -1359,11 +1351,7 @@ SELECT raw_events_first.user_id FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- although we do not support pushing down JOINs on non-partition -- columns here it is safe to push it down given that we're looking for -- a specific value (i.e., user_id = 10) on the joining column. @@ -1453,11 +1441,7 @@ SELECT FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- implicit join on non partition column should also not be pushed down INSERT INTO agg_events (user_id) @@ -1507,14 +1491,7 @@ FROM ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: predicate pruning for shardId 13300007 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- not equals on the partition column cannot be pushed down INSERT INTO agg_events (value_4_agg, @@ -1762,11 +1739,7 @@ SELECT user_id FROM raw_events_first WHERE user_id IN (SELECT value_2 FROM raw_events_second); -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- we currently not support grouping sets INSERT INTO agg_events (user_id,