mirror of https://github.com/citusdata/citus.git
Give consistent error messages for unsupported JOINs on INSERT ... SELECT
Ensure that all tables have the uninstantiated quals before planning of the distributed query starts. If not, give a meaningful error.
pull/1250/head
parent
fd8df25daf
commit
8effb3073f
|
@ -127,7 +127,11 @@ static Node * InstantiatePartitionQualWalker(Node *node, void *context);
|
||||||
static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree,
|
static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree,
|
||||||
RangeTblEntry *insertRte,
|
RangeTblEntry *insertRte,
|
||||||
RangeTblEntry *subqueryRte,
|
RangeTblEntry *subqueryRte,
|
||||||
bool allReferenceTables);
|
RelationRestrictionContext *
|
||||||
|
restrictionContext);
|
||||||
|
static bool AllRelationRestrictionsContainUninstantiatedQual(RelationRestrictionContext
|
||||||
|
*restrictionContext);
|
||||||
|
static bool HasUninstantiatedQualWalker(Node *node, void *context);
|
||||||
static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
|
static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
|
||||||
static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
||||||
RangeTblEntry *insertRte,
|
RangeTblEntry *insertRte,
|
||||||
|
@ -270,7 +274,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
||||||
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
||||||
bool allReferenceTables = restrictionContext->allReferenceTables;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Error semantics for INSERT ... SELECT queries are different than regular
|
* Error semantics for INSERT ... SELECT queries are different than regular
|
||||||
|
@ -278,7 +281,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
|
||||||
*/
|
*/
|
||||||
multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte,
|
multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte,
|
||||||
subqueryRte,
|
subqueryRte,
|
||||||
allReferenceTables);
|
restrictionContext);
|
||||||
if (multiPlan->planningError)
|
if (multiPlan->planningError)
|
||||||
{
|
{
|
||||||
return multiPlan;
|
return multiPlan;
|
||||||
|
@ -671,7 +674,8 @@ ExtractInsertRangeTableEntry(Query *query)
|
||||||
*/
|
*/
|
||||||
static DeferredErrorMessage *
|
static DeferredErrorMessage *
|
||||||
InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
|
InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
|
||||||
RangeTblEntry *subqueryRte, bool allReferenceTables)
|
RangeTblEntry *subqueryRte,
|
||||||
|
RelationRestrictionContext *restrictionContext)
|
||||||
{
|
{
|
||||||
Query *subquery = NULL;
|
Query *subquery = NULL;
|
||||||
Oid selectPartitionColumnTableId = InvalidOid;
|
Oid selectPartitionColumnTableId = InvalidOid;
|
||||||
|
@ -679,6 +683,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
|
||||||
char targetPartitionMethod = PartitionMethod(targetRelationId);
|
char targetPartitionMethod = PartitionMethod(targetRelationId);
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
DeferredErrorMessage *error = NULL;
|
DeferredErrorMessage *error = NULL;
|
||||||
|
bool allReferenceTables = restrictionContext->allReferenceTables;
|
||||||
|
|
||||||
/* we only do this check for INSERT ... SELECT queries */
|
/* we only do this check for INSERT ... SELECT queries */
|
||||||
AssertArg(InsertSelectQuery(queryTree));
|
AssertArg(InsertSelectQuery(queryTree));
|
||||||
|
@ -755,10 +760,106 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (!AllRelationRestrictionsContainUninstantiatedQual(restrictionContext))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot plan distributed query since all join conditions in the query "
|
||||||
|
"need include two distribution keys using an equality operator",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AllRelationRestrictionsContainUninstantiatedQual iterates over the relation
|
||||||
|
* restrictions and returns true if the qual is distributed to all relations.
|
||||||
|
* Otherwise returns false. Reference tables are ignored during the iteration
|
||||||
|
* given that they wouldn't need to have the qual in any case.
|
||||||
|
*
|
||||||
|
* Also, if any relation restriction contains a false clause, the relation is
|
||||||
|
* ignored since its restrictions are removed by postgres.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
AllRelationRestrictionsContainUninstantiatedQual(
|
||||||
|
RelationRestrictionContext *restrictionContext)
|
||||||
|
{
|
||||||
|
ListCell *relationRestrictionCell = NULL;
|
||||||
|
bool allRelationsHaveTheQual = true;
|
||||||
|
|
||||||
|
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
|
||||||
|
{
|
||||||
|
RelationRestriction *restriction = lfirst(relationRestrictionCell);
|
||||||
|
|
||||||
|
List *baseRestrictInfo = list_copy(restriction->relOptInfo->baserestrictinfo);
|
||||||
|
List *joinInfo = list_copy(restriction->relOptInfo->joininfo);
|
||||||
|
List *allRestrictions = list_concat(baseRestrictInfo, joinInfo);
|
||||||
|
ListCell *restrictionCell = NULL;
|
||||||
|
bool relationHasRestriction = false;
|
||||||
|
|
||||||
|
if (ContainsFalseClause(extract_actual_clauses(allRestrictions, true)))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we don't need to check existince of qual for reference tables */
|
||||||
|
if (PartitionMethod(restriction->relationId) == DISTRIBUTE_BY_NONE)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach(restrictionCell, allRestrictions)
|
||||||
|
{
|
||||||
|
RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictionCell);
|
||||||
|
|
||||||
|
relationHasRestriction = relationHasRestriction ||
|
||||||
|
HasUninstantiatedQualWalker(
|
||||||
|
(Node *) restrictInfo->clause,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if (relationHasRestriction)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
allRelationsHaveTheQual = allRelationsHaveTheQual && relationHasRestriction;
|
||||||
|
}
|
||||||
|
|
||||||
|
return allRelationsHaveTheQual;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasUninstantiatedQualWalker returns true if the given expression
|
||||||
|
* constains a parameter with UNINSTANTIATED_PARAMETER_ID.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
HasUninstantiatedQualWalker(Node *node, void *context)
|
||||||
|
{
|
||||||
|
Param *param = NULL;
|
||||||
|
|
||||||
|
if (node == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(node, Param))
|
||||||
|
{
|
||||||
|
param = (Param *) node;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (param && param->paramid == UNINSTANTIATED_PARAMETER_ID)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return expression_tree_walker(node, HasUninstantiatedQualWalker, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MultiTaskRouterSelectQuerySupported returns NULL if the query may be used
|
* MultiTaskRouterSelectQuerySupported returns NULL if the query may be used
|
||||||
* as the source for an INSERT ... SELECT or returns a description why not.
|
* as the source for an INSERT ... SELECT or returns a description why not.
|
||||||
|
|
|
@ -1213,11 +1213,7 @@ DEBUG: predicate pruning for shardId 13300005
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
-- the following is a very tricky query for Citus
|
-- not supported given that the join is not on the partition column
|
||||||
-- although we do not support pushing down JOINs on non-partition
|
|
||||||
-- columns here it is safe to push it down given that we're looking for
|
|
||||||
-- a specific value (i.e., value_1 = 12) on the joining column.
|
|
||||||
-- Note that the query always hits the same shard on raw_events_second
|
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
(user_id)
|
(user_id)
|
||||||
SELECT raw_events_first.user_id
|
SELECT raw_events_first.user_id
|
||||||
|
@ -1225,35 +1221,7 @@ FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_second.user_id = raw_events_first.value_1
|
WHERE raw_events_second.user_id = raw_events_first.value_1
|
||||||
AND raw_events_first.value_1 = 12;
|
AND raw_events_first.value_1 = 12;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)))
|
|
||||||
DEBUG: predicate pruning for shardId 13300000
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer)))
|
|
||||||
DEBUG: predicate pruning for shardId 13300000
|
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823)))
|
|
||||||
DEBUG: predicate pruning for shardId 13300000
|
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
|
|
||||||
DEBUG: Plan is router executable
|
|
||||||
-- some unsupported LEFT/INNER JOINs
|
-- some unsupported LEFT/INNER JOINs
|
||||||
-- JOIN on one table with partition column other is not
|
-- JOIN on one table with partition column other is not
|
||||||
INSERT INTO agg_events (user_id)
|
INSERT INTO agg_events (user_id)
|
||||||
|
@ -1284,19 +1252,14 @@ SELECT raw_events_second.user_id
|
||||||
FROM raw_events_first,
|
FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_first.user_id = raw_events_first.value_1;
|
WHERE raw_events_first.user_id = raw_events_first.value_1;
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
|
||||||
-- both tables joined on non-partition columns
|
-- both tables joined on non-partition columns
|
||||||
INSERT INTO agg_events (user_id)
|
INSERT INTO agg_events (user_id)
|
||||||
SELECT
|
SELECT
|
||||||
raw_events_first.user_id
|
raw_events_first.user_id
|
||||||
FROM
|
FROM
|
||||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
|
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
|
||||||
-- same as the above with INNER JOIN
|
-- same as the above with INNER JOIN
|
||||||
INSERT INTO agg_events (user_id)
|
INSERT INTO agg_events (user_id)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1383,11 +1346,7 @@ SELECT raw_events_first.user_id
|
||||||
FROM raw_events_first,
|
FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_second.user_id = raw_events_first.value_1;
|
WHERE raw_events_second.user_id = raw_events_first.value_1;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
|
||||||
-- the following is again a very tricky query for Citus
|
-- the following is again a very tricky query for Citus
|
||||||
-- if the given filter was on value_1 as shown in the above, Citus could
|
-- if the given filter was on value_1 as shown in the above, Citus could
|
||||||
-- push it down. But here the query is refused
|
-- push it down. But here the query is refused
|
||||||
|
@ -1398,11 +1357,7 @@ FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_second.user_id = raw_events_first.value_1
|
WHERE raw_events_second.user_id = raw_events_first.value_1
|
||||||
AND raw_events_first.value_2 = 12;
|
AND raw_events_first.value_2 = 12;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
|
||||||
-- foo is not joined on the partition key so the query is not
|
-- foo is not joined on the partition key so the query is not
|
||||||
-- pushed down
|
-- pushed down
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
|
@ -1456,8 +1411,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_first.user_id != raw_events_second.user_id
|
WHERE raw_events_first.user_id != raw_events_second.user_id
|
||||||
GROUP BY raw_events_second.user_id) AS foo;
|
GROUP BY raw_events_second.user_id) AS foo;
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
|
||||||
-- INSERT partition column does not match with SELECT partition column
|
-- INSERT partition column does not match with SELECT partition column
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
(value_4_agg,
|
(value_4_agg,
|
||||||
|
@ -1600,8 +1554,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
GROUP BY raw_events_second.value_1
|
GROUP BY raw_events_second.value_1
|
||||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||||
ON (f.id = f2.id);
|
ON (f.id = f2.id);
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
|
||||||
-- cannot pushdown the query since the JOIN is not equi JOIN
|
-- cannot pushdown the query since the JOIN is not equi JOIN
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
(user_id, value_4_agg)
|
(user_id, value_4_agg)
|
||||||
|
@ -1630,14 +1583,59 @@ outer_most.id, max(outer_most.value)
|
||||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||||
ON (f.id != f2.id)) as outer_most
|
ON (f.id != f2.id)) as outer_most
|
||||||
GROUP BY outer_most.id;
|
GROUP BY outer_most.id;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
-- some unsupported LATERAL JOINs
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
INSERT INTO agg_events (user_id, value_4_agg)
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
SELECT
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
averages.user_id, avg(averages.value_4)
|
||||||
DEBUG: predicate pruning for shardId 13300007
|
FROM
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
(SELECT
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
raw_events_second.user_id
|
||||||
|
FROM
|
||||||
|
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||||
|
) reference_ids
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
user_id, value_4
|
||||||
|
FROM
|
||||||
|
raw_events_first WHERE
|
||||||
|
value_4 = reference_ids.user_id) as averages ON true
|
||||||
|
GROUP BY averages.user_id;
|
||||||
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
|
INSERT INTO agg_events (user_id, value_4_agg)
|
||||||
|
SELECT
|
||||||
|
averages.user_id, avg(averages.value_4)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
raw_events_second.user_id
|
||||||
|
FROM
|
||||||
|
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||||
|
) reference_ids
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
user_id, value_4
|
||||||
|
FROM
|
||||||
|
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
|
||||||
|
GROUP BY averages.user_id;
|
||||||
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
|
INSERT INTO agg_events (user_id, value_4_agg)
|
||||||
|
SELECT
|
||||||
|
averages.user_id, avg(averages.value_4)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
raw_events_second.user_id
|
||||||
|
FROM
|
||||||
|
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||||
|
) reference_ids
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
user_id, value_4
|
||||||
|
FROM
|
||||||
|
raw_events_first) as averages ON averages.user_id = reference_ids.user_id
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id)
|
||||||
|
GROUP BY averages.user_id;
|
||||||
|
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
|
||||||
-- cannot pushdown since subquery returns another column than partition key
|
-- cannot pushdown since subquery returns another column than partition key
|
||||||
INSERT INTO raw_events_second
|
INSERT INTO raw_events_second
|
||||||
(user_id)
|
(user_id)
|
||||||
|
|
|
@ -551,11 +551,7 @@ FROM
|
||||||
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
|
||||||
WHERE raw_events_second.user_id IN (19, 20, 21);
|
WHERE raw_events_second.user_id IN (19, 20, 21);
|
||||||
|
|
||||||
-- the following is a very tricky query for Citus
|
-- not supported given that the join is not on the partition column
|
||||||
-- although we do not support pushing down JOINs on non-partition
|
|
||||||
-- columns here it is safe to push it down given that we're looking for
|
|
||||||
-- a specific value (i.e., value_1 = 12) on the joining column.
|
|
||||||
-- Note that the query always hits the same shard on raw_events_second
|
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
(user_id)
|
(user_id)
|
||||||
SELECT raw_events_first.user_id
|
SELECT raw_events_first.user_id
|
||||||
|
@ -859,6 +855,60 @@ outer_most.id, max(outer_most.value)
|
||||||
ON (f.id != f2.id)) as outer_most
|
ON (f.id != f2.id)) as outer_most
|
||||||
GROUP BY outer_most.id;
|
GROUP BY outer_most.id;
|
||||||
|
|
||||||
|
-- some unsupported LATERAL JOINs
|
||||||
|
INSERT INTO agg_events (user_id, value_4_agg)
|
||||||
|
SELECT
|
||||||
|
averages.user_id, avg(averages.value_4)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
raw_events_second.user_id
|
||||||
|
FROM
|
||||||
|
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||||
|
) reference_ids
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
user_id, value_4
|
||||||
|
FROM
|
||||||
|
raw_events_first WHERE
|
||||||
|
value_4 = reference_ids.user_id) as averages ON true
|
||||||
|
GROUP BY averages.user_id;
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO agg_events (user_id, value_4_agg)
|
||||||
|
SELECT
|
||||||
|
averages.user_id, avg(averages.value_4)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
raw_events_second.user_id
|
||||||
|
FROM
|
||||||
|
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||||
|
) reference_ids
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
user_id, value_4
|
||||||
|
FROM
|
||||||
|
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
|
||||||
|
GROUP BY averages.user_id;
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO agg_events (user_id, value_4_agg)
|
||||||
|
SELECT
|
||||||
|
averages.user_id, avg(averages.value_4)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
raw_events_second.user_id
|
||||||
|
FROM
|
||||||
|
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
|
||||||
|
) reference_ids
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
user_id, value_4
|
||||||
|
FROM
|
||||||
|
raw_events_first) as averages ON averages.user_id = reference_ids.user_id
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id)
|
||||||
|
GROUP BY averages.user_id;
|
||||||
|
|
||||||
-- cannot pushdown since subquery returns another column than partition key
|
-- cannot pushdown since subquery returns another column than partition key
|
||||||
INSERT INTO raw_events_second
|
INSERT INTO raw_events_second
|
||||||
(user_id)
|
(user_id)
|
||||||
|
|
Loading…
Reference in New Issue