mirror of https://github.com/citusdata/citus.git
Fix pushing down wrong INSERT ... SELECT queries
Before this commit, in certain cases router planner allowed pushing
down JOINs that are not on the partition keys.
With @anarazel's suggestion, we change the logic to use the uninstantiated
parameter. Previously, the planner traversed the restriction
information and, once it found the parameter, replaced it with
the shard range. With this commit, instead of traversing the restrict
infos, the planner explicitly checks for the equivalence of the relation
partition key with the uninstantiated parameter. If it finds an equivalence,
it adds the restrictions. In this way, we have more control over the
queries that are pushed down.
Based on 11665dbe3c
pull/1385/head
parent
b8aa47e58f
commit
03083ea59f
|
@ -48,6 +48,7 @@
|
|||
#include "nodes/pg_list.h"
|
||||
#include "nodes/primnodes.h"
|
||||
#include "optimizer/clauses.h"
|
||||
#include "optimizer/paths.h"
|
||||
#include "optimizer/predtest.h"
|
||||
#include "optimizer/restrictinfo.h"
|
||||
#include "optimizer/var.h"
|
||||
|
@ -87,6 +88,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
|
|||
RelationRestrictionContext *
|
||||
restrictionContext,
|
||||
uint32 taskIdIndex);
|
||||
static List * HashedShardIntervalOpExpressions(ShardInterval *shardInterval);
|
||||
static Param * UninstantiatedParameterForColumn(Var *relationPartitionKey);
|
||||
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
|
||||
bool *badCoalesce);
|
||||
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
|
||||
|
@ -116,7 +119,6 @@ static bool MultiRouterPlannableQuery(Query *query,
|
|||
RelationRestrictionContext *restrictionContext);
|
||||
static RelationRestrictionContext * CopyRelationRestrictionContext(
|
||||
RelationRestrictionContext *oldContext);
|
||||
static Node * InstantiatePartitionQual(Node *node, void *context);
|
||||
static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree,
|
||||
RangeTblEntry *insertRte,
|
||||
RangeTblEntry *subqueryRte,
|
||||
|
@ -378,6 +380,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
|||
bool upsertQuery = false;
|
||||
bool replacePrunedQueryWithDummy = false;
|
||||
bool allReferenceTables = restrictionContext->allReferenceTables;
|
||||
List *hashedOpExpressions = NIL;
|
||||
|
||||
/* grab shared metadata lock to stop concurrent placement additions */
|
||||
LockShardDistributionMetadata(shardId, ShareLock);
|
||||
|
@ -390,19 +393,43 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
|||
{
|
||||
RelationRestriction *restriction = lfirst(restrictionCell);
|
||||
List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo;
|
||||
Var *relationPartitionKey = PartitionColumn(restriction->relationId,
|
||||
restriction->index);
|
||||
Param *uninstantiatedParameter = NULL;
|
||||
|
||||
/*
|
||||
* We haven't added the quals if all participating tables are reference
|
||||
* tables. Thus, now skip instantiating them.
|
||||
* We don't need to add restriction to reference tables given that they are
|
||||
* already single sharded and always prune to that single shard.
|
||||
*/
|
||||
if (allReferenceTables)
|
||||
if (PartitionMethod(restriction->relationId) == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
|
||||
originalBaserestrictInfo =
|
||||
(List *) InstantiatePartitionQual((Node *) originalBaserestrictInfo,
|
||||
shardInterval);
|
||||
hashedOpExpressions = HashedShardIntervalOpExpressions(shardInterval);
|
||||
Assert(list_length(hashedOpExpressions) == 2);
|
||||
|
||||
/*
|
||||
* Here we check whether the planner knows an equality between the partition column
|
||||
* and the uninstantiated parameter. If such an equality exists, we simply add the
|
||||
* shard restrictions.
|
||||
*/
|
||||
uninstantiatedParameter = UninstantiatedParameterForColumn(relationPartitionKey);
|
||||
if (exprs_known_equal(restriction->plannerInfo, (Node *) relationPartitionKey,
|
||||
(Node *) uninstantiatedParameter))
|
||||
{
|
||||
RestrictInfo *geRestrictInfo = NULL;
|
||||
RestrictInfo *leRestrictInfo = NULL;
|
||||
|
||||
OpExpr *hashedGEOpExpr = (OpExpr *) linitial(hashedOpExpressions);
|
||||
OpExpr *hashedLEOpExpr = (OpExpr *) lsecond(hashedOpExpressions);
|
||||
|
||||
geRestrictInfo = make_simple_restrictinfo((Expr *) hashedGEOpExpr);
|
||||
originalBaserestrictInfo = lappend(originalBaserestrictInfo, geRestrictInfo);
|
||||
|
||||
leRestrictInfo = make_simple_restrictinfo((Expr *) hashedLEOpExpr);
|
||||
originalBaserestrictInfo = lappend(originalBaserestrictInfo, leRestrictInfo);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -501,6 +528,96 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* HashedShardIntervalOpExpressions returns a list of OpExprs with exactly two
|
||||
* items in it. The list consists of shard interval ranges with hashed columns
|
||||
* such as (hashColumn >= shardMinValue) and (hashedColumn <= shardMaxValue).
|
||||
*
|
||||
* The function errors out if the given shard interval does not belong to a hash
|
||||
* distributed table.
|
||||
*/
|
||||
static List *
|
||||
HashedShardIntervalOpExpressions(ShardInterval *shardInterval)
|
||||
{
|
||||
List *operatorExpressions = NIL;
|
||||
Var *hashedGEColumn = NULL;
|
||||
Var *hashedLEColumn = NULL;
|
||||
OpExpr *hashedGEOpExpr = NULL;
|
||||
OpExpr *hashedLEOpExpr = NULL;
|
||||
Oid integer4GEoperatorId = InvalidOid;
|
||||
Oid integer4LEoperatorId = InvalidOid;
|
||||
|
||||
Datum shardMinValue = shardInterval->minValue;
|
||||
Datum shardMaxValue = shardInterval->maxValue;
|
||||
char partitionMethod = PartitionMethod(shardInterval->relationId);
|
||||
|
||||
if (partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot create shard interval operator expression for "
|
||||
"distributed relations other than hash distributed "
|
||||
"relations")));
|
||||
}
|
||||
|
||||
/* get the integer >=, <= operators from the catalog */
|
||||
integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID,
|
||||
INT4OID,
|
||||
BTGreaterEqualStrategyNumber);
|
||||
integer4LEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID,
|
||||
INT4OID,
|
||||
BTLessEqualStrategyNumber);
|
||||
|
||||
/* generate hashed columns */
|
||||
hashedGEColumn = MakeInt4Column();
|
||||
hashedLEColumn = MakeInt4Column();
|
||||
|
||||
/* generate the necessary operators */
|
||||
hashedGEOpExpr = (OpExpr *) make_opclause(integer4GEoperatorId, InvalidOid, false,
|
||||
(Expr *) hashedGEColumn,
|
||||
(Expr *) MakeInt4Constant(shardMinValue),
|
||||
InvalidOid, InvalidOid);
|
||||
|
||||
hashedLEOpExpr = (OpExpr *) make_opclause(integer4LEoperatorId, InvalidOid, false,
|
||||
(Expr *) hashedLEColumn,
|
||||
(Expr *) MakeInt4Constant(shardMaxValue),
|
||||
InvalidOid, InvalidOid);
|
||||
|
||||
/* update the operators with correct operator numbers and function ids */
|
||||
hashedGEOpExpr->opfuncid = get_opcode(hashedGEOpExpr->opno);
|
||||
hashedGEOpExpr->opresulttype = get_func_rettype(hashedGEOpExpr->opfuncid);
|
||||
operatorExpressions = lappend(operatorExpressions, hashedGEOpExpr);
|
||||
|
||||
hashedLEOpExpr->opfuncid = get_opcode(hashedLEOpExpr->opno);
|
||||
hashedLEOpExpr->opresulttype = get_func_rettype(hashedLEOpExpr->opfuncid);
|
||||
operatorExpressions = lappend(operatorExpressions, hashedLEOpExpr);
|
||||
|
||||
return operatorExpressions;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UninstantiatedParameterForColumn returns a Param that can be used as an uninstantiated
|
||||
* parameter for the given column in the sense that paramtype, paramtypmod and collid
|
||||
* is set to the input Var's corresponding values.
|
||||
*
|
||||
* Note that we're using hard coded UNINSTANTIATED_PARAMETER_ID which is the required parameter
|
||||
* for our purposes. See multi_planner.c@multi_planner for the details.
|
||||
*/
|
||||
static Param *
|
||||
UninstantiatedParameterForColumn(Var *relationPartitionKey)
|
||||
{
|
||||
Param *uninstantiatedParameter = makeNode(Param);
|
||||
|
||||
uninstantiatedParameter->paramkind = PARAM_EXTERN;
|
||||
uninstantiatedParameter->paramid = UNINSTANTIATED_PARAMETER_ID;
|
||||
uninstantiatedParameter->paramtype = relationPartitionKey->vartype;
|
||||
uninstantiatedParameter->paramtypmod = relationPartitionKey->vartypmod;
|
||||
uninstantiatedParameter->paramcollid = relationPartitionKey->varcollid;
|
||||
|
||||
return uninstantiatedParameter;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddShardIntervalRestrictionToSelect adds the following range boundaries
|
||||
* with the given subquery and shardInterval:
|
||||
|
@ -1148,7 +1265,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery)
|
|||
static void
|
||||
AddUninstantiatedEqualityQual(Query *query, Var *partitionColumn)
|
||||
{
|
||||
Param *equalityParameter = makeNode(Param);
|
||||
Param *equalityParameter = UninstantiatedParameterForColumn(partitionColumn);
|
||||
OpExpr *uninstantiatedEqualityQual = NULL;
|
||||
Oid partitionColumnCollid = InvalidOid;
|
||||
Oid lessThanOperator = InvalidOid;
|
||||
|
@ -1166,13 +1283,6 @@ AddUninstantiatedEqualityQual(Query *query, Var *partitionColumn)
|
|||
|
||||
partitionColumnCollid = partitionColumn->varcollid;
|
||||
|
||||
equalityParameter->paramkind = PARAM_EXTERN;
|
||||
equalityParameter->paramid = UNINSTANTIATED_PARAMETER_ID;
|
||||
equalityParameter->paramtype = partitionColumn->vartype;
|
||||
equalityParameter->paramtypmod = partitionColumn->vartypmod;
|
||||
equalityParameter->paramcollid = partitionColumnCollid;
|
||||
equalityParameter->location = -1;
|
||||
|
||||
/* create an equality on the target partition column */
|
||||
uninstantiatedEqualityQual = (OpExpr *) make_opclause(equalsOperator, InvalidOid,
|
||||
false,
|
||||
|
@ -2942,129 +3052,6 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* InstantiatePartitionQual replaces the "uninstantiated" partition
|
||||
* restriction clause with the current shard's (passed in context)
|
||||
* boundary value.
|
||||
*
|
||||
* Once we see ($1 = partition column), we replace it with
|
||||
* (partCol >= shardMinValue && partCol <= shardMaxValue).
|
||||
*/
|
||||
static Node *
|
||||
InstantiatePartitionQual(Node *node, void *context)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) context;
|
||||
Assert(shardInterval->minValueExists);
|
||||
Assert(shardInterval->maxValueExists);
|
||||
|
||||
if (node == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Look for operator expressions with two arguments.
|
||||
*
|
||||
* Once Found the uninstantiate, replace with appropriate boundaries for the
|
||||
* current shard interval.
|
||||
*
|
||||
* The boundaries are replaced in the following manner:
|
||||
* (partCol >= shardMinValue && partCol <= shardMaxValue)
|
||||
*/
|
||||
if (IsA(node, OpExpr) && list_length(((OpExpr *) node)->args) == 2)
|
||||
{
|
||||
OpExpr *op = (OpExpr *) node;
|
||||
Node *leftop = get_leftop((Expr *) op);
|
||||
Node *rightop = get_rightop((Expr *) op);
|
||||
Param *param = NULL;
|
||||
|
||||
Var *hashedGEColumn = NULL;
|
||||
OpExpr *hashedGEOpExpr = NULL;
|
||||
Datum shardMinValue = shardInterval->minValue;
|
||||
|
||||
Var *hashedLEColumn = NULL;
|
||||
OpExpr *hashedLEOpExpr = NULL;
|
||||
Datum shardMaxValue = shardInterval->maxValue;
|
||||
|
||||
List *hashedOperatorList = NIL;
|
||||
|
||||
Oid integer4GEoperatorId = InvalidOid;
|
||||
Oid integer4LEoperatorId = InvalidOid;
|
||||
|
||||
/* look for the Params */
|
||||
if (IsA(leftop, Param))
|
||||
{
|
||||
param = (Param *) leftop;
|
||||
}
|
||||
else if (IsA(rightop, Param))
|
||||
{
|
||||
param = (Param *) rightop;
|
||||
}
|
||||
|
||||
/* not an interesting param for our purpose, so return */
|
||||
if (!(param && param->paramid == UNINSTANTIATED_PARAMETER_ID))
|
||||
{
|
||||
return node;
|
||||
}
|
||||
|
||||
/* get the integer >=, <= operators from the catalog */
|
||||
integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID,
|
||||
INT4OID,
|
||||
BTGreaterEqualStrategyNumber);
|
||||
integer4LEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID,
|
||||
INT4OID,
|
||||
BTLessEqualStrategyNumber);
|
||||
|
||||
/* generate hashed columns */
|
||||
hashedGEColumn = MakeInt4Column();
|
||||
hashedLEColumn = MakeInt4Column();
|
||||
|
||||
/* generate the necessary operators */
|
||||
hashedGEOpExpr = (OpExpr *) make_opclause(integer4GEoperatorId,
|
||||
InvalidOid, false,
|
||||
(Expr *) hashedGEColumn,
|
||||
(Expr *) MakeInt4Constant(
|
||||
shardMinValue),
|
||||
InvalidOid, InvalidOid);
|
||||
|
||||
hashedLEOpExpr = (OpExpr *) make_opclause(integer4LEoperatorId,
|
||||
InvalidOid, false,
|
||||
(Expr *) hashedLEColumn,
|
||||
(Expr *) MakeInt4Constant(
|
||||
shardMaxValue),
|
||||
InvalidOid, InvalidOid);
|
||||
|
||||
/* update the operators with correct operator numbers and function ids */
|
||||
hashedGEOpExpr->opfuncid = get_opcode(hashedGEOpExpr->opno);
|
||||
hashedGEOpExpr->opresulttype = get_func_rettype(hashedGEOpExpr->opfuncid);
|
||||
|
||||
hashedLEOpExpr->opfuncid = get_opcode(hashedLEOpExpr->opno);
|
||||
hashedLEOpExpr->opresulttype = get_func_rettype(hashedLEOpExpr->opfuncid);
|
||||
|
||||
/* finally add the hashed operators to a list and return it */
|
||||
hashedOperatorList = lappend(hashedOperatorList, hashedGEOpExpr);
|
||||
hashedOperatorList = lappend(hashedOperatorList, hashedLEOpExpr);
|
||||
|
||||
return (Node *) hashedOperatorList;
|
||||
}
|
||||
|
||||
/* ensure that it is not a query */
|
||||
Assert(!IsA(node, Query));
|
||||
|
||||
/* recurse into restrict info */
|
||||
if (IsA(node, RestrictInfo))
|
||||
{
|
||||
RestrictInfo *restrictInfo = (RestrictInfo *) node;
|
||||
restrictInfo->clause = (Expr *) InstantiatePartitionQual(
|
||||
(Node *) restrictInfo->clause, context);
|
||||
|
||||
return (Node *) restrictInfo;
|
||||
}
|
||||
|
||||
return expression_tree_mutator(node, InstantiatePartitionQual, context);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfQueryHasModifyingCTE checks if the query contains modifying common table
|
||||
* expressions and errors out if it does.
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -326,6 +326,57 @@ WHERE user_id IN (SELECT user_id
|
|||
FROM raw_events_second
|
||||
WHERE user_id = 2);
|
||||
|
||||
INSERT INTO raw_events_second
|
||||
(user_id)
|
||||
SELECT user_id
|
||||
FROM raw_events_first
|
||||
WHERE user_id IN (SELECT user_id
|
||||
FROM raw_events_second
|
||||
WHERE user_id != 2 AND value_1 = 2000)
|
||||
ON conflict (user_id, value_1) DO NOTHING;
|
||||
|
||||
INSERT INTO raw_events_second
|
||||
(user_id)
|
||||
SELECT user_id
|
||||
FROM raw_events_first
|
||||
WHERE user_id IN (SELECT user_id
|
||||
FROM raw_events_second WHERE false);
|
||||
|
||||
INSERT INTO raw_events_second
|
||||
(user_id)
|
||||
SELECT user_id
|
||||
FROM raw_events_first
|
||||
WHERE user_id IN (SELECT user_id
|
||||
FROM raw_events_second
|
||||
WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000);
|
||||
|
||||
-- lets mix subqueries in FROM clause and subqueries in WHERE
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT f2.id FROM
|
||||
|
||||
(SELECT
|
||||
id
|
||||
FROM (SELECT reference_table.user_id AS id
|
||||
FROM raw_events_first,
|
||||
reference_table
|
||||
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
|
||||
INNER JOIN
|
||||
(SELECT v4,
|
||||
v1,
|
||||
id
|
||||
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||
SUM(raw_events_first.value_1) AS v1,
|
||||
raw_events_second.user_id AS id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||
GROUP BY raw_events_second.user_id
|
||||
HAVING SUM(raw_events_second.value_4) > 1000) AS foo2 ) as f2
|
||||
ON (f.id = f2.id)
|
||||
WHERE f.id IN (SELECT user_id
|
||||
FROM raw_events_second);
|
||||
|
||||
-- some UPSERTS
|
||||
INSERT INTO agg_events AS ae
|
||||
(
|
||||
|
@ -496,7 +547,191 @@ FROM
|
|||
((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT
|
||||
(SELECT user_id FROM raw_events_second where user_id = 17)) as foo;
|
||||
|
||||
-- unsupported JOIN
|
||||
-- some supported LEFT joins
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id;
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_second.user_id
|
||||
FROM
|
||||
reference_table LEFT JOIN raw_events_second ON reference_table.user_id = raw_events_second.user_id;
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||
WHERE raw_events_first.user_id = 10;
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||
WHERE raw_events_second.user_id = 10 OR raw_events_second.user_id = 11;
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||
WHERE raw_events_first.user_id = 10 AND raw_events_first.user_id = 20;
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||
WHERE raw_events_first.user_id = 10 AND raw_events_second.user_id = 20;
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||
WHERE raw_events_first.user_id IN (19, 20, 21);
|
||||
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||
WHERE raw_events_second.user_id IN (19, 20, 21);
|
||||
|
||||
-- the following is a very tricky query for Citus
|
||||
-- although we do not support pushing down JOINs on non-partition
|
||||
-- columns here it is safe to push it down given that we're looking for
|
||||
-- a specific value (i.e., value_1 = 12) on the joining column.
|
||||
-- Note that the query always hits the same shard on raw_events_second
|
||||
-- and this query wouldn't have worked if we're to use different worker
|
||||
-- count or shard replication factor
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT raw_events_first.user_id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_second.user_id = raw_events_first.value_1
|
||||
AND raw_events_first.value_1 = 12;
|
||||
|
||||
-- some unsupported LEFT/INNER JOINs
|
||||
-- JOIN on one table with partition column other is not
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
|
||||
|
||||
-- same as the above with INNER JOIN
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
|
||||
|
||||
-- a not meaningful query
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT raw_events_second.user_id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_first.user_id = raw_events_first.value_1;
|
||||
|
||||
-- both tables joined on non-partition columns
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
|
||||
|
||||
-- same as the above with INNER JOIN
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
|
||||
|
||||
-- even if there is a filter on the partition key, since the join is not on the partition key we reject
|
||||
-- this query
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
|
||||
WHERE
|
||||
raw_events_first.user_id = 10;
|
||||
|
||||
-- same as the above with INNER JOIN
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
|
||||
WHERE raw_events_first.user_id = 10;
|
||||
|
||||
-- make things a bit more complicate with IN clauses
|
||||
INSERT INTO agg_events (user_id)
|
||||
SELECT
|
||||
raw_events_first.user_id
|
||||
FROM
|
||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
|
||||
WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4);
|
||||
|
||||
-- implicit join on non partition column should also not be pushed down
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT raw_events_first.user_id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_second.user_id = raw_events_first.value_1;
|
||||
|
||||
-- the following is again a tricky query for Citus
|
||||
-- if the given filter was on value_1 as shown in the above, Citus could
|
||||
-- push it down. But here the query is refused
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT raw_events_first.user_id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_second.user_id = raw_events_first.value_1
|
||||
AND raw_events_first.value_2 = 12;
|
||||
|
||||
-- lets do some unsupported query tests with subqueries
|
||||
-- foo is not joined on the partition key so the query is not
|
||||
-- pushed down
|
||||
INSERT INTO agg_events
|
||||
(user_id, value_4_agg)
|
||||
SELECT
|
||||
outer_most.id, max(outer_most.value)
|
||||
FROM
|
||||
(
|
||||
SELECT f2.id as id, f2.v4 as value FROM
|
||||
(SELECT
|
||||
id
|
||||
FROM (SELECT reference_table.user_id AS id
|
||||
FROM raw_events_first LEFT JOIN
|
||||
reference_table
|
||||
ON (raw_events_first.value_1 = reference_table.user_id)) AS foo) as f
|
||||
INNER JOIN
|
||||
(SELECT v4,
|
||||
v1,
|
||||
id
|
||||
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||
SUM(raw_events_first.value_1) AS v1,
|
||||
raw_events_second.user_id AS id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||
GROUP BY raw_events_second.user_id
|
||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||
ON (f.id = f2.id)) as outer_most
|
||||
GROUP BY
|
||||
outer_most.id;
|
||||
|
||||
|
||||
INSERT INTO agg_events
|
||||
(value_4_agg,
|
||||
value_1_agg,
|
||||
|
@ -619,7 +854,8 @@ ON (f.id = f2.id);
|
|||
|
||||
|
||||
-- the second part of the query is not routable since
|
||||
-- no GROUP BY on the partition column
|
||||
-- GROUP BY not on the partition column (i.e., value_1) and thus join
|
||||
-- on f.id = f2.id is not on the partition key (instead on the sum of partition key)
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT f.id FROM
|
||||
|
@ -672,6 +908,163 @@ outer_most.id, max(outer_most.value)
|
|||
ON (f.id != f2.id)) as outer_most
|
||||
GROUP BY outer_most.id;
|
||||
|
||||
|
||||
-- cannot pushdown since foo2 is not join on partition key
|
||||
INSERT INTO agg_events
|
||||
(user_id, value_4_agg)
|
||||
SELECT
|
||||
outer_most.id, max(outer_most.value)
|
||||
FROM
|
||||
(
|
||||
SELECT f2.id as id, f2.v4 as value FROM
|
||||
(SELECT
|
||||
id
|
||||
FROM (SELECT reference_table.user_id AS id
|
||||
FROM raw_events_first,
|
||||
reference_table
|
||||
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
|
||||
INNER JOIN
|
||||
(SELECT v4,
|
||||
v1,
|
||||
id
|
||||
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||
SUM(raw_events_first.value_1) AS v1,
|
||||
raw_events_second.user_id AS id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_first.user_id = raw_events_second.value_1
|
||||
GROUP BY raw_events_second.user_id
|
||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||
ON (f.id = f2.id)) as outer_most
|
||||
GROUP BY
|
||||
outer_most.id;
|
||||
|
||||
-- cannot push down since foo doesn't have en equi join
|
||||
INSERT INTO agg_events
|
||||
(user_id, value_4_agg)
|
||||
SELECT
|
||||
outer_most.id, max(outer_most.value)
|
||||
FROM
|
||||
(
|
||||
SELECT f2.id as id, f2.v4 as value FROM
|
||||
(SELECT
|
||||
id
|
||||
FROM (SELECT reference_table.user_id AS id
|
||||
FROM raw_events_first,
|
||||
reference_table
|
||||
WHERE raw_events_first.user_id != reference_table.user_id ) AS foo) as f
|
||||
INNER JOIN
|
||||
(SELECT v4,
|
||||
v1,
|
||||
id
|
||||
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||
SUM(raw_events_first.value_1) AS v1,
|
||||
raw_events_second.user_id AS id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||
GROUP BY raw_events_second.user_id
|
||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||
ON (f.id = f2.id)) as outer_most
|
||||
GROUP BY
|
||||
outer_most.id;
|
||||
|
||||
|
||||
-- some unsupported LATERAL JOINs
|
||||
-- join on averages is not on the partition key
|
||||
INSERT INTO agg_events (user_id, value_4_agg)
|
||||
SELECT
|
||||
averages.user_id, avg(averages.value_4)
|
||||
FROM
|
||||
(SELECT
|
||||
raw_events_second.user_id
|
||||
FROM
|
||||
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||
) reference_ids
|
||||
JOIN LATERAL
|
||||
(SELECT
|
||||
user_id, value_4
|
||||
FROM
|
||||
raw_events_first WHERE
|
||||
value_4 = reference_ids.user_id) as averages ON true
|
||||
GROUP BY averages.user_id;
|
||||
|
||||
-- join among reference_ids and averages is not on the partition key
|
||||
INSERT INTO agg_events (user_id, value_4_agg)
|
||||
SELECT
|
||||
averages.user_id, avg(averages.value_4)
|
||||
FROM
|
||||
(SELECT
|
||||
raw_events_second.user_id
|
||||
FROM
|
||||
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||
) reference_ids
|
||||
JOIN LATERAL
|
||||
(SELECT
|
||||
user_id, value_4
|
||||
FROM
|
||||
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
|
||||
GROUP BY averages.user_id;
|
||||
|
||||
-- join among the agg_ids and averages is not on the partition key
|
||||
INSERT INTO agg_events (user_id, value_4_agg)
|
||||
SELECT
|
||||
averages.user_id, avg(averages.value_4)
|
||||
FROM
|
||||
(SELECT
|
||||
raw_events_second.user_id
|
||||
FROM
|
||||
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||
) reference_ids
|
||||
JOIN LATERAL
|
||||
(SELECT
|
||||
user_id, value_4
|
||||
FROM
|
||||
raw_events_first) as averages ON averages.user_id = reference_ids.user_id
|
||||
JOIN LATERAL
|
||||
(SELECT user_id, value_4 FROM agg_events) as agg_ids ON (agg_ids.value_4 = averages.user_id)
|
||||
GROUP BY averages.user_id;
|
||||
|
||||
-- not supported subqueries in WHERE clause
|
||||
-- since the selected value in the WHERE is not
|
||||
-- partition key
|
||||
INSERT INTO raw_events_second
|
||||
(user_id)
|
||||
SELECT user_id
|
||||
FROM raw_events_first
|
||||
WHERE user_id IN (SELECT value_1
|
||||
FROM raw_events_second);
|
||||
|
||||
-- same as above but slightly more complex
|
||||
-- since it also includes subquery in FROM as well
|
||||
INSERT INTO agg_events
|
||||
(user_id)
|
||||
SELECT f2.id FROM
|
||||
|
||||
(SELECT
|
||||
id
|
||||
FROM (SELECT reference_table.user_id AS id
|
||||
FROM raw_events_first,
|
||||
reference_table
|
||||
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
|
||||
INNER JOIN
|
||||
(SELECT v4,
|
||||
v1,
|
||||
id
|
||||
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||
SUM(raw_events_first.value_1) AS v1,
|
||||
raw_events_second.user_id AS id
|
||||
FROM raw_events_first,
|
||||
raw_events_second
|
||||
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||
GROUP BY raw_events_second.user_id
|
||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||
ON (f.id = f2.id)
|
||||
WHERE f.id IN (SELECT value_1
|
||||
FROM raw_events_second);
|
||||
|
||||
|
||||
|
||||
-- we currently not support grouping sets
|
||||
INSERT INTO agg_events
|
||||
(user_id,
|
||||
|
|
Loading…
Reference in New Issue