diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index f2976e3da..9b20f1c9c 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -130,8 +130,6 @@ static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn); -static Var * MakeInt4Column(void); -static Const * MakeInt4Constant(Datum constantValue); static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression); static List * BuildRestrictInfoList(List *qualList); static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, @@ -3015,7 +3013,7 @@ MakeHashedOperatorExpression(OpExpr *operatorExpression) * MakeInt4Column creates a column of int4 type with invalid table id and max * attribute number. */ -static Var * +Var * MakeInt4Column() { Index tableId = 0; @@ -3035,7 +3033,7 @@ MakeInt4Column() * MakeInt4Constant creates a new constant of int4 type and assigns the given * value as a constant value. */ -static Const * +Const * MakeInt4Constant(Datum constantValue) { Oid constantType = INT4OID; diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 51b5f14c0..1e1cda05b 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -62,22 +62,22 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) /* * We implement INSERT INTO .. SELECT by pushing down the SELECT to - * each shard. That requires that the SELECT is co-located with the - * target table. To compute that we use the router planner, by adding + * each shard. 
To compute that we use the router planner, by adding * a "hidden" constraint that the partition column be equal to a * certain value. standard_planner() distributes that constraint to - * all affected table's baserestrictinfos. The router planner then - * iterates over the target table's shards, for each we replace the - * "hidden" restriction, with one that PruneShardList() handles, and - * then generate a query for that individual shard. If any of the - * involved tables don't prune down to a single shard, or if the - * pruned shards aren't colocated, we error out. + * the baserestrictinfos of the tables that they are connected + * via equi joins. * - * TODO: we currently not support CTEs. + * The router planner then iterates over the target table's shards, + * for each we replace the "hidden" restriction, with one that + * PruneShardList() handles, and then generate a query for that + * individual shard. If any of the involved tables don't prune down + * to a single shard, or if the pruned shards aren't colocated, + * we error out. 
*/ - if (InsertSelectQuery(parse) && parse->cteList == NULL) + if (InsertSelectQuery(parse)) { - AddHiddenPartitionColumnParameter(parse); + AddHiddenPartitionColumnEqualityQual(parse); } } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 9ad4ad18e..2bd5fb59c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -74,6 +74,11 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query static MultiPlan * CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext * restrictionContext); +static Task * CreateMultipleTaskRouterModifyTask(Query *originalQuery, Query *query, + ShardInterval *shardInterval, + RelationRestrictionContext * + restrictionContext, + uint32 taskIdIndex); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -91,9 +96,9 @@ static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Task * RouterSelectTask(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext, List **placementList); -static Query * RouterSelectQuery(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext, - List **placementList, uint64 *anchorShardId); +static bool RouterSelectQuery(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext, + List **placementList, uint64 *anchorShardId); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); @@ -105,15 +110,17 @@ static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecut RelationRestrictionContext *restrictionContext); static 
RelationRestrictionContext * copyRelationRestrictionContext( RelationRestrictionContext *oldContext); -static Node * ReplaceHiddenParameter(Node *node, void *context); -static Var * MakeInt4Column(void); -static Const * MakeInt4Constant(Datum constantValue); -static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree); +static Node * ReplaceHiddenQual(Node *node, void *context); +static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree, + RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte); static void ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query); -static void ErrorIfNotAllParticipatingTablesAreColocated(Query *query); -static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query); -static Query * FirstQueryReferencingDistributedTable(Query *query); -static void AddHiddenParameterToFirstTable(Query *query); +static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, + RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, + Oid * + selectPartitionColumnTableId); +static void AddHiddenEqualityQual(Query *query, Var *targetPartitionColumnVar); /* @@ -201,7 +208,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, /* - * Creates a router plan for INSERT ... SELECT queries which can consists of + * Creates a router plan for INSERT ... SELECT queries which could consist of * multiple tasks. * * The function never returns NULL, it errors out if cannot create the multi plan. 
@@ -210,125 +217,52 @@ static MultiPlan * CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { - Oid distributedTableId = ExtractFirstDistributedTableId(query); - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); int shardOffset = 0; - int shardCount = cacheEntry->shardIntervalArrayLength; List *sqlTaskList = NIL; uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ Job *workerJob = NULL; uint64 jobId = INVALID_JOB_ID; MultiPlan *multiPlan = NULL; + RangeTblEntry *insertRte = linitial(query->rtable); + RangeTblEntry *subqueryRte = lsecond(query->rtable); + Oid targetRelationId = insertRte->relid; + DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); + int shardCount = targetCacheEntry->shardIntervalArrayLength; /* * Error semantics for INSERT ... SELECT queries are different than regular * modify queries. Thus, handle separately. */ - ErrorIfInsertSelectQueryNotSupported(originalQuery); + ErrorIfInsertSelectQueryNotSupported(originalQuery, insertRte, subqueryRte); /* * Plan select query for each shard in the target table. Do so by - * replacing the magic parameters added in multi_planner() with actual - * current shard's boundary values. Then perform the normal shard - * pruning. + * replacing the partitioning qual parameter added in multi_planner() + * with actual current shard's boundary values. Then perform the normal + * shard pruning. 
*/ for (shardOffset = 0; shardOffset < shardCount; shardOffset++) { - Query *subquery = ((RangeTblEntry *) list_nth(query->rtable, 1))->subquery; - Query *copiedOriginal = copyObject(originalQuery); - Query *originalSubquery = ((RangeTblEntry *) list_nth(copiedOriginal->rtable, - 1))->subquery; - RelationRestrictionContext *copiedRestrictionContext = - copyRelationRestrictionContext(restrictionContext); - ShardInterval *shardInterval = - cacheEntry->sortedShardIntervalArray[shardOffset]; - uint64 shardId = shardInterval->shardId; - StringInfo queryString = makeStringInfo(); - ListCell *restrictionCell = NULL; - Query *routerQuery = NULL; - Task *sqlTask = NULL; - List *selectPlacementList = NIL; - uint64 selectAnchorShardId = INVALID_SHARD_ID; - List *insertShardPlacementList = NULL; - List *intersectedPlacementList = NULL; - RangeTblEntry *rangeTableEntry = NULL; + ShardInterval *targetShardInterval = + targetCacheEntry->sortedShardIntervalArray[shardOffset]; + Task *modifyTask = NULL; - /* - * Replace the magic value in all baserestrictinfos. Note that - * this has to be done on a copy, as the walker modifies in place. - */ - foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) + modifyTask = CreateMultipleTaskRouterModifyTask(originalQuery, query, + targetShardInterval, + restrictionContext, + taskIdIndex); + + /* add the task if it could be created */ + if (modifyTask != NULL) { - RelationRestriction *restriction = lfirst(restrictionCell); - - restriction->relOptInfo->baserestrictinfo = (List *) - ReplaceHiddenParameter( - (Node *) restriction->relOptInfo->baserestrictinfo, - shardInterval); + sqlTaskList = lappend(sqlTaskList, modifyTask); } - /* - * Use select planner to generate query for this specific - * shard. We don't use the generated query, just rely on the - * side-effect that all RTEs have been updated to point to the - * relevant nodes. 
- */ - routerQuery = RouterSelectQuery(originalSubquery, subquery, - copiedRestrictionContext, &selectPlacementList, - &selectAnchorShardId); - - if (routerQuery == NULL) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail( - "Select query cannot be pushed down to the worker."))); - } - - /* Ensure that we have INSERTed table's placement exists on the same worker */ - insertShardPlacementList = ShardPlacementList(shardId); - intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, - selectPlacementList); - - if (list_length(insertShardPlacementList) != list_length( - intersectedPlacementList)) - { - ereport(DEBUG2, (errmsg("skipping the task"), - errdetail("Insert query hits %d placements, Select query " - "hits %d placements and only %d of those placements match.", - list_length(insertShardPlacementList), list_length( - selectPlacementList), - list_length(intersectedPlacementList)))); - - continue; - } - - /* this is required for correct deparsing of the query */ - ReorderInsertSelectTargetListsIfExists(copiedOriginal); - - /* setting an alias simplifies deparsing of RETURNING */ - rangeTableEntry = linitial(copiedOriginal->rtable); - if (rangeTableEntry->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - rangeTableEntry->alias = alias; - } - - /* and generate the full query string */ - deparse_shard_query(copiedOriginal, distributedTableId, shardInterval->shardId, - queryString); - ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); - - - sqlTask = CreateBasicTask(jobId, taskIdIndex++, MODIFY_TASK, queryString->data); - sqlTask->dependedTaskList = NULL; - sqlTask->anchorShardId = shardId; - sqlTask->taskPlacementList = insertShardPlacementList; - - sqlTaskList = lappend(sqlTaskList, sqlTask); + ++taskIdIndex; } + /* there should be at least a single task */ + Assert(sqlTaskList != NIL); /* Create the 
worker job */ workerJob = CitusMakeNode(Job); @@ -349,25 +283,135 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, } +/* + * CreateMultipleTaskRouterModifyTask creates a modify task by + * replacing the partitioning qual parameter added in multi_planner() + * with the shardInterval's boundary value. Then perform the normal + * shard pruning on the subquery. Finally, checks if the target shardInterval + * has exactly same placements with the select task's available anchor + * placements. + * + * The function errors out if the subquery is not a router select query (i.e., + * subqueries with non equi-joins). + */ +static Task * +CreateMultipleTaskRouterModifyTask(Query *originalQuery, Query *query, + ShardInterval *shardInterval, + RelationRestrictionContext *restrictionContext, + uint32 taskIdIndex) +{ + RangeTblEntry *subqueryRte = lsecond(query->rtable); + Query *subquery = subqueryRte->subquery; + + Query *copiedQuery = copyObject(originalQuery); + RangeTblEntry *copiedInsertRte = linitial(copiedQuery->rtable); + RangeTblEntry *copiedSubqueryRte = lsecond(copiedQuery->rtable); + Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; + + uint64 shardId = shardInterval->shardId; + Oid distributedTableId = shardInterval->relationId; + + RelationRestrictionContext *copiedRestrictionContext = + copyRelationRestrictionContext(restrictionContext); + + StringInfo queryString = makeStringInfo(); + ListCell *restrictionCell = NULL; + Task *modifyTask = NULL; + List *selectPlacementList = NIL; + uint64 selectAnchorShardId = INVALID_SHARD_ID; + uint64 jobId = INVALID_JOB_ID; + List *insertShardPlacementList = NULL; + List *intersectedPlacementList = NULL; + bool queryRoutable = false; + + /* + * Replace the partitioning qual parameter value in all baserestrictinfos. + * Note that this has to be done on a copy, as the walker modifies in place. 
+ */ + foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) + { + RelationRestriction *restriction = lfirst(restrictionCell); + List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo; + + originalBaserestrictInfo = + (List *) ReplaceHiddenQual((Node *) originalBaserestrictInfo, + shardInterval); + } + + /* + * Use router select planner to decide on whether we can push down the query + * or not. If we can, we also rely on the side-effects that all RTEs have been + * updated to point to the relevant nodes and selectPlacementList is determined. + */ + queryRoutable = RouterSelectQuery(copiedSubquery, subquery, + copiedRestrictionContext, &selectPlacementList, + &selectAnchorShardId); + + if (!queryRoutable) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail("Select query cannot be pushed down to the worker."))); + } + + /* Ensure that we have INSERTed table's placement exists on the same worker */ + insertShardPlacementList = ShardPlacementList(shardId); + intersectedPlacementList = IntersectPlacementList(insertShardPlacementList, + selectPlacementList); + + if (list_length(insertShardPlacementList) != list_length(intersectedPlacementList)) + { + ereport(DEBUG2, (errmsg("skipping the task"), + errdetail("Insert query hits %d placements, Select query " + "hits %d placements and only %d of those placements match.", + list_length(insertShardPlacementList), + list_length(selectPlacementList), + list_length(intersectedPlacementList)))); + + return NULL; + } + + /* this is required for correct deparsing of the query */ + ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); + + /* setting an alias simplifies deparsing of RETURNING */ + if (copiedInsertRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + copiedInsertRte->alias = alias; + } + + /* and generate the full query 
string */ + deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId, + queryString); + ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + + modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); + modifyTask->dependedTaskList = NULL; + modifyTask->anchorShardId = shardId; + modifyTask->taskPlacementList = insertShardPlacementList; + + return modifyTask; +} + + /* * ErrorIfInsertSelectQueryNotSupported errors out for unsupported * INSERT ... SELECT queries. */ static void -ErrorIfInsertSelectQueryNotSupported(Query *queryTree) +ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte) { - RangeTblEntry *subqueryRte = NULL; Query *subquery = NULL; + Oid selectPartitionColumnTableId = InvalidOid; /* we only do this check for INSERT ... SELECT queries */ AssertArg(InsertSelectQuery(queryTree)); - subqueryRte = lsecond(queryTree->rtable); subquery = subqueryRte->subquery; - /* we support this feature only for colocated tables */ - ErrorIfNotAllParticipatingTablesAreColocated(queryTree); - if (contain_mutable_functions((Node *) queryTree)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -383,16 +427,25 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), - errdetail( - "Common table expressions are not allowed in INSERT ... SELECT " - "queries"))); + errdetail("Common table expressions are not allowed in " + "INSERT ... 
SELECT queries"))); } /* we don't support LIMIT, OFFSET and WINDOW functions */ ErrorIfMultiTaskRouterSelectQueryUnsupported(subquery); /* ensure that INSERT's partition column comes from SELECT's partition column */ - ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree); + ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree, insertRte, subqueryRte, + &selectPartitionColumnTableId); + + /* we expect partition column values come from colocated tables */ + if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("INSERT target table and the source relation " + "of the SELECT partition column value " + "must be colocated"))); + } } @@ -419,9 +472,8 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), - errdetail( - "LIMIT clauses are not allowed in INSERT ... SELECT " - "queries"))); + errdetail("LIMIT clauses are not allowed in " + "INSERT ... SELECT queries"))); } if (subquery->limitOffset != NULL) @@ -429,11 +481,8 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), - errdetail( - "OFFSET clauses are not allowed in INSERT ... SELECT " - "queries") - - )); + errdetail("OFFSET clauses are not allowed in " + "INSERT ... SELECT queries"))); } if (subquery->windowClause != NULL) @@ -441,84 +490,41 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), - errdetail( - "Window functions are not allowed in INSERT ... SELECT " - "queries"))); + errdetail("Window functions are not allowed in " + "INSERT ... 
SELECT queries"))); } + /* see comment on AddHiddenPartitionColumnEqualityQual() */ if (subquery->setOperations != NULL) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), - errdetail( - "Union, Intersect and Except are currently unsupported are not allowed in INSERT ... SELECT " - "queries"))); + errdetail("Set operations are not allowed in " + "INSERT ... SELECT queries"))); } } } -/* - * ErrorIfNotAllParticipatingTablesAreColocated errors out of all tables - * referenced in the query are not colocated. - */ -static void -ErrorIfNotAllParticipatingTablesAreColocated(Query *query) -{ - List *relationIdList = RelationIdList(query); - ListCell *relationIdCell = NULL; - uint64 colocationId = INVALID_COLOCATION_ID; - bool tablesAreColocated = true; - - foreach(relationIdCell, relationIdList) - { - Oid relationId = lfirst_oid(relationIdCell); - uint64 currentColocationId = TableColocationId(relationId); - - if (currentColocationId == INVALID_COLOCATION_ID) - { - tablesAreColocated = false; - break; - } - - /* set for the first table */ - if (colocationId == INVALID_COLOCATION_ID) - { - colocationId = currentColocationId; - } - - if (colocationId != currentColocationId) - { - tablesAreColocated = false; - break; - } - } - - - if (!tablesAreColocated) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("all participating tables should be colocated"))); - } -} - - /* * ErrorIfInsertPartitionColumnDoesNotMatchSelect checks whether the INSERTed table's * partition column value matches with the any of the SELECTed table's partition column. + * + * On return without error (i.e., if partition columns match), the function also sets + * selectPartitionColumnTableId. 
*/ static void -ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query) +ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, + Oid *selectPartitionColumnTableId) { ListCell *targetEntryCell = NULL; uint32 rangeTableId = 1; - RangeTblEntry *insertRte = linitial(query->rtable); - RangeTblEntry *subqueryRte = lsecond(query->rtable); - Query *subquery = subqueryRte->subquery; Oid insertRelationId = insertRte->relid; Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId); bool partitionColumnsMatch = false; + Query *subquery = subqueryRte->subquery; foreach(targetEntryCell, query->targetList) { @@ -552,6 +558,8 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query) } partitionColumnsMatch = true; + *selectPartitionColumnTableId = subqeryTargetEntry->resorigtbl; + break; } } @@ -567,83 +575,100 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query) /* - * AddHiddenPartitionColumnParameter() can only be used with + * AddHiddenPartitionColumnEqualityQual() can only be used with * INSERT ... SELECT queries. * - * If the input query is not INSERT .. SELECT the function errors-out. + * AddHiddenPartitionColumnEqualityQual adds a hidden equality qual + * to the SELECT query of the given originalQuery. The function currently + * does NOT add hidden quals if + * (i) CTEs are present on the top level query + * (ii) Set operations are present on the top level query + * (iii) Target list does not include a bare partition column. + * + * Note that if the input query is not an INSERT .. SELECT the assertion fails. 
*/ void -AddHiddenPartitionColumnParameter(Query *originalQuery) +AddHiddenPartitionColumnEqualityQual(Query *originalQuery) { Query *subquery = NULL; - Query *referencedSubquery = NULL; RangeTblEntry *subqueryEntry = NULL; + ListCell *targetEntryCell = NULL; + Var *targetPartitionColumnVar = NULL; + List *targetList = NULL; - if (!InsertSelectQuery(originalQuery)) + Assert(InsertSelectQuery(originalQuery)); + + /* we currently do not support CTEs */ + if (originalQuery->cteList != NULL) { - elog(ERROR, "Only INSERT .. SELECT queries can be modified"); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"))); + return; } /* TODO: once CTEs are present, this does not work */ subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1); subquery = subqueryEntry->subquery; - referencedSubquery = FirstQueryReferencingDistributedTable(subquery); - - AddHiddenParameterToFirstTable(referencedSubquery); -} - - -static Query * -FirstQueryReferencingDistributedTable(Query *query) -{ - List *queryList = NIL; - ListCell *queryCell = NULL; - Query *subquery = NULL; - - ExtractQueryWalker((Node *) query, &queryList); - - /* iterate and find the query which references to distributed table */ - foreach(queryCell, queryList) + /* + * We currently do not support the subquery with set operations for three reasons. + * + * (i) Adding only a single qual where there are more than one join trees + * leads to an assertion failure on the standard planner (i.e., + * Assert(parse->jointree->quals == NULL); on plan_set_operations()). + * [THE ABOVE COMMENT IS TO EASE THE REVIEW, REMOVE LATER ON] + * + * (ii) There are potentially multiple jointree quals that we need to add + * the hidden qual, and we haven't implemented that logic yet. + * + * (iii) We cannot get the source tables OID via target entries resorigtbl field. 
+ * This makes it hard to check the colocation requirement of the source and target + * tables. + * + * Note that we do not allow set operations on the lower levels of the subquery + * as well, which is handled on ErrorIfMultiTaskRouterSelectQueryUnsupported(). + */ + if (subquery->setOperations != NULL) { - Query *innerSubquery = (Query *) lfirst(queryCell); + return; + } - List *rangeTableList = NIL; - ListCell *rangeTableCell = NULL; + /* iterate through the target list and find the partition column on the target list */ + targetList = subquery->targetList; + foreach(targetEntryCell, targetList) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); - /* extract range table entries */ - ExtractRangeTableEntryWalker((Node *) innerSubquery, &rangeTableList); - foreach(rangeTableCell, rangeTableList) + if (IsPartitionColumnRecursive(targetEntry->expr, subquery) && + IsA(targetEntry->expr, Var)) { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - if (IsDistributedTable(rangeTableEntry->relid)) - { - subquery = innerSubquery; - break; - } + targetPartitionColumnVar = (Var *) targetEntry->expr; + break; } } - return subquery; + /* + * If we cannot find the bare partition column, no need to add the qual since + * we're already going to error out on the multi planner. + */ + if (!targetPartitionColumnVar) + { + return; + } + + /* finally add the hidden equality qual of target column to subquery */ + AddHiddenEqualityQual(subquery, targetPartitionColumnVar); } /* - * AddHiddenParameterToFirstTable adds a hidden parameter - * ($1 = partitionColumn) for the first table on the query. + * AddHiddenEqualityQual adds a hidden qual in the following form ($1 = partitionColumn) + * on the input query and partitionColumn. 
*/ static void -AddHiddenParameterToFirstTable(Query *query) +AddHiddenEqualityQual(Query *query, Var *partitionColumn) { - Param *hiddenParam = makeNode(Param); - Node *hiddenBound = NULL; - Oid firstTableId = ExtractFirstDistributedTableId(query); - Var *partitionColumn = PartitionColumn(firstTableId, 1); - Oid partitionColumnCollid = partitionColumn->varcollid; + Param *equalityParameter = makeNode(Param); + Node *hiddenEqualityQual = NULL; + Oid partitionColumnCollid = InvalidOid; Oid lessThanOperator = InvalidOid; Oid equalsOperator = InvalidOid; Oid greaterOperator = InvalidOid; @@ -651,37 +676,37 @@ AddHiddenParameterToFirstTable(Query *query) AssertArg(query->commandType == CMD_SELECT); - hiddenParam->paramkind = PARAM_EXTERN; - hiddenParam->paramid = HIDDEN_PARAMETER_ID; - hiddenParam->paramtype = partitionColumn->vartype; - hiddenParam->paramtypmod = partitionColumn->vartypmod; - hiddenParam->paramcollid = partitionColumnCollid; - hiddenParam->location = -1; - - get_sort_group_operators(partitionColumn->vartype, true, true, true, + /* get the necessary equality operator */ + get_sort_group_operators(partitionColumn->vartype, false, true, false, &lessThanOperator, &equalsOperator, &greaterOperator, &hashable); - /* - * XXX: Using an equality constraint here isn't exactly correct, - * might want to replace it with >= and <=. - * - * It looks like this works. 
- */ - hiddenBound = (Node *) - make_opclause(equalsOperator, InvalidOid, false, - (Expr *) hiddenParam, (Expr *) partitionColumn, - partitionColumnCollid, partitionColumnCollid); + + partitionColumnCollid = partitionColumn->varcollid; + + equalityParameter->paramkind = PARAM_EXTERN; + equalityParameter->paramid = HIDDEN_PARAMETER_ID; + equalityParameter->paramtype = partitionColumn->vartype; + equalityParameter->paramtypmod = partitionColumn->vartypmod; + equalityParameter->paramcollid = partitionColumnCollid; + equalityParameter->location = -1; + + /* create a hidden equality on the target partition column */ + hiddenEqualityQual = (Node *) + make_opclause(equalsOperator, InvalidOid, false, + (Expr *) partitionColumn, + (Expr *) equalityParameter, + partitionColumnCollid, partitionColumnCollid); /* add restriction on partition column */ if (query->jointree->quals == NULL) { - query->jointree->quals = hiddenBound; + query->jointree->quals = hiddenEqualityQual; } else { query->jointree->quals = make_and_qual(query->jointree->quals, - hiddenBound); + hiddenEqualityQual); } } @@ -1569,15 +1594,16 @@ RouterSelectTask(Query *originalQuery, Query *query, List **placementList) { Task *task = NULL; + bool queryRoutable = false; StringInfo queryString = makeStringInfo(); bool upsertQuery = false; uint64 shardId = INVALID_SHARD_ID; - originalQuery = RouterSelectQuery(originalQuery, query, restrictionContext, + queryRoutable = RouterSelectQuery(originalQuery, query, restrictionContext, placementList, &shardId); - if (originalQuery == NULL) + if (!queryRoutable) { return NULL; } @@ -1593,14 +1619,20 @@ RouterSelectTask(Query *originalQuery, Query *query, task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; - /* task->requiresMasterEvaluation = false; */ - return task; } -/* RouterSelectQuery builds a Task to represent a single shard select query */ -static Query * +/* + * RouterSelectQuery returns true if the input query can be pushed down to the + * worker 
node as it is. Otherwise, the function returns false. + * + * On return true, all RTEs have been updated to point to the relevant nodes in + * the originalQuery. Also, placementList is filled with the list of worker nodes + * that has all the required shard placements for the query execution. Finally, + * anchorShardId is set to the first pruned shardId of the given query. + */ +static bool RouterSelectQuery(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext, List **placementList, uint64 *anchorShardId) @@ -1612,12 +1644,13 @@ RouterSelectQuery(Query *originalQuery, Query *query, ListCell *prunedRelationShardListCell = NULL; List *workerList = NIL; bool shardsPresent = false; + bool queryRoutable = false; *placementList = NIL; if (prunedRelationShardList == NULL) { - return NULL; + return queryRoutable; } Assert(commandType == CMD_SELECT); @@ -1674,7 +1707,7 @@ RouterSelectQuery(Query *originalQuery, Query *query, { ereport(DEBUG2, (errmsg("Found no worker with all shard placements"))); - return NULL; + return queryRoutable; } UpdateRelationNames((Node *) originalQuery, restrictionContext); @@ -1682,7 +1715,10 @@ RouterSelectQuery(Query *originalQuery, Query *query, *placementList = workerList; *anchorShardId = shardId; - return originalQuery; + /* now that the query is qualified to be routable */ + queryRoutable = true; + + return queryRoutable; } @@ -2115,10 +2151,8 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, } -#include "nodes/print.h" - /* - * ReorderInsertSelectTargetListsIfExists reorders the target lists of INSERT/SELECT + * ReorderInsertSelectTargetLists reorders the target lists of INSERT/SELECT * query which is required for deparsing purposes. The reordered query is returned. * * The necessity for this function comes from the fact that ruleutils.c is not supposed to be @@ -2128,10 +2162,9 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, * defaults etc. 
For the details of reordeing, see transformInsertRow(). */ Query * -ReorderInsertSelectTargetListsIfExists(Query *originalQuery) +ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte) { - RangeTblEntry *insertRte = NULL; - RangeTblEntry *subqueryRte = NULL; Query *subquery = NULL; ListCell *insertTargetEntryCell; List *newSubqueryTargetlist = NIL; @@ -2142,14 +2175,8 @@ ReorderInsertSelectTargetListsIfExists(Query *originalQuery) Oid insertRelationId = InvalidOid; int subqueryTargetLength = 0; - /* we only apply the reording for INSERT ... SELECT queries */ - if (!InsertSelectQuery(originalQuery)) - { - return originalQuery; - } + AssertArg(InsertSelectQuery(originalQuery)); - insertRte = linitial(originalQuery->rtable); - subqueryRte = lsecond(originalQuery->rtable); subquery = subqueryRte->subquery; insertRelationId = insertRte->relid; @@ -2223,12 +2250,9 @@ ReorderInsertSelectTargetListsIfExists(Query *originalQuery) } newInsertVar = makeVar(insertTableId, originalAttrNo, - exprType( - (Node *) newSubqueryTargetEntry->expr), - exprTypmod( - (Node *) newSubqueryTargetEntry->expr), - exprCollation( - (Node *) newSubqueryTargetEntry->expr), + exprType((Node *) newSubqueryTargetEntry->expr), + exprTypmod((Node *) newSubqueryTargetEntry->expr), + exprCollation((Node *) newSubqueryTargetEntry->expr), 0); newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar, originalAttrNo, oldInsertTargetEntry->resname, @@ -2352,11 +2376,14 @@ copyRelationRestrictionContext(RelationRestrictionContext *oldContext) /* - * Replace the "hidden" partition restriction clause with the current shard's - * (passed in context) boundary value. + * ReplaceHiddenQual replaces the "hidden" partition restriction clause with + * the current shard's (passed in context) boundary value. 
+ * + * Once we see ($1 = partition column), we replace it with + * (partCol >= shardMinValue && partCol <= shardMaxValue) */ static Node * -ReplaceHiddenParameter(Node *node, void *context) +ReplaceHiddenQual(Node *node, void *context) { ShardInterval *shardInterval = (ShardInterval *) context; Assert(shardInterval->minValueExists); @@ -2367,113 +2394,103 @@ ReplaceHiddenParameter(Node *node, void *context) return NULL; } - if (IsA(node, OpExpr)) + /* + * Look for operator expressions with two arguments. + * + * Once the hidden op is found, replace it with the appropriate boundaries for the + * current shard interval. + * + * The boundaries are replaced in the following manner: + * (partCol >= shardMinValue && partCol <= shardMaxValue) + */ + if (IsA(node, OpExpr) && list_length(((OpExpr *) node)->args) == 2) { OpExpr *op = (OpExpr *) node; - if (list_length(op->args) == 2) + Node *leftop = get_leftop((Expr *) op); + Node *rightop = get_rightop((Expr *) op); + Param *param = NULL; + + Var *hashedGEColumn = NULL; + OpExpr *hashedGEOpExpr = NULL; + Datum shardMinValue = shardInterval->minValue; + + Var *hashedLEColumn = NULL; + OpExpr *hashedLEOpExpr = NULL; + Datum shardMaxValue = shardInterval->maxValue; + + List *hashedOperatorList = NIL; + + /* TODO: look up the int4 >= and <= operator OIDs instead of hard-coding them */ + Oid integer4GEoperatorId = 525; + Oid integer4LEoperatorId = 523; + + /* look for the Params */ + if (IsA(leftop, Param)) { - Node *leftop = get_leftop((Expr *) op); - Node *rightop = get_rightop((Expr *) op); - Param *param = NULL; - - /* - * TODO: do we really need Var? Postgres replaces Var with Const in case we already have the same - * Var in the restrictInfo - * */ - if (IsA(leftop, Param))/* && IsA(rightop, Var)) */ - { - param = (Param *) leftop; - } - else if (IsA(rightop, Param)) /* IsA(leftop, Var))/ * &&) * / */ - { - param = (Param *) rightop; - } - - /* - * Found hidden op, replace with appropriate boundaries for the - * current shard interval.
- */ - if (param && param->paramid == HIDDEN_PARAMETER_ID) - { - Var *hashedColumn = NULL; - OpExpr *hashedOperatorExpression = NULL; - - hashedColumn = MakeInt4Column(); - hashedOperatorExpression = (OpExpr *) - make_opclause(96, - InvalidOid, - false, /* no return set */ - (Expr *) hashedColumn, - (Expr *) MakeInt4Constant( - shardInterval->maxValue), - InvalidOid, InvalidOid); - hashedOperatorExpression->opfuncid = get_opcode( - hashedOperatorExpression->opno); - hashedOperatorExpression->opresulttype = get_func_rettype( - hashedOperatorExpression->opfuncid); - return (Node *) hashedOperatorExpression; - } + param = (Param *) leftop; } + else if (IsA(rightop, Param)) + { + param = (Param *) rightop; + } + + /* not an interesting param for our purpose, so return */ + if (!(param && param->paramid == HIDDEN_PARAMETER_ID)) + { + return node; + } + + /* generate hashed columns */ + hashedGEColumn = MakeInt4Column(); + hashedLEColumn = MakeInt4Column(); + + /* generate the necessary operators */ + hashedGEOpExpr = (OpExpr *) make_opclause(integer4GEoperatorId, + InvalidOid, false, + (Expr *) hashedGEColumn, + (Expr *) MakeInt4Constant( + shardMinValue), + InvalidOid, InvalidOid); + + hashedLEOpExpr = (OpExpr *) make_opclause(integer4LEoperatorId, + InvalidOid, false, + (Expr *) hashedLEColumn, + (Expr *) MakeInt4Constant( + shardMaxValue), + InvalidOid, InvalidOid); + + /* update the operators with correct operator numbers and function ids */ + hashedGEOpExpr->opfuncid = get_opcode(hashedGEOpExpr->opno); + hashedGEOpExpr->opresulttype = get_func_rettype(hashedGEOpExpr->opfuncid); + + hashedLEOpExpr->opfuncid = get_opcode(hashedLEOpExpr->opno); + hashedLEOpExpr->opresulttype = get_func_rettype(hashedLEOpExpr->opfuncid); + + /* finally add the hashed operators to a list and return it */ + hashedOperatorList = lappend(hashedOperatorList, hashedGEOpExpr); + hashedOperatorList = lappend(hashedOperatorList, hashedLEOpExpr); + + return (Node *) hashedOperatorList; } + if 
(IsA(node, Query)) { /* FIXME: probably can remove support for this */ /* to support CTEs, subqueries, etc */ return (Node *) query_tree_mutator((Query *) node, - ReplaceHiddenParameter, + ReplaceHiddenQual, context, QTW_EXAMINE_RTES); } else if (IsA(node, RestrictInfo)) { RestrictInfo *restrictInfo = (RestrictInfo *) node; - restrictInfo->clause = (Expr *) ReplaceHiddenParameter( + restrictInfo->clause = (Expr *) ReplaceHiddenQual( (Node *) restrictInfo->clause, context); return (Node *) restrictInfo; } - return expression_tree_mutator(node, ReplaceHiddenParameter, context); -} - - -/* - * MakeInt4Column creates a column of int4 type with invalid table id and max - * attribute number. - */ -static Var * -MakeInt4Column() -{ - Index tableId = 0; - AttrNumber columnAttributeNumber = RESERVED_HASHED_COLUMN_ID; - Oid columnType = INT4OID; - int32 columnTypeMod = -1; - Oid columnCollationOid = InvalidOid; - Index columnLevelSup = 0; - - Var *int4Column = makeVar(tableId, columnAttributeNumber, columnType, - columnTypeMod, columnCollationOid, columnLevelSup); - return int4Column; -} - - -/* - * MakeInt4Constant creates a new constant of int4 type and assigns the given - * value as a constant value. 
- */ -static Const * -MakeInt4Constant(Datum constantValue) -{ - Oid constantType = INT4OID; - int32 constantTypeMode = -1; - Oid constantCollationId = InvalidOid; - int constantLength = sizeof(int32); - bool constantIsNull = false; - bool constantByValue = true; - - Const *int4Constant = makeConst(constantType, constantTypeMode, constantCollationId, - constantLength, constantValue, constantIsNull, - constantByValue); - return int4Constant; + return expression_tree_mutator(node, ReplaceHiddenQual, context); } diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c index 4d586d3a0..883f59258 100644 --- a/src/backend/distributed/test/deparse_shard_query.c +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -59,7 +59,16 @@ deparse_shard_query_test(PG_FUNCTION_ARGS) Query *query = lfirst(queryTreeCell); StringInfo buffer = makeStringInfo(); - ReorderInsertSelectTargetListsIfExists(query); + /* reorder the target list only for INSERT ..
SELECT queries */ + if (InsertSelectQuery(query)) + { + RangeTblEntry *insertRte = linitial(query->rtable); + RangeTblEntry *subqueryRte = lsecond(query->rtable); + + + ReorderInsertSelectTargetLists(query, insertRte, subqueryRte); + } + deparse_shard_query(query, InvalidOid, 0, buffer); elog(INFO, "query: %s", buffer->data); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e46959d4c..e1fc30aea 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -245,9 +245,10 @@ extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval) extern bool SimpleOpExpression(Expr *clause); extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn); +/* helper functions */ +extern Var * MakeInt4Column(void); +extern Const * MakeInt4Constant(Datum constantValue); extern int CompareShardPlacements(const void *leftElement, const void *rightElement); - -/* Function declarations for sorting shards. 
*/ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 2f362f9e3..9f89883b6 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -31,9 +31,11 @@ extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, MultiExecutorType taskExecutorType, RelationRestrictionContext *restrictionContext); -extern void AddHiddenPartitionColumnParameter(Query *originalQuery); +extern void AddHiddenPartitionColumnEqualityQual(Query *originalQuery); extern void ErrorIfModifyQueryNotSupported(Query *queryTree); -extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery); +extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, + RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte); extern bool InsertSelectQuery(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 91613c6df..9c7a519fa 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -44,6 +44,19 @@ SELECT master_create_worker_shards('agg_events', 4, 2); -- make tables as co-located UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events'); +CREATE TABLE reference_table (user_id int); +SELECT master_create_distributed_table('reference_table', 'user_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('reference_table', 1, 2); + master_create_worker_shards +----------------------------- + +(1 row) + INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES (1, now(), 10, 100, 1000.1, 10000); INSERT INTO 
raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES @@ -654,7 +667,86 @@ DEBUG: sent COMMIT over connection 13300009 DEBUG: sent COMMIT over connection 13300009 DEBUG: sent COMMIT over connection 13300010 DEBUG: sent COMMIT over connection 13300010 +-- a very simple UNION query +INSERT INTO + raw_events_first(user_id) +SELECT + user_id +FROM + ((SELECT user_id FROM raw_events_first) UNION + (SELECT user_id FROM raw_events_second)) as foo; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: cannot perform distributed planning for the given modification +DETAIL: Set operations are not allowed in INSERT ... SELECT queries +-- same query with slightly different syntax, but this time we cannot push it down +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM raw_events_first) UNION + (SELECT user_id FROM raw_events_first); +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: cannot perform distributed planning for the given modification +DETAIL: Set operations are not allowed in INSERT ... SELECT queries +-- similar query with a filter on two of the queries +INSERT INTO + raw_events_first(user_id) +SELECT + user_id +FROM + ((SELECT user_id FROM raw_events_first WHERE user_id = 15) UNION + (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: cannot perform distributed planning for the given modification +DETAIL: Set operations are not allowed in INSERT ... 
SELECT queries -- TODO: UUIDs +-- a test with reference table JOINs +INSERT INTO + agg_events (user_id, value_1_agg) +SELECT + raw_events_first.user_id, sum(value_1) +FROM + reference_table, raw_events_first +WHERE + raw_events_first.user_id = reference_table.user_id +GROUP BY + raw_events_first.user_id; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300000 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300001 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300002 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) 
GROUP BY raw_events_first.user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300003 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300010 +DEBUG: sent COMMIT over connection 13300010 -- unsupported JOIN INSERT INTO agg_events (value_4_agg, @@ -673,9 +765,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. 
-- INSERT partition column does not match with SELECT partition column @@ -699,38 +788,84 @@ DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column -- error cases -- no part column at all -INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first; +INSERT INTO raw_events_second + (value_1) +SELECT value_1 +FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column -INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first; +INSERT INTO raw_events_second + (value_1) +SELECT user_id +FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column -INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first; +INSERT INTO raw_events_second + (user_id) +SELECT value_1 +FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column -INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first; +INSERT INTO raw_events_second + (user_id) +SELECT user_id * 2 +FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT 
query should return bare partition column on the same ordinal position with INSERT query's partition column -INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first; +INSERT INTO raw_events_second + (user_id) +SELECT user_id :: bigint +FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column -INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id; +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + Avg(value_2) +FROM raw_events_first +GROUP BY user_id; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column -INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2; +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + value_2 +FROM raw_events_first +GROUP BY user_id, + value_2; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +-- tables should be 
co-located +INSERT INTO agg_events (user_id) SELECT user_id FROM reference_table; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: INSERT target table and the source relation of the SELECT partition column value must be colocated diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 44e5af07b..32eb751f2 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -14,7 +14,6 @@ CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_ SELECT master_create_distributed_table('raw_events_second', 'user_id', 'hash'); SELECT master_create_worker_shards('raw_events_second', 4, 2); - CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); SELECT master_create_distributed_table('agg_events', 'user_id', 'hash'); SELECT master_create_worker_shards('agg_events', 4, 2); @@ -23,6 +22,10 @@ SELECT master_create_worker_shards('agg_events', 4, 2); UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events'); +CREATE TABLE reference_table (user_id int); +SELECT master_create_distributed_table('reference_table', 'user_id', 'hash'); +SELECT master_create_worker_shards('reference_table', 1, 2); + INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES (1, now(), 10, 100, 1000.1, 10000); INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES @@ -209,9 +212,45 @@ SELECT FROM raw_events_first GROUP BY user_id; +-- a very simple UNION query +INSERT INTO + raw_events_first(user_id) +SELECT + user_id +FROM + ((SELECT user_id FROM raw_events_first) UNION + (SELECT user_id FROM raw_events_second)) as foo; + +-- 
same query with slightly different syntax, but this time we cannot push it down +INSERT INTO + raw_events_first(user_id) + (SELECT user_id FROM raw_events_first) UNION + (SELECT user_id FROM raw_events_first); + +-- similar query with a filter on two of the queries +INSERT INTO + raw_events_first(user_id) +SELECT + user_id +FROM + ((SELECT user_id FROM raw_events_first WHERE user_id = 15) UNION + (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; -- TODO: UUIDs +-- a test with reference table JOINs +INSERT INTO + agg_events (user_id, value_1_agg) +SELECT + raw_events_first.user_id, sum(value_1) +FROM + reference_table, raw_events_first +WHERE + raw_events_first.user_id = reference_table.user_id +GROUP BY + raw_events_first.user_id; + + -- unsupported JOIN INSERT INTO agg_events (value_4_agg, @@ -247,10 +286,59 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, -- error cases -- no part column at all -INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first; -INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first; -INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first; -INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first; -INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first; -INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id; -INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2; +INSERT INTO raw_events_second + (value_1) +SELECT value_1 +FROM raw_events_first; + +INSERT INTO raw_events_second + (value_1) +SELECT user_id +FROM raw_events_first; + +INSERT INTO raw_events_second + (user_id) +SELECT value_1 +FROM raw_events_first; + 
+INSERT INTO raw_events_second + (user_id) +SELECT user_id * 2 +FROM raw_events_first; + +INSERT INTO raw_events_second + (user_id) +SELECT user_id :: bigint +FROM raw_events_first; + +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + Avg(value_2) +FROM raw_events_first +GROUP BY user_id; + +INSERT INTO agg_events + (value_3_agg, + value_4_agg, + value_1_agg, + value_2_agg, + user_id) +SELECT SUM(value_3), + Count(value_4), + user_id, + SUM(value_1), + value_2 +FROM raw_events_first +GROUP BY user_id, + value_2; + +-- tables should be co-located +INSERT INTO agg_events (user_id) SELECT user_id FROM reference_table;