From 0c57ea7587056f60f4dc38e55978e754776e6f9a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Feb 2017 12:11:10 +0200 Subject: [PATCH] Improve shard pruning logic for INSERT ... SELECT logic With this commit we take into account the restriction information that are on the joininfo and hashed by InstantiatePartitionQual(). --- .../planner/multi_logical_planner.c | 25 ++- .../planner/multi_router_planner.c | 79 +++++++++- src/include/distributed/errormessage.h | 3 +- .../distributed/multi_logical_planner.h | 1 + .../regress/expected/multi_insert_select.out | 149 ++++++++++++++++-- src/test/regress/sql/multi_insert_select.sql | 28 +++- 6 files changed, 256 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 1d2e071cd..18c3d1211 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -2108,26 +2108,37 @@ MultiSubqueryPushdownTable(RangeTblEntry *subqueryRangeTableEntry) /* - * OperatorImplementsEquality returns true if the given opno represents an - * equality operator. The function retrieves btree interpretation list for this - * opno and check if BTEqualStrategyNumber strategy is present. + * OperatorImplementsStrategy is a wrapper around OperatorImplementsStrategy using + * BTEqualStrategyNumber as the operatostrategy. */ bool OperatorImplementsEquality(Oid opno) { - bool equalityOperator = false; + return OperatorImplementsStrategy(opno, BTEqualStrategyNumber); +} + + +/* + * OperatorImplementsEquality returns true if the given opno represents the + * operator given strategy. The function retrieves btree interpretation list + * for this opno and check if the strategy is present. + */ +bool +OperatorImplementsStrategy(Oid opno, int strategy) +{ + bool operatorImplementsStrategy = false; List *btreeIntepretationList = get_op_btree_interpretation(opno); ListCell *btreeInterpretationCell = NULL; foreach(btreeInterpretationCell, btreeIntepretationList) { OpBtreeInterpretation *btreeIntepretation = (OpBtreeInterpretation *) lfirst(btreeInterpretationCell); - if (btreeIntepretation->strategy == BTEqualStrategyNumber) + if (btreeIntepretation->strategy == strategy) { - equalityOperator = true; + operatorImplementsStrategy = true; break; } } - return equalityOperator; + return operatorImplementsStrategy; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6f8e1f143..a8a269e86 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -116,6 +116,7 @@ static bool RouterSelectQuery(Query *originalQuery, static bool RelationPrunesToMultipleShards(List *relationShardList); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); +static List * GetHashedJoinInfoRestrictions(RelOptInfo *relInfo); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); static Job * RouterQueryJob(Query *query, Task *task, List *placementList); @@ -407,6 +408,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter originalBaserestrictInfo = (List *) InstantiatePartitionQualWalker((Node *) originalBaserestrictInfo, &instantiateQualWalker); + originalJoinInfo = + (List *) InstantiatePartitionQual((Node *) originalJoinInfo, + instantiateQualWalker); } /* @@ -2511,7 +2515,9 @@ TargetShardIntervalsForSelect(Query *query, DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); int shardCount = cacheEntry->shardIntervalArrayLength; List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; - List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); + List *baseRestrictClauseList = get_all_actual_clauses(baseRestrictionList); + List *joinInfoRestrictionClauseList = + GetHashedJoinInfoRestrictions(relationRestriction->relOptInfo); List *prunedShardList = NIL; int shardIndex = 0; List *joinInfoList = relationRestriction->relOptInfo->joininfo; @@ -2530,6 +2536,7 @@ TargetShardIntervalsForSelect(Query *query, if (!whereFalseQuery && shardCount > 0) { List *shardIntervalList = NIL; + List *allRestrictions = NIL; for (shardIndex = 0; shardIndex < shardCount; shardIndex++) { @@ -2538,8 +2545,11 @@ TargetShardIntervalsForSelect(Query *query, shardIntervalList = lappend(shardIntervalList, shardInterval); } + allRestrictions = list_concat(baseRestrictClauseList, + joinInfoRestrictionClauseList); + prunedShardList = PruneShardList(relationId, tableId, - restrictClauseList, + allRestrictions, shardIntervalList); /* @@ -2562,6 +2572,71 @@ TargetShardIntervalsForSelect(Query *query, } +/* + * GetJoinInfoRestrictions iterates over the joininfo list of the given relInfo + * and returns all the restrictions which includes a hashed column generated by + * InstantiatePartitionQual() function. + */ +static List * +GetHashedJoinInfoRestrictions(RelOptInfo *relInfo) +{ + List *hashedJoinInfoRestrictions = NULL; + ListCell *joinInfoCell = NULL; + List *joinInfoClauses = NIL; + + /* Scan the rel's join clauses and get the necessary ones */ + foreach(joinInfoCell, relInfo->joininfo) + { + RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(joinInfoCell); + OpExpr *restrictionOpExpression = NULL; + Var *hashedColumn = MakeInt4Column(); + Expr *restrictionExpression = NULL; + + /* + * We're looking for the hashedOperatorList that is returned by + * InstantiatePartitionQual() + */ + if (!IsA(restrictInfo->clause, List)) + { + continue; + } + + /* + * Expected expression is in the form of (hashedCol >= val) or + * (hashedCol =< val). So, the following checks aims to filter + * such operator expressions. + */ + restrictionExpression = (Expr *) linitial((List *) restrictInfo->clause); + if (!SimpleOpExpression(restrictionExpression)) + { + continue; + } + + restrictionOpExpression = (OpExpr *) restrictionExpression; + if (!(OperatorImplementsStrategy(restrictionOpExpression->opno, + BTGreaterEqualStrategyNumber) || + OperatorImplementsStrategy(restrictionOpExpression->opno, + BTLessEqualStrategyNumber))) + { + continue; + } + + if (!OpExpressionContainsColumn(restrictionOpExpression, hashedColumn)) + { + continue; + } + + hashedJoinInfoRestrictions = lappend(hashedJoinInfoRestrictions, + restrictInfo); + } + + /* finally get the actual clauses from the restrict infos */ + joinInfoClauses = get_all_actual_clauses(hashedJoinInfoRestrictions); + + return joinInfoClauses; +} + + /* * RelationPrunesToMultipleShards returns true if the given list of * relation-to-shard mappings contains at least two mappings with diff --git a/src/include/distributed/errormessage.h b/src/include/distributed/errormessage.h index 26f8dd63a..f4a38bc06 100644 --- a/src/include/distributed/errormessage.h +++ b/src/include/distributed/errormessage.h @@ -53,7 +53,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons RaiseDeferredErrorInternal(error, elevel); \ if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \ pg_unreachable(); } \ - } while (0) + } \ + while (0) void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 48dba38ca..e7969a9be 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -204,6 +204,7 @@ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern List * pull_var_clause_default(Node *node); extern bool OperatorImplementsEquality(Oid opno); +extern bool OperatorImplementsStrategy(Oid opno, int strategy); #endif /* MULTI_LOGICAL_PLANNER_H */ diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 5745a6500..971754969 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -613,24 +613,30 @@ WHERE user_id IN (SELECT user_id DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300007 +DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300007 +DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300007 +DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 @@ -639,6 +645,90 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) DEBUG: Plan is router executable +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE user_id != 2 AND value_1 = 2000) +ON conflict (user_id, value_1) DO NOTHING; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300005 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) ON CONFLICT(user_id, value_1) DO NOTHING +DEBUG: Plan is router executable +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second WHERE false); +DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away +DEBUG: Plan is router executable +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000); +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300005 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) +DEBUG: Plan is router executable -- some UPSERTS INSERT INTO agg_events AS ae ( @@ -1043,6 +1133,7 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -1050,7 +1141,8 @@ DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: Skipping target shard interval 13300009 since SELECT query for it pruned away +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -1058,7 +1150,8 @@ DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: Skipping target shard interval 13300010 since SELECT query for it pruned away +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -1066,7 +1159,7 @@ DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Skipping target shard interval 13300011 since SELECT query for it pruned away DEBUG: Plan is router executable INSERT INTO agg_events (user_id) SELECT @@ -1289,33 +1382,32 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300007 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: Plan is router executable -- same as the above with INNER JOIN --- however this time query is not pushed down --- to the worker. This is related to how we process --- restriction infos, which we're considering to --- improve INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -1325,8 +1417,35 @@ WHERE raw_events_first.user_id = 10; DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable -- make things a bit more complicate with IN clauses INSERT INTO agg_events (user_id) SELECT diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 54e039135..3349cee59 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -326,6 +326,30 @@ WHERE user_id IN (SELECT user_id FROM raw_events_second WHERE user_id = 2); +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE user_id != 2 AND value_1 = 2000) +ON conflict (user_id, value_1) DO NOTHING; + +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second WHERE false); + +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT user_id + FROM raw_events_second + WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000); + -- some UPSERTS INSERT INTO agg_events AS ae ( @@ -610,10 +634,6 @@ WHERE raw_events_first.user_id = 10; -- same as the above with INNER JOIN --- however this time query is not pushed down --- to the worker. This is related to how we process --- restriction infos, which we're considering to --- improve INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id