diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 69b057b93..70a59ec66 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -72,6 +72,19 @@ typedef struct WalkerState bool badCoalesce; } WalkerState; +typedef struct InstantiateQualWalker +{ + ShardInterval *targetShardInterval; + Var *relationPartitionColumn; +}InstantiateQualWalker; + +typedef struct EqualityCheckOnPartitionColumnWalker +{ + Var *relationPartitionColumn; + bool partitionColumnEqualityExists; +}PartitionColumnEqualityCheckWalker; + + bool EnableRouterExecution = true; /* planner functions forward declarations */ @@ -128,6 +141,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, Oid * selectPartitionColumnTableId); static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar); +static Node * PartitionColumnEqualityWalker(Node *originalNode, void *context); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -356,11 +370,13 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter ListCell *restrictionCell = NULL; Task *modifyTask = NULL; List *selectPlacementList = NIL; + List *insertShardPlacementList = NULL; + List *intersectedPlacementList = NULL; uint64 selectAnchorShardId = INVALID_SHARD_ID; List *relationShardList = NIL; uint64 jobId = INVALID_JOB_ID; - List *insertShardPlacementList = NULL; - List *intersectedPlacementList = NULL; + ShardInterval *anchorShardInterval = NULL; + ListCell *relationShardCell = NULL; bool routerPlannable = false; bool upsertQuery = false; bool replacePrunedQueryWithDummy = false; @@ -377,6 +393,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter { RelationRestriction *restriction = lfirst(restrictionCell); List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo; + InstantiateQualWalker *instantiateQualWalker = palloc0( + sizeof(InstantiateQualWalker)); + Var *relationPartitionKey = PartitionKey(restriction->relationId); /* * We haven't added the quals if all participating tables are reference @@ -387,9 +406,12 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter break; } + instantiateQualWalker->relationPartitionColumn = relationPartitionKey; + instantiateQualWalker->targetShardInterval = shardInterval; + originalBaserestrictInfo = (List *) InstantiatePartitionQual((Node *) originalBaserestrictInfo, - shardInterval); + instantiateQualWalker); } /* @@ -436,15 +458,40 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter return NULL; } + /* + * If the anchorShardInterval belongs to a reference table and the target tables it + * not a reference table, find another shard interval that does not belong to a + * reference table. + */ + anchorShardInterval = LoadShardInterval(selectAnchorShardId); + if (PartitionMethod(anchorShardInterval->relationId) == DISTRIBUTE_BY_NONE && + !restrictionContext->allReferenceTables) + { + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + + if (PartitionMethod(relationShard->relationId) != DISTRIBUTE_BY_NONE) + { + anchorShardInterval = LoadShardInterval(relationShard->shardId); + break; + } + } + } + + /* this case indicate*/ + if (!ShardsColocated(anchorShardInterval, shardInterval)) + { + ereport(DEBUG2, (errmsg("Skipping target shard interval %ld since " + "SELECT query for it pruned away", shardId))); + + return NULL; + } + /* get the placements for insert target shard and its intersection with select */ insertShardPlacementList = FinalizedShardPlacementList(shardId); intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, selectPlacementList); - - /* - * If insert target does not have exactly the same placements with the select, - * we sholdn't run the query. - */ if (list_length(insertShardPlacementList) != list_length(intersectedPlacementList)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -454,7 +501,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter "for shard %ld", shardId))); } - /* this is required for correct deparsing of the query */ ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); @@ -479,7 +525,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); modifyTask->dependedTaskList = NULL; modifyTask->anchorShardId = shardId; - modifyTask->taskPlacementList = insertShardPlacementList; + modifyTask->taskPlacementList = FinalizedShardPlacementList(shardId); modifyTask->upsertQuery = upsertQuery; modifyTask->relationShardList = relationShardList; modifyTask->replicationModel = cacheEntry->replicationModel; @@ -1142,9 +1188,23 @@ AddUninstantiatedEqualityQual(Query *query, Var *partitionColumn) Oid equalsOperator = InvalidOid; Oid greaterOperator = InvalidOid; bool hashable = false; + PartitionColumnEqualityCheckWalker *walker = + palloc0(sizeof(PartitionColumnEqualityCheckWalker)); AssertArg(query->commandType == CMD_SELECT); + walker->partitionColumnEqualityExists = false; + walker->relationPartitionColumn = partitionColumn; + + /* if the query already includes part_column = const, we don't need to add another. In fact, + * adding another equality qual breaks lots of things. + */ + PartitionColumnEqualityWalker((Node *) query->jointree->quals, walker); + if (walker->partitionColumnEqualityExists) + { + return; + } + /* get the necessary equality operator */ get_sort_group_operators(partitionColumn->vartype, false, true, false, &lessThanOperator, &equalsOperator, &greaterOperator, @@ -1186,6 +1246,51 @@ AddUninstantiatedEqualityQual(Query *query, Var *partitionColumn) } +/* + * + */ +static Node * +PartitionColumnEqualityWalker(Node *originalNode, void *context) +{ + PartitionColumnEqualityCheckWalker *walker = + (PartitionColumnEqualityCheckWalker *) context; + + if (originalNode == NULL) + { + return NULL; + } + + if (IsA(originalNode, OpExpr) && list_length(((OpExpr *) originalNode)->args) == 2) + { + OpExpr *op = (OpExpr *) originalNode; + Node *leftop = get_leftop((Expr *) op); + Node *rightop = get_rightop((Expr *) op); + + + if (IsA(leftop, Var) && IsA(rightop, Const) && + OperatorImplementsEquality(op->opno)) + { + if (((Var *) leftop)->varattno == walker->relationPartitionColumn->varattno) + { + walker->partitionColumnEqualityExists = true; + + return NULL; + } + } + else if (IsA(leftop, Const) && IsA(rightop, Var) && + OperatorImplementsEquality(op->opno)) + { + walker->partitionColumnEqualityExists = true; + return NULL; + } + } + + + return expression_tree_mutator(originalNode, PartitionColumnEqualityWalker, + (void *) context); +} + + /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. @@ -2950,7 +3055,10 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext) static Node * InstantiatePartitionQual(Node *node, void *context) { - ShardInterval *shardInterval = (ShardInterval *) context; + InstantiateQualWalker *instantiateContext = ((InstantiateQualWalker *) context); + ShardInterval *shardInterval = instantiateContext->targetShardInterval; + Var *relationPartitionColumn = instantiateContext->relationPartitionColumn; + Assert(shardInterval->minValueExists); Assert(shardInterval->maxValueExists); @@ -2974,6 +3082,7 @@ InstantiatePartitionQual(Node *node, void *context) Node *leftop = get_leftop((Expr *) op); Node *rightop = get_rightop((Expr *) op); Param *param = NULL; + Var *currentPartitionColumn = NULL; Var *hashedGEColumn = NULL; OpExpr *hashedGEOpExpr = NULL; @@ -2989,13 +3098,15 @@ InstantiatePartitionQual(Node *node, void *context) Oid integer4LEoperatorId = InvalidOid; /* look for the Params */ - if (IsA(leftop, Param)) + if (IsA(leftop, Param) && IsA(rightop, Var)) { param = (Param *) leftop; + currentPartitionColumn = (Var *) rightop; } - else if (IsA(rightop, Param)) + else if (IsA(rightop, Param) & IsA(leftop, Var)) { param = (Param *) rightop; + currentPartitionColumn = (Var *) leftop; } /* not an interesting param for our purpose, so return */ @@ -3004,6 +3115,13 @@ InstantiatePartitionQual(Node *node, void *context) return node; } + /* if the qual is not on the partition column, skip it */ + if (relationPartitionColumn && currentPartitionColumn->varattno != + relationPartitionColumn->varattno) + { + return node; + } + /* get the integer >=, <= operators from the catalog */ integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID, INT4OID, diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index f28f6f472..9592d1639 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -198,7 +198,6 @@ DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away @@ -207,12 +206,10 @@ DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, "time") SELECT user_id, "time" FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id = 7) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away @@ -248,17 +245,14 @@ DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id = 8) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300003 @@ -371,28 +365,9 @@ DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300001 raw_events_first WHERE (((user_id = 9) OR (user_id = 16)) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300003 raw_events_first WHERE (((user_id = 9) OR (user_id = 16)) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 -DEBUG: ProcessQuery -DEBUG: Plan is router executable -ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300007" -DETAIL: Key (user_id, value_1)=(9, 90) already exists. -CONTEXT: while executing command on localhost:57638 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. -- now do some aggregations INSERT INTO agg_events SELECT @@ -722,21 +697,21 @@ DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) +DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) +DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) +DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300002 @@ -1555,17 +1530,14 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300004 raw_events_second WHERE ((user_id = 5) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300002 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 @@ -1614,12 +1586,10 @@ DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300000 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away DEBUG: predicate pruning for shardId 13300004 @@ -1628,7 +1598,6 @@ DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300006 raw_events_second WHERE ((user_id = 6) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away DEBUG: ProcessQuery @@ -1695,17 +1664,14 @@ DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300004 raw_events_second WHERE ((user_id = 5) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007 DEBUG: Skipping target shard interval 13300002 since SELECT query for it pruned away -DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300007