From e8b793c454e9a908cc1b5a60f0833a8877428b4b Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 2 May 2017 11:08:02 -0700 Subject: [PATCH] Support for IN (const, list) and = ANY(const, b, c) pruning. --- .../distributed/planner/shard_pruning.c | 144 ++++++++++++++---- .../regress/expected/multi_hash_pruning.out | 105 ++++++++++++- .../regress/expected/multi_insert_select.out | 20 +-- .../expected/multi_mx_router_planner.out | 10 +- .../regress/expected/multi_router_planner.out | 10 +- src/test/regress/sql/multi_hash_pruning.sql | 50 +++++- 6 files changed, 260 insertions(+), 79 deletions(-) diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index 7d32ad09e..327dd694b 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -61,7 +61,9 @@ #include "distributed/pg_dist_partition.h" #include "distributed/worker_protocol.h" #include "nodes/nodeFuncs.h" +#include "nodes/makefuncs.h" #include "optimizer/clauses.h" +#include "utils/arrayaccess.h" #include "utils/catcache.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -169,8 +171,12 @@ static bool PrunableExpressionsWalker(Node *originalNode, ClauseWalkerContext *c static void AddPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, OpExpr *opClause, Var *varClause, Const *constantClause); +static void AddSAOPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, + ScalarArrayOpExpr * + arrayOperatorExpression); static void AddHashRestrictionToInstance(ClauseWalkerContext *context, OpExpr *opClause, Var *varClause, Const *constantClause); +static void AddNewConjuction(ClauseWalkerContext *context, OpExpr *op); static PruningInstance * CopyPartialPruningInstance(PruningInstance *sourceInstance); static List * ShardArrayToList(ShardInterval **shardArray, int length); static List * DeepCopyShardIntervalList(List *originalShardIntervalList); @@ -541,40 +547,8 @@ PrunableExpressionsWalker(Node *node, ClauseWalkerContext *context) } else if (IsA(node, ScalarArrayOpExpr)) { - PruningInstance *prune = context->currentPruningInstance; ScalarArrayOpExpr *arrayOperatorExpression = (ScalarArrayOpExpr *) node; - Node *leftOpExpression = linitial(arrayOperatorExpression->args); - Node *strippedLeftOpExpression = strip_implicit_coercions(leftOpExpression); - bool usingEqualityOperator = OperatorImplementsEquality( - arrayOperatorExpression->opno); - - /* - * Citus cannot prune hash-distributed shards with ANY/ALL. We show a NOTICE - * if the expression is ANY/ALL performed on the partition column with equality. - * - * TODO: this'd now be easy to implement, similar to the OR_EXPR case - * above, except that one would push an appropriately constructed - * OpExpr(LHS = $array_element) as continueAt. - */ - if (usingEqualityOperator && strippedLeftOpExpression != NULL && - equal(strippedLeftOpExpression, context->partitionColumn)) - { - ereport(NOTICE, (errmsg("cannot use shard pruning with " - "ANY/ALL (array expression)"), - errhint("Consider rewriting the expression with " - "OR/AND clauses."))); - } - - /* - * Mark expression as added, so we'll fail pruning if there's no ANDed - * restrictions that we can deal with. - */ - if (!prune->addedToPruningInstances) - { - context->pruningInstances = lappend(context->pruningInstances, - prune); - prune->addedToPruningInstances = true; - } + AddSAOPartitionKeyRestrictionToInstance(context, arrayOperatorExpression); return false; } @@ -600,6 +574,110 @@ PrunableExpressionsWalker(Node *node, ClauseWalkerContext *context) } +/* + * AddSAOPartitionKeyRestrictionToInstance adds partcol = arrayelem operator + * restriction to the current pruning instance for each element of the array. These + * restrictions are added to pruning instance to prune shards based on IN/=ANY + * constraints. + */ +static void +AddSAOPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, + ScalarArrayOpExpr *arrayOperatorExpression) +{ + PruningInstance *prune = context->currentPruningInstance; + Node *leftOpExpression = linitial(arrayOperatorExpression->args); + Node *strippedLeftOpExpression = strip_implicit_coercions(leftOpExpression); + bool usingEqualityOperator = OperatorImplementsEquality( + arrayOperatorExpression->opno); + Expr *arrayArgument = (Expr *) lsecond(arrayOperatorExpression->args); + + /* checking for partcol = ANY(const, value, s); or partcol IN (const,b,c); */ + if (usingEqualityOperator && strippedLeftOpExpression != NULL && + equal(strippedLeftOpExpression, context->partitionColumn) && + IsA(arrayArgument, Const)) + { + ArrayType *array = NULL; + int16 typlen = 0; + bool typbyval = false; + char typalign = '\0'; + Oid elementType = 0; + ArrayIterator arrayIterator = NULL; + Datum arrayElement = 0; + Datum inArray = ((Const *) arrayArgument)->constvalue; + bool isNull = false; + + /* check for the NULL right-hand expression*/ + if (inArray == 0) + { + return; + } + + array = DatumGetArrayTypeP(((Const *) arrayArgument)->constvalue); + + /* get the necessary information from array type to iterate over it */ + elementType = ARR_ELEMTYPE(array); + get_typlenbyvalalign(elementType, + &typlen, + &typbyval, + &typalign); + + /* Iterate over the righthand array of expression */ + arrayIterator = array_create_iterator(array, 0, NULL); + while (array_iterate(arrayIterator, &arrayElement, &isNull)) + { + OpExpr *arrayEqualityOp = NULL; + Const *constElement = makeConst(elementType, -1, + DEFAULT_COLLATION_OID, typlen, arrayElement, + isNull, typbyval); + + /* build partcol = arrayelem operator */ + arrayEqualityOp = makeNode(OpExpr); + arrayEqualityOp->opno = arrayOperatorExpression->opno; + arrayEqualityOp->opfuncid = arrayOperatorExpression->opfuncid; + arrayEqualityOp->inputcollid = arrayOperatorExpression->inputcollid; + arrayEqualityOp->opresulttype = get_func_rettype( + arrayOperatorExpression->opfuncid); + arrayEqualityOp->opcollid = DEFAULT_COLLATION_OID; + arrayEqualityOp->location = -1; + arrayEqualityOp->args = list_make2(strippedLeftOpExpression, constElement); + + AddNewConjuction(context, arrayEqualityOp); + } + } + + /* Since we could not deal with the constraint, add the pruning instance to + * pruning instance list and labeled it as added. + */ + else if (!prune->addedToPruningInstances) + { + context->pruningInstances = lappend(context->pruningInstances, prune); + prune->addedToPruningInstances = true; + } +} + + +/* + * AddNewConjuction adds the OpExpr to pending instance list of context + * as conjunction as partial instance. + */ +static void +AddNewConjuction(ClauseWalkerContext *context, OpExpr *op) +{ + PendingPruningInstance *instance = palloc0(sizeof(PendingPruningInstance)); + + instance->instance = context->currentPruningInstance; + instance->continueAt = (Node *) op; + + /* + * Signal that this instance is not to be used for pruning on + * its own. Once the pending instance is processed, it'll be + * used. + */ + instance->instance->isPartial = true; + context->pendingInstances = lappend(context->pendingInstances, instance); +} + + /* * AddPartitionKeyRestrictionToInstance adds information about a PartitionKey * $op Const restriction to the current pruning instance. diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out index ecf85a469..28eb3b1fa 100644 --- a/src/test/regress/expected/multi_hash_pruning.out +++ b/src/test/regress/expected/multi_hash_pruning.out @@ -188,19 +188,108 @@ DEBUG: Plan is router executable 0 (1 row) --- Check that we don't support pruning for ANY (array expression) and give --- a notice message when used with the partition column -SELECT count(*) FROM orders_hash_partitioned - WHERE o_orderkey = ANY ('{1,2,3}'); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. +SET client_min_messages TO DEFAULT; +-- Check that we support runing for ANY/IN with literal. +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey = ANY ('{1,2,3}'); + count +------- + 13 +(1 row) + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey IN (1,2,3); + count +------- + 13 +(1 row) + +-- Check whether we can deal with null arrays +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey IN (NULL); count ------- 0 (1 row) +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey = ANY (NULL); + count +------- + 0 +(1 row) + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey IN (NULL) OR TRUE; + count +------- + 12000 +(1 row) + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey = ANY (NULL) OR TRUE; + count +------- + 12000 +(1 row) + +-- Check whether we support IN/ANY in subquery +SELECT count(*) FROM lineitem_hash_part WHERE l_orderkey IN (SELECT l_orderkey FROM lineitem_hash_part); + count +------- + 12000 +(1 row) + +SELECT count(*) FROM lineitem_hash_part WHERE l_orderkey = ANY (SELECT l_orderkey FROM lineitem_hash_part); + count +------- + 12000 +(1 row) + +-- Check whether we support IN/ANY in subquery with append and range distributed table +SELECT count(*) FROM lineitem + WHERE l_orderkey = ANY ('{1,2,3}'); + count +------- + 13 +(1 row) + +SELECT count(*) FROM lineitem + WHERE l_orderkey IN (1,2,3); + count +------- + 13 +(1 row) + +SELECT count(*) FROM lineitem + WHERE l_orderkey = ANY(NULL) OR TRUE; + count +------- + 12000 +(1 row) + +SELECT count(*) FROM lineitem_range + WHERE l_orderkey = ANY ('{1,2,3}'); + count +------- + 13 +(1 row) + +SELECT count(*) FROM lineitem_range + WHERE l_orderkey IN (1,2,3); + count +------- + 13 +(1 row) + +SELECT count(*) FROM lineitem_range + WHERE l_orderkey = ANY(NULL) OR TRUE; + count +------- + 12000 +(1 row) + +SET client_min_messages TO DEBUG2; -- Check that we don't show the message if the operator is not -- equality operator SELECT count(*) FROM orders_hash_partitioned diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 67a21468b..91494ee9b 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -770,18 +770,10 @@ DEBUG: Plan is router executable FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_first.user_id IN (19, 20, 21); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= '-2147483648'::integer) AND (worker_hash(raw_events_first.user_id) <= '-1073741825'::integer))) -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300001 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= '-1073741824'::integer) AND (worker_hash(raw_events_first.user_id) <= '-1'::integer))) -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= 0) AND (worker_hash(raw_events_first.user_id) <= 1073741823))) -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= 1073741824) AND (worker_hash(raw_events_first.user_id) <= 2147483647))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= 1073741824) AND (worker_hash(raw_events_first.user_id) <= 2147483647))) DEBUG: Plan is router executable INSERT INTO agg_events (user_id) @@ -790,18 +782,10 @@ DEBUG: Plan is router executable FROM raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_second.user_id IN (19, 20, 21); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= '-2147483648'::integer) AND (worker_hash(raw_events_first.user_id) <= '-1073741825'::integer))) -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300001 raw_events_first JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= '-1073741824'::integer) AND (worker_hash(raw_events_first.user_id) <= '-1'::integer))) -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300002 raw_events_first JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= 0) AND (worker_hash(raw_events_first.user_id) <= 1073741823))) -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= 1073741824) AND (worker_hash(raw_events_first.user_id) <= 2147483647))) +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((worker_hash(raw_events_first.user_id) >= 1073741824) AND (worker_hash(raw_events_first.user_id) <= 2147483647))) DEBUG: Plan is router executable -- the following is a very tricky query for Citus diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 4bee82e09..58c471833 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -197,10 +197,8 @@ SELECT * FROM articles_hash_mx WHERE author_id <= 1; (5 rows) SELECT * FROM articles_hash_mx WHERE author_id IN (1, 3); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. +DEBUG: Creating router plan +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -1368,10 +1366,6 @@ DROP MATERIALIZED VIEW mv_articles_hash_mx; SET client_min_messages to 'DEBUG2'; CREATE MATERIALIZED VIEW mv_articles_hash_mx_error AS SELECT * FROM articles_hash_mx WHERE author_id in (1,2); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -- router planner/executor is disabled for task-tracker executor -- following query is router plannable, but router planner is disabled diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 273a6e826..3a1f83452 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -257,10 +257,8 @@ SELECT * FROM articles_hash WHERE author_id <= 1; (5 rows) SELECT * FROM articles_hash WHERE author_id IN (1, 3); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. +DEBUG: Creating router plan +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -2071,10 +2069,6 @@ SELECT * FROM mv_articles_hash_empty; CREATE MATERIALIZED VIEW mv_articles_hash_data AS SELECT * FROM articles_hash WHERE author_id in (1,2); -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. -NOTICE: cannot use shard pruning with ANY/ALL (array expression) -HINT: Consider rewriting the expression with OR/AND clauses. SELECT * FROM mv_articles_hash_data; id | author_id | title | word_count ----+-----------+--------------+------------ diff --git a/src/test/regress/sql/multi_hash_pruning.sql b/src/test/regress/sql/multi_hash_pruning.sql index 9ac77f710..5e95867d5 100644 --- a/src/test/regress/sql/multi_hash_pruning.sql +++ b/src/test/regress/sql/multi_hash_pruning.sql @@ -67,10 +67,52 @@ SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM (SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys; --- Check that we don't support pruning for ANY (array expression) and give --- a notice message when used with the partition column -SELECT count(*) FROM orders_hash_partitioned - WHERE o_orderkey = ANY ('{1,2,3}'); +SET client_min_messages TO DEFAULT; + +-- Check that we support runing for ANY/IN with literal. +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey = ANY ('{1,2,3}'); + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey IN (1,2,3); + +-- Check whether we can deal with null arrays +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey IN (NULL); + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey = ANY (NULL); + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey IN (NULL) OR TRUE; + +SELECT count(*) FROM lineitem_hash_part + WHERE l_orderkey = ANY (NULL) OR TRUE; + +-- Check whether we support IN/ANY in subquery +SELECT count(*) FROM lineitem_hash_part WHERE l_orderkey IN (SELECT l_orderkey FROM lineitem_hash_part); +SELECT count(*) FROM lineitem_hash_part WHERE l_orderkey = ANY (SELECT l_orderkey FROM lineitem_hash_part); + +-- Check whether we support IN/ANY in subquery with append and range distributed table +SELECT count(*) FROM lineitem + WHERE l_orderkey = ANY ('{1,2,3}'); + +SELECT count(*) FROM lineitem + WHERE l_orderkey IN (1,2,3); + +SELECT count(*) FROM lineitem + WHERE l_orderkey = ANY(NULL) OR TRUE; + +SELECT count(*) FROM lineitem_range + WHERE l_orderkey = ANY ('{1,2,3}'); + +SELECT count(*) FROM lineitem_range + WHERE l_orderkey IN (1,2,3); + +SELECT count(*) FROM lineitem_range + WHERE l_orderkey = ANY(NULL) OR TRUE; + +SET client_min_messages TO DEBUG2; -- Check that we don't show the message if the operator is not -- equality operator