From 8effb3073f1c9b0a880698de1b1da5fc55304fc5 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Feb 2017 10:26:00 +0200 Subject: [PATCH] Give consistent error messages for unsupported JOINs on INSERT ... SELECT Ensure that all tables have the uninstantiated quals before planning of the distributed query starts. If not, give a meaningful error. --- .../planner/multi_router_planner.c | 109 ++++++++++++++- .../regress/expected/multi_insert_select.out | 124 +++++++++--------- src/test/regress/sql/multi_insert_select.sql | 60 ++++++++- 3 files changed, 221 insertions(+), 72 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 641864893..6f8e1f143 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -127,7 +127,11 @@ static Node * InstantiatePartitionQualWalker(Node *node, void *context); static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte, - bool allReferenceTables); + RelationRestrictionContext * + restrictionContext); +static bool AllRelationRestrictionsContainUninstantiatedQual(RelationRestrictionContext + *restrictionContext); +static bool HasUninstantiatedQualWalker(Node *node, void *context); static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query); static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, @@ -270,7 +274,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery, Oid targetRelationId = insertRte->relid; DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); int shardCount = targetCacheEntry->shardIntervalArrayLength; - bool allReferenceTables = restrictionContext->allReferenceTables; /* * Error semantics for INSERT ... 
SELECT queries are different than regular @@ -278,7 +281,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery, */ multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte, subqueryRte, - allReferenceTables); + restrictionContext); if (multiPlan->planningError) { return multiPlan; @@ -671,7 +674,8 @@ ExtractInsertRangeTableEntry(Query *query) */ static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, bool allReferenceTables) + RangeTblEntry *subqueryRte, + RelationRestrictionContext *restrictionContext) { Query *subquery = NULL; Oid selectPartitionColumnTableId = InvalidOid; @@ -679,6 +683,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, char targetPartitionMethod = PartitionMethod(targetRelationId); ListCell *rangeTableCell = NULL; DeferredErrorMessage *error = NULL; + bool allReferenceTables = restrictionContext->allReferenceTables; /* we only do this check for INSERT ... SELECT queries */ AssertArg(InsertSelectQuery(queryTree)); @@ -755,10 +760,106 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, } } + + if (!AllRelationRestrictionsContainUninstantiatedQual(restrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot plan distributed query since all join conditions in the query " + "need include two distribution keys using an equality operator", + NULL, NULL); + } + return NULL; } +/* + * AllRelationRestrictionsContainUninstantiatedQual iterates over the relation + * restrictions and returns true if the qual is distributed to all relations. + * Otherwise returns false. Reference tables are ignored during the iteration + * given that they wouldn't need to have the qual in any case. + * + * Also, if any relation restriction contains a false clause, the relation is + * ignored since its restrictions are removed by postgres. 
+ */ +static bool +AllRelationRestrictionsContainUninstantiatedQual( + RelationRestrictionContext *restrictionContext) +{ + ListCell *relationRestrictionCell = NULL; + bool allRelationsHaveTheQual = true; + + foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) + { + RelationRestriction *restriction = lfirst(relationRestrictionCell); + + List *baseRestrictInfo = list_copy(restriction->relOptInfo->baserestrictinfo); + List *joinInfo = list_copy(restriction->relOptInfo->joininfo); + List *allRestrictions = list_concat(baseRestrictInfo, joinInfo); + ListCell *restrictionCell = NULL; + bool relationHasRestriction = false; + + if (ContainsFalseClause(extract_actual_clauses(allRestrictions, true))) + { + continue; + } + + /* we don't need to check existence of qual for reference tables */ + if (PartitionMethod(restriction->relationId) == DISTRIBUTE_BY_NONE) + { + continue; + } + + foreach(restrictionCell, allRestrictions) + { + RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictionCell); + + relationHasRestriction = relationHasRestriction || + HasUninstantiatedQualWalker( + (Node *) restrictInfo->clause, + NULL); + + if (relationHasRestriction) + { + break; + } + } + + allRelationsHaveTheQual = allRelationsHaveTheQual && relationHasRestriction; + } + + return allRelationsHaveTheQual; +} + + +/* + * HasUninstantiatedQualWalker returns true if the given expression + * contains a parameter with UNINSTANTIATED_PARAMETER_ID. + */ +static bool +HasUninstantiatedQualWalker(Node *node, void *context) +{ + Param *param = NULL; + + if (node == NULL) + { + return false; + } + + if (IsA(node, Param)) + { + param = (Param *) node; + } + + if (param && param->paramid == UNINSTANTIATED_PARAMETER_ID) + { + return true; + } + + return expression_tree_walker(node, HasUninstantiatedQualWalker, NULL); +} + + /* * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used * as the source for an INSERT ... 
SELECT or returns a description why not. diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 16126f5cc..5745a6500 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1213,11 +1213,7 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: Plan is router executable --- the following is a very tricky query for Citus --- although we do not support pushing down JOINs on non-partition --- columns here it is safe to push it down given that we're looking for --- a specific value (i.e., value_1 = 12) on the joining column. 
--- Note that the query always hits the same shard on raw_events_second +-- not supported given that the join is not on the partition column INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -1225,35 +1221,7 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_1 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for 
shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) -DEBUG: Plan is router executable +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- some unsupported LEFT/INNER JOINs -- JOIN on one table with partition column other is not INSERT INTO agg_events (user_id) @@ -1284,19 +1252,14 @@ SELECT raw_events_second.user_id FROM raw_events_first, raw_events_second WHERE raw_events_first.user_id = raw_events_first.value_1; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. 
+ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- both tables joined on non-partition columns INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- same as the above with INNER JOIN INSERT INTO agg_events (user_id) SELECT @@ -1383,11 +1346,7 @@ SELECT raw_events_first.user_id FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- the following is again a very tricky query for Citus -- if the given filter was on value_1 as shown in the above, Citus could -- push it down. 
But here the query is refused @@ -1398,11 +1357,7 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- foo is not joined on the partition key so the query is not -- pushed down INSERT INTO agg_events @@ -1456,8 +1411,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, raw_events_second WHERE raw_events_first.user_id != raw_events_second.user_id GROUP BY raw_events_second.user_id) AS foo; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- INSERT partition column does not match with SELECT partition column INSERT INTO agg_events (value_4_agg, @@ -1600,8 +1554,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, GROUP BY raw_events_second.value_1 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. 
+ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- cannot pushdown the query since the JOIN is not equi JOIN INSERT INTO agg_events (user_id, value_4_agg) @@ -1630,14 +1583,59 @@ outer_most.id, max(outer_most.value) HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: predicate pruning for shardId 13300007 -ERROR: cannot perform distributed planning for the given modification -DETAIL: Select query cannot be pushed down to the worker. +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator +-- some unsupported LATERAL JOINs +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first WHERE + value_4 = reference_ids.user_id) as averages ON true + GROUP BY averages.user_id; +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; 
+ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.user_id = reference_ids.user_id +JOIN LATERAL + (SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id) + GROUP BY averages.user_id; +ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator -- cannot pushdown since subquery returns another column than partition key INSERT INTO raw_events_second (user_id) diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index af0999101..54e039135 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -551,11 +551,7 @@ FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_second.user_id IN (19, 20, 21); --- the following is a very tricky query for Citus --- although we do not support pushing down JOINs on non-partition --- columns here it is safe to push it down given that we're looking for --- a specific value (i.e., value_1 = 12) on the joining column. 
--- Note that the query always hits the same shard on raw_events_second +-- not supported given that the join is not on the partition column INSERT INTO agg_events (user_id) SELECT raw_events_first.user_id @@ -859,6 +855,60 @@ outer_most.id, max(outer_most.value) ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; +-- some unsupported LATERAL JOINs +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first WHERE + value_4 = reference_ids.user_id) as averages ON true + GROUP BY averages.user_id; + + +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.value_4 = reference_ids.user_id + GROUP BY averages.user_id; + + +INSERT INTO agg_events (user_id, value_4_agg) +SELECT + averages.user_id, avg(averages.value_4) +FROM + (SELECT + raw_events_second.user_id + FROM + reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id) + ) reference_ids + JOIN LATERAL + (SELECT + user_id, value_4 + FROM + raw_events_first) as averages ON averages.user_id = reference_ids.user_id +JOIN LATERAL + (SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id) + GROUP BY averages.user_id; + -- cannot pushdown since subquery returns another column than partition key INSERT INTO raw_events_second (user_id)