From 681da712513ee534a6bf0a77ad653f20ee2339db Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 20 Feb 2017 14:00:25 +0200 Subject: [PATCH 1/3] Disallow INSERT ... SELECT to pushdown joins on non-partition keys With this commit, we disallow JOINs on non-partition keys. Simply, while instantiating the qual we check whether the qual is on the partition key. --- .../planner/multi_router_planner.c | 57 ++- .../regress/expected/multi_insert_select.out | 482 +++++++++++++++++- src/test/regress/sql/multi_insert_select.sql | 197 ++++++- 3 files changed, 727 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 784c9e696..a10c53d68 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -72,6 +72,14 @@ typedef struct WalkerState bool badCoalesce; } WalkerState; + +typedef struct InstantiateQualContext +{ + ShardInterval *targetShardInterval; + Var *relationPartitionColumn; +}InstantiateQualContext; + + bool EnableRouterExecution = true; /* planner functions forward declarations */ @@ -115,7 +123,7 @@ static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); -static Node * InstantiatePartitionQual(Node *node, void *context); +static Node * InstantiatePartitionQualWalker(Node *node, void *context); static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte, @@ -377,6 +385,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter { RelationRestriction *restriction = lfirst(restrictionCell); List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo; + InstantiateQualContext instantiateQualWalker; + Var *relationPartitionKey = 
PartitionKey(restriction->relationId); /* * We haven't added the quals if all participating tables are reference @@ -387,9 +397,12 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter break; } + instantiateQualWalker.relationPartitionColumn = relationPartitionKey; + instantiateQualWalker.targetShardInterval = shardInterval; + originalBaserestrictInfo = - (List *) InstantiatePartitionQual((Node *) originalBaserestrictInfo, - shardInterval); + (List *) InstantiatePartitionQualWalker((Node *) originalBaserestrictInfo, + &instantiateQualWalker); } /* @@ -2948,9 +2961,13 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext) * (partCol >= shardMinValue && partCol <= shardMaxValue). */ static Node * -InstantiatePartitionQual(Node *node, void *context) +InstantiatePartitionQualWalker(Node *node, void *context) { - ShardInterval *shardInterval = (ShardInterval *) context; + ShardInterval *shardInterval = + ((InstantiateQualContext *) context)->targetShardInterval; + Var *relationPartitionColumn = + ((InstantiateQualContext *) context)->relationPartitionColumn; + Assert(shardInterval->minValueExists); Assert(shardInterval->maxValueExists); @@ -2974,6 +2991,7 @@ InstantiatePartitionQual(Node *node, void *context) Node *leftop = get_leftop((Expr *) op); Node *rightop = get_rightop((Expr *) op); Param *param = NULL; + Var *currentColumn = NULL; Var *hashedGEColumn = NULL; OpExpr *hashedGEOpExpr = NULL; @@ -2992,10 +3010,28 @@ InstantiatePartitionQual(Node *node, void *context) if (IsA(leftop, Param)) { param = (Param *) leftop; + + /* + * Before instantiating the qual, ensure that it is equal to + * the partition key. + */ + if (IsA(rightop, Var)) + { + currentColumn = (Var *) rightop; + } } else if (IsA(rightop, Param)) { param = (Param *) rightop; + + /* + * Before instantiating the qual, ensure that it is equal to + * the partition key. 
+ */ + if (IsA(leftop, Var)) + { + currentColumn = (Var *) leftop; + } } /* not an interesting param for our purpose, so return */ @@ -3004,6 +3040,13 @@ InstantiatePartitionQual(Node *node, void *context) return node; } + /* if the qual is not on the partition column, do not instantiate */ + if (relationPartitionColumn && currentColumn && + currentColumn->varattno != relationPartitionColumn->varattno) + { + return node; + } + /* get the integer >=, <= operators from the catalog */ integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID, INT4OID, @@ -3052,13 +3095,13 @@ InstantiatePartitionQual(Node *node, void *context) if (IsA(node, RestrictInfo)) { RestrictInfo *restrictInfo = (RestrictInfo *) node; - restrictInfo->clause = (Expr *) InstantiatePartitionQual( + restrictInfo->clause = (Expr *) InstantiatePartitionQualWalker( (Node *) restrictInfo->clause, context); return (Node *) restrictInfo; } - return expression_tree_mutator(node, InstantiatePartitionQual, context); + return expression_tree_mutator(node, InstantiatePartitionQualWalker, context); } diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index edf7dfede..16126f5cc 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -973,7 +973,475 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; ERROR: set operations are not allowed in INSERT ... 
SELECT queries --- unsupported JOIN +-- some supported LEFT joins +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300001 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer)) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: 
distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823)) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)) +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_second.user_id +FROM + reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id; +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (public.reference_table_13300012 reference_table LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((reference_table.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_second.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_second.user_id) <= '-1073741825'::integer)) +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for 
shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (public.reference_table_13300012 reference_table LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((reference_table.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_second.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_second.user_id) <= '-1'::integer)) +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (public.reference_table_13300012 reference_table LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((reference_table.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_second.user_id) >= 0) AND (hashint4(raw_events_second.user_id) <= 1073741823)) +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_second.user_id FROM (public.reference_table_13300012 reference_table LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((reference_table.user_id = raw_events_second.user_id))) WHERE ((hashint4(raw_events_second.user_id) >= 1073741824) AND (hashint4(raw_events_second.user_id) <= 2147483647)) +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id = 10; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate 
pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for 
shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; 
+DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE (((raw_events_second.user_id = 10) OR (raw_events_second.user_id = 11)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300001 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE (((raw_events_second.user_id = 10) OR (raw_events_second.user_id = 11)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning 
for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE (((raw_events_second.user_id = 10) OR (raw_events_second.user_id = 11)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE (((raw_events_second.user_id = 10) OR (raw_events_second.user_id = 11)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id = 10 AND raw_events_first.user_id = 20; 
+DEBUG: Skipping target shard interval 13300008 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300009 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300010 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300011 since SELECT query for it pruned away +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id = 10 AND raw_events_second.user_id = 20; +DEBUG: Skipping target shard interval 13300008 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300009 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300010 since SELECT query for it pruned away +DEBUG: Skipping target shard interval 13300011 since SELECT query for it pruned away +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id IN (19, 20, 21); +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. 
+DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300001 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. 
+DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id IN (19, 20, 21); +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for 
shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. 
+DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300001 raw_events_first JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. 
+DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable +-- the following is a very tricky query for Citus +-- although we do not support pushing down JOINs on non-partition +-- columns here it is safe to push it down given that we're looking for +-- a specific value (i.e., value_1 = 12) on the joining column. +-- Note that the query always hits the same shard on raw_events_second +INSERT INTO agg_events + (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_second.user_id = raw_events_first.value_1 + AND raw_events_first.value_1 = 12; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 
13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = 
raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable +-- some unsupported LEFT/INNER JOINs +-- JOIN on one table with partition column other is not +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- same as the above with INNER JOIN +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- a not meaningful query +INSERT INTO agg_events + (user_id) +SELECT raw_events_second.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_first.user_id = raw_events_first.value_1; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. 
+-- both tables joined on non-partition columns +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- same as the above with INNER JOIN +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- although we do not support pushing down JOINs on non-partition +-- columns here it is safe to push it down given that we're looking for +-- a specific value (i.e., user_id = 10) on the joining column. 
+-- Note that the query always hits the same shard on raw_events_second +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE + raw_events_first.user_id = 10; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 
13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) +DEBUG: Plan is router executable +-- same as the above with INNER JOIN +-- however this time query is not pushed down +-- to the worker. 
This is related to how we process +-- restriction infos, which we're considering to +-- improve +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE raw_events_first.user_id = 10; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- make things a bit more complicate with IN clauses +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 + WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- implicit join on non partition column should also not be pushed down +INSERT INTO agg_events + (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_second.user_id = raw_events_first.value_1; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- the following is again a very tricky query for Citus +-- if the given filter was on value_1 as shown in the above, Citus could +-- push it down. 
But here the query is refused +INSERT INTO agg_events + (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_second.user_id = raw_events_first.value_1 + AND raw_events_first.value_2 = 12; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- foo is not joined on the partition key so the query is not +-- pushed down +INSERT INTO agg_events + (user_id, value_4_agg) +SELECT + outer_most.id, max(outer_most.value) +FROM +( + SELECT f2.id as id, f2.v4 as value FROM + (SELECT + id + FROM (SELECT reference_table.user_id AS id + FROM raw_events_first LEFT JOIN + reference_table + ON (raw_events_first.value_1 = reference_table.user_id)) AS foo) as f + INNER JOIN + (SELECT v4, + v1, + id + FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 +ON (f.id = f2.id)) as outer_most +GROUP BY + outer_most.id; +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. 
+-- not equals on the partition column cannot be pushed down INSERT INTO agg_events (value_4_agg, value_1_agg, @@ -1170,6 +1638,18 @@ DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. +-- cannot pushdown since subquery returns another column than partition key +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT value_2 + FROM raw_events_second); +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. -- we currently not support grouping sets INSERT INTO agg_events (user_id, diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 8c142d0d1..af0999101 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -496,7 +496,194 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; --- unsupported JOIN +-- some supported LEFT joins +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id; + +INSERT INTO agg_events (user_id) +SELECT + raw_events_second.user_id +FROM + reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id; + +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id = 10; + +INSERT INTO agg_events (user_id) +SELECT + 
raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11; + +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id = 10 AND raw_events_first.user_id = 20; + +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id = 10 AND raw_events_second.user_id = 20; + +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_first.user_id IN (19, 20, 21); + +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id + WHERE raw_events_second.user_id IN (19, 20, 21); + +-- the following is a very tricky query for Citus +-- although we do not support pushing down JOINs on non-partition +-- columns here it is safe to push it down given that we're looking for +-- a specific value (i.e., value_1 = 12) on the joining column. 
+-- Note that the query always hits the same shard on raw_events_second +INSERT INTO agg_events + (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_second.user_id = raw_events_first.value_1 + AND raw_events_first.value_1 = 12; + +-- some unsupported LEFT/INNER JOINs +-- JOIN on one table with partition column other is not +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; + +-- same as the above with INNER JOIN +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; + +-- a not meaningful query +INSERT INTO agg_events + (user_id) +SELECT raw_events_second.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_first.user_id = raw_events_first.value_1; + +-- both tables joined on non-partition columns +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; + +-- same as the above with INNER JOIN +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; + +-- although we do not support pushing down JOINs on non-partition +-- columns here it is safe to push it down given that we're looking for +-- a specific value (i.e., user_id = 10) on the joining column. 
+-- Note that the query always hits the same shard on raw_events_second +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE + raw_events_first.user_id = 10; + +-- same as the above with INNER JOIN +-- however this time query is not pushed down +-- to the worker. This is related to how we process +-- restriction infos, which we're considering to +-- improve +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 +WHERE raw_events_first.user_id = 10; + +-- make things a bit more complicate with IN clauses +INSERT INTO agg_events (user_id) +SELECT + raw_events_first.user_id +FROM + raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 + WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); + +-- implicit join on non partition column should also not be pushed down +INSERT INTO agg_events + (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_second.user_id = raw_events_first.value_1; + +-- the following is again a very tricky query for Citus +-- if the given filter was on value_1 as shown in the above, Citus could +-- push it down. 
But here the query is refused +INSERT INTO agg_events + (user_id) +SELECT raw_events_first.user_id +FROM raw_events_first, + raw_events_second +WHERE raw_events_second.user_id = raw_events_first.value_1 + AND raw_events_first.value_2 = 12; + +-- foo is not joined on the partition key so the query is not +-- pushed down +INSERT INTO agg_events + (user_id, value_4_agg) +SELECT + outer_most.id, max(outer_most.value) +FROM +( + SELECT f2.id as id, f2.v4 as value FROM + (SELECT + id + FROM (SELECT reference_table.user_id AS id + FROM raw_events_first LEFT JOIN + reference_table + ON (raw_events_first.value_1 = reference_table.user_id)) AS foo) as f + INNER JOIN + (SELECT v4, + v1, + id + FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id + HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 +ON (f.id = f2.id)) as outer_most +GROUP BY + outer_most.id; + +-- not equals on the partition column cannot be pushed down INSERT INTO agg_events (value_4_agg, value_1_agg, @@ -672,6 +859,14 @@ outer_most.id, max(outer_most.value) ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; +-- cannot pushdown since subquery returns another column than partition key +INSERT INTO raw_events_second + (user_id) +SELECT user_id +FROM raw_events_first +WHERE user_id IN (SELECT value_2 + FROM raw_events_second); + -- we currently not support grouping sets INSERT INTO agg_events (user_id, From 1c321b7611ea7a8f90b6bd15b7dcf2be745ab00c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Feb 2017 14:01:03 +0200 Subject: [PATCH 2/3] Get prepared for subquery pushdown using INSERT ... SELECT logic With this commit, INSERT ... SELECT query planning does not need partition key on the top level target list. Instead, we first look for the top level JOIN query. 
If that fails, we search for the RTE_RELATION (or RTE_RELATIONs in case of set operations). Then, we add the qual there. The idea here is to add the qual at the level that is the topmost in the query tree among the available options, and then expect the postgres planner to distribute that restriction to all other tables as well. This commit is not a hard requirement for INSERT ... SELECT query planning, given that the planner is going to error out on queries where the top level query does not have the partition column. --- .../planner/multi_router_planner.c | 194 ++++++++++++++++-- .../regress/expected/multi_insert_select.out | 46 +---- 2 files changed, 174 insertions(+), 66 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a10c53d68..aa2555767 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -135,6 +135,9 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, subqueryRte, Oid * selectPartitionColumnTableId); +static Query * FindTopLevelJoinQuery(Query *query); +static void AddUninstantiatedEqualityQualToRelation(Query *query); +static Var * GetFirstTargetListVar(List *targetList); static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -1092,9 +1095,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery) { Query *subquery = NULL; RangeTblEntry *subqueryEntry = NULL; - ListCell *targetEntryCell = NULL; - Var *targetPartitionColumnVar = NULL; - List *targetList = NULL; + Query *topLevelJoinQuery = NULL; Assert(InsertSelectQuery(originalQuery)); @@ -1113,31 +1114,180 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery) return; } - /* iterate through the target list and find the partition column on the target list */ - targetList = subquery->targetList;
- foreach(targetEntryCell, targetList) - { - TargetEntry *targetEntry = lfirst(targetEntryCell); - if (IsPartitionColumn(targetEntry->expr, subquery) && - IsA(targetEntry->expr, Var)) + topLevelJoinQuery = FindTopLevelJoinQuery(subquery); + if (topLevelJoinQuery != NULL) + { + FromExpr *joinTree = topLevelJoinQuery->jointree; + List *whereClauseList = QualifierList(joinTree); + List *joinClauseList = JoinClauseList(whereClauseList); + + OpExpr *operatorExpression = (OpExpr *) linitial(joinClauseList); + List *argumentList = operatorExpression->args; + + /* get left and right side of the expression */ + Node *leftArgument = (Node *) linitial(argumentList); + + List *leftColumnList = pull_var_clause_default(leftArgument); + Var *leftColumn = (Var *) linitial(leftColumnList); + + AddUninstantiatedEqualityQual(topLevelJoinQuery, leftColumn); + } + else + { + AddUninstantiatedEqualityQualToRelation(subquery); + } +} + + +/* + * FindTopLevelJoinQuery recursively traverses the query to find the + * top level JOIN query that exists in the query. If found, the query that + * includes the JOIN is returned. Else, NULL returned. 
+ */ +static Query * +FindTopLevelJoinQuery(Query *query) +{ + FromExpr *joinTree = NULL; + List *whereClauseList = NULL; + List *joinClauseList = NULL; + RangeTblEntry *rte = NULL; + + joinTree = query->jointree; + whereClauseList = QualifierList(joinTree); + joinClauseList = JoinClauseList(whereClauseList); + + if (list_length(joinClauseList) > 0) + { + return query; + } + + rte = list_nth(query->rtable, 0); + if (rte->rtekind == RTE_RELATION) + { + return NULL; + } + + if (rte->rtekind == RTE_SUBQUERY) + { + return FindTopLevelJoinQuery(rte->subquery); + } + + return NULL; +} + + +/* + * AddUninstantiatedEqualityQualToRelation iterates over query's + * range table list and finds and adds the partition restriction + * to the range table entry that is either (i) RTE_RELATION or + * (ii) Recurse in to the two arguments of a set operation until + * RTE_RELATION is found. + * + * Once RTE_RELATION is found, add the qual using + * AddUninstantiatedEqualityQual(). + * + */ +static void +AddUninstantiatedEqualityQualToRelation(Query *query) +{ + List *rangeTableList = query->rtable; + ListCell *rteCell = NULL; + foreach(rteCell, rangeTableList) + { + RangeTblEntry *rte = lfirst(rteCell); + + if (rte->rtekind == RTE_RELATION) { - targetPartitionColumnVar = (Var *) targetEntry->expr; + if (IsDistributedTable(rte->relid) && PartitionColumn(rte->relid, 1) != NULL) + { + Var *targetVar = GetFirstTargetListVar(query->targetList); + if (targetVar) + { + AddUninstantiatedEqualityQual(query, PartitionColumn(rte->relid, + targetVar->varno)); + return; + } + } + } + else if (rte->rtekind == RTE_SUBQUERY) + { + Query *subquery = rte->subquery; + SetOperationStmt *unionStatement = NULL; + Query *leftQuery = NULL; + Query *rightQuery = NULL; + RangeTblRef *leftRangeTableReference = NULL; + RangeTblRef *rightRangeTableReference = NULL; + List *unionQueryRangeTableList = NULL; + int leftTableIndex = 0; + int rightTableIndex = 0; + RangeTblEntry *leftRangeTableEntry = NULL; + 
RangeTblEntry *rightRangeTableEntry = NULL; + + /* if does not have any set operations, recurse into the subquery */ + if (subquery->setOperations == NULL) + { + AddUninstantiatedEqualityQualToRelation(rte->subquery); + return; + } + + unionStatement = (SetOperationStmt *) subquery->setOperations; + + leftRangeTableReference = (RangeTblRef *) unionStatement->larg; + rightRangeTableReference = (RangeTblRef *) unionStatement->rarg; + unionQueryRangeTableList = subquery->rtable; + + leftTableIndex = leftRangeTableReference->rtindex - 1; + rightTableIndex = rightRangeTableReference->rtindex - 1; + + leftRangeTableEntry = (RangeTblEntry *) list_nth( + unionQueryRangeTableList, + leftTableIndex); + rightRangeTableEntry = (RangeTblEntry *) list_nth( + unionQueryRangeTableList, + rightTableIndex); + + Assert(leftRangeTableEntry->rtekind == RTE_SUBQUERY); + Assert(rightRangeTableEntry->rtekind == RTE_SUBQUERY); + + leftQuery = leftRangeTableEntry->subquery; + rightQuery = rightRangeTableEntry->subquery; + + AddUninstantiatedEqualityQualToRelation(leftQuery); + AddUninstantiatedEqualityQualToRelation(rightQuery); + + return; + } + else + { + ereport(DEBUG4, (errmsg("unexpected rte kind:%d", rte->rtekind))); + } + } +} + + +/* + * GetFirstTargetListVar iterates through the given target list entries and returns + * the first target list entry whose type is Var. Otherwise, the function returns NULL. + */ +static Var * +GetFirstTargetListVar(List *targetList) +{ + Var *targetVar = NULL; + ListCell *targetListCell = NULL; + + foreach(targetListCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetListCell); + + if (IsA(targetEntry->expr, Var)) + { + targetVar = (Var *) targetEntry->expr; break; } } - /* - * If we cannot find the bare partition column, no need to add the qual since - * we're already going to error out on the multi planner. 
- */ - if (!targetPartitionColumnVar) - { - return; - } - - /* finally add the equality qual of target column to subquery */ - AddUninstantiatedEqualityQual(subquery, targetPartitionColumnVar); + return targetVar; } diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 16126f5cc..c0c0ca026 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1225,35 +1225,8 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_1 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND 
(raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) -DEBUG: Plan is router executable +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. 
-- some unsupported LEFT/INNER JOINs -- JOIN on one table with partition column other is not INSERT INTO agg_events (user_id) @@ -1292,9 +1265,6 @@ SELECT raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- same as the above with INNER JOIN @@ -1383,9 +1353,6 @@ SELECT raw_events_first.user_id FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- the following is again a very tricky query for Citus @@ -1398,9 +1365,6 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. 
-- foo is not joined on the partition key so the query is not @@ -1630,12 +1594,6 @@ outer_most.id, max(outer_most.value) HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: predicate pruning for shardId 13300007 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- cannot pushdown since subquery returns another column than partition key From 22fd1767b188d2a488e2fc1e8453e54523b4f834 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 22 Feb 2017 15:56:59 +0200 Subject: [PATCH 3/3] Temporarily disable some reference table tests --- .../expected/multi_reference_table.out | 48 +++++++------------ .../regress/sql/multi_reference_table.sql | 36 +++++++------- 2 files changed, 36 insertions(+), 48 deletions(-) diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 7ff726ce1..3742605b2 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1104,37 +1104,25 @@ WHERE ERROR: only reference tables may be queried when targeting a reference table with INSERT ... 
SELECT -- now, insert into the hash partitioned table and use reference -- tables in the SELECT queries -INSERT INTO - colocated_table_test (value_1, value_2) -SELECT - colocated_table_test_2.value_1, reference_table_test.value_2 -FROM - colocated_table_test_2, reference_table_test -WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; - value_1 | value_2 ----------+--------- - 1 | 1 - 2 | 2 -(2 rows) - +--INSERT INTO +-- colocated_table_test (value_1, value_2) +--SELECT +-- colocated_table_test_2.value_1, reference_table_test.value_2 +--FROM +-- colocated_table_test_2, reference_table_test +--WHERE +-- colocated_table_test_2.value_4 = reference_table_test.value_4 +--RETURNING value_1, value_2; -- some more complex queries (Note that there are more complex queries in multi_insert_select.sql) -INSERT INTO - colocated_table_test (value_1, value_2) -SELECT - colocated_table_test_2.value_1, reference_table_test.value_2 -FROM - colocated_table_test_2, reference_table_test -WHERE - colocated_table_test_2.value_2 = reference_table_test.value_2 -RETURNING value_1, value_2; - value_1 | value_2 ----------+--------- - 1 | 1 - 2 | 2 -(2 rows) - +--INSERT INTO +-- colocated_table_test (value_1, value_2) +--SELECT +-- colocated_table_test_2.value_1, reference_table_test.value_2 +--FROM +-- colocated_table_test_2, reference_table_test +--WHERE +-- colocated_table_test_2.value_2 = reference_table_test.value_2 +--RETURNING value_1, value_2; -- partition column value comes from reference table but still first error is -- on data type mismatch INSERT INTO diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index ddae075a7..666ca90f6 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -700,26 +700,26 @@ WHERE -- now, insert into the hash partitioned table and use reference -- tables in the SELECT queries -INSERT INTO - 
colocated_table_test (value_1, value_2) -SELECT - colocated_table_test_2.value_1, reference_table_test.value_2 -FROM - colocated_table_test_2, reference_table_test -WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; +--INSERT INTO +-- colocated_table_test (value_1, value_2) +--SELECT +-- colocated_table_test_2.value_1, reference_table_test.value_2 +--FROM +-- colocated_table_test_2, reference_table_test +--WHERE +-- colocated_table_test_2.value_4 = reference_table_test.value_4 +--RETURNING value_1, value_2; -- some more complex queries (Note that there are more complex queries in multi_insert_select.sql) -INSERT INTO - colocated_table_test (value_1, value_2) -SELECT - colocated_table_test_2.value_1, reference_table_test.value_2 -FROM - colocated_table_test_2, reference_table_test -WHERE - colocated_table_test_2.value_2 = reference_table_test.value_2 -RETURNING value_1, value_2; +--INSERT INTO +-- colocated_table_test (value_1, value_2) +--SELECT +-- colocated_table_test_2.value_1, reference_table_test.value_2 +--FROM +-- colocated_table_test_2, reference_table_test +--WHERE +-- colocated_table_test_2.value_2 = reference_table_test.value_2 +--RETURNING value_1, value_2; -- partition column value comes from reference table but still first error is -- on data type mismatch