pull/1250/merge
Önder Kalacı 2017-02-23 08:51:14 +00:00 committed by GitHub
commit 86b8ba9db6
6 changed files with 545 additions and 134 deletions

View File

@ -2108,26 +2108,37 @@ MultiSubqueryPushdownTable(RangeTblEntry *subqueryRangeTableEntry)
/* /*
* OperatorImplementsEquality returns true if the given opno represents an * OperatorImplementsEquality is a wrapper around OperatorImplementsStrategy using
* equality operator. The function retrieves btree interpretation list for this * BTEqualStrategyNumber as the operator strategy.
* opno and check if BTEqualStrategyNumber strategy is present.
*/ */
bool bool
OperatorImplementsEquality(Oid opno) OperatorImplementsEquality(Oid opno)
{ {
bool equalityOperator = false; return OperatorImplementsStrategy(opno, BTEqualStrategyNumber);
}
/*
* OperatorImplementsStrategy returns true if the given opno implements the
* given btree strategy. The function retrieves the btree interpretation list
* for this opno and checks whether the strategy is present.
*/
bool
OperatorImplementsStrategy(Oid opno, int strategy)
{
bool operatorImplementsStrategy = false;
List *btreeIntepretationList = get_op_btree_interpretation(opno); List *btreeIntepretationList = get_op_btree_interpretation(opno);
ListCell *btreeInterpretationCell = NULL; ListCell *btreeInterpretationCell = NULL;
foreach(btreeInterpretationCell, btreeIntepretationList) foreach(btreeInterpretationCell, btreeIntepretationList)
{ {
OpBtreeInterpretation *btreeIntepretation = (OpBtreeInterpretation *) OpBtreeInterpretation *btreeIntepretation = (OpBtreeInterpretation *)
lfirst(btreeInterpretationCell); lfirst(btreeInterpretationCell);
if (btreeIntepretation->strategy == BTEqualStrategyNumber) if (btreeIntepretation->strategy == strategy)
{ {
equalityOperator = true; operatorImplementsStrategy = true;
break; break;
} }
} }
return equalityOperator; return operatorImplementsStrategy;
} }

View File

@ -116,6 +116,7 @@ static bool RouterSelectQuery(Query *originalQuery,
static bool RelationPrunesToMultipleShards(List *relationShardList); static bool RelationPrunesToMultipleShards(List *relationShardList);
static List * TargetShardIntervalsForSelect(Query *query, static List * TargetShardIntervalsForSelect(Query *query,
RelationRestrictionContext *restrictionContext); RelationRestrictionContext *restrictionContext);
static List * GetHashedJoinInfoRestrictions(RelOptInfo *relInfo);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
static Job * RouterQueryJob(Query *query, Task *task, List *placementList); static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
@ -127,7 +128,11 @@ static Node * InstantiatePartitionQualWalker(Node *node, void *context);
static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, RangeTblEntry *subqueryRte,
bool allReferenceTables); RelationRestrictionContext *
restrictionContext);
static bool AllRelationRestrictionsContainUninstantiatedQual(RelationRestrictionContext
*restrictionContext);
static bool HasUninstantiatedQualWalker(Node *node, void *context);
static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query); static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
@ -270,7 +275,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
int shardCount = targetCacheEntry->shardIntervalArrayLength; int shardCount = targetCacheEntry->shardIntervalArrayLength;
bool allReferenceTables = restrictionContext->allReferenceTables;
/* /*
* Error semantics for INSERT ... SELECT queries are different than regular * Error semantics for INSERT ... SELECT queries are different than regular
@ -278,7 +282,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
*/ */
multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte, multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte,
subqueryRte, subqueryRte,
allReferenceTables); restrictionContext);
if (multiPlan->planningError) if (multiPlan->planningError)
{ {
return multiPlan; return multiPlan;
@ -385,6 +389,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
{ {
RelationRestriction *restriction = lfirst(restrictionCell); RelationRestriction *restriction = lfirst(restrictionCell);
List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo; List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo;
List *originalJoinInfo = restriction->relOptInfo->joininfo;
InstantiateQualContext instantiateQualWalker; InstantiateQualContext instantiateQualWalker;
Var *relationPartitionKey = PartitionKey(restriction->relationId); Var *relationPartitionKey = PartitionKey(restriction->relationId);
@ -403,6 +408,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
originalBaserestrictInfo = originalBaserestrictInfo =
(List *) InstantiatePartitionQualWalker((Node *) originalBaserestrictInfo, (List *) InstantiatePartitionQualWalker((Node *) originalBaserestrictInfo,
&instantiateQualWalker); &instantiateQualWalker);
originalJoinInfo =
(List *) InstantiatePartitionQualWalker((Node *) originalJoinInfo,
&instantiateQualWalker);
} }
/* /*
@ -670,7 +678,8 @@ ExtractInsertRangeTableEntry(Query *query)
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, bool allReferenceTables) RangeTblEntry *subqueryRte,
RelationRestrictionContext *restrictionContext)
{ {
Query *subquery = NULL; Query *subquery = NULL;
Oid selectPartitionColumnTableId = InvalidOid; Oid selectPartitionColumnTableId = InvalidOid;
@ -678,6 +687,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
char targetPartitionMethod = PartitionMethod(targetRelationId); char targetPartitionMethod = PartitionMethod(targetRelationId);
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
DeferredErrorMessage *error = NULL; DeferredErrorMessage *error = NULL;
bool allReferenceTables = restrictionContext->allReferenceTables;
/* we only do this check for INSERT ... SELECT queries */ /* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree)); AssertArg(InsertSelectQuery(queryTree));
@ -754,10 +764,160 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
} }
} }
if (!AllRelationRestrictionsContainUninstantiatedQual(restrictionContext))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan distributed query since all join conditions in the query "
"need include two distribution keys using an equality operator",
NULL, NULL);
}
return NULL; return NULL;
} }
/*
 * AllRelationRestrictionsContainUninstantiatedQual walks over every relation
 * restriction in the given context and returns true only if each relation that
 * is expected to carry the uninstantiated partition column qual has it in
 * either its base restrictions or its join restrictions.
 *
 * Two kinds of relations are skipped and therefore never make the result
 * false:
 *  - relations whose restrictions contain a false clause, since per the
 *    original author's note postgres removes their other restrictions;
 *  - relations distributed by DISTRIBUTE_BY_NONE (reference tables), which
 *    do not need the qual at all.
 */
static bool
AllRelationRestrictionsContainUninstantiatedQual(
	RelationRestrictionContext *restrictionContext)
{
	bool everyRelationHasQual = true;
	ListCell *relationCell = NULL;

	foreach(relationCell, restrictionContext->relationRestrictionList)
	{
		RelationRestriction *relationRestriction = lfirst(relationCell);

		/* work on copies so that the planner's own lists are left untouched */
		List *baseRestrictInfoCopy =
			list_copy(relationRestriction->relOptInfo->baserestrictinfo);
		List *joinInfoCopy = list_copy(relationRestriction->relOptInfo->joininfo);
		List *restrictInfoList = list_concat(baseRestrictInfoCopy, joinInfoCopy);

		List *pseudoConstantClauseList = NIL;
		Var *partitionColumn = NULL;
		ListCell *restrictInfoCell = NULL;
		bool qualFound = false;

		/* a relation with a false clause is not expected to carry the qual */
		pseudoConstantClauseList = extract_actual_clauses(restrictInfoList, true);
		if (ContainsFalseClause(pseudoConstantClauseList))
		{
			continue;
		}

		/* reference tables do not need the qual, so there is nothing to check */
		if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE)
		{
			continue;
		}

		partitionColumn = PartitionKey(relationRestriction->relationId);

		/* stop at the first restriction that contains the qual */
		foreach(restrictInfoCell, restrictInfoList)
		{
			RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictInfoCell);

			if (HasUninstantiatedQualWalker((Node *) restrictInfo->clause,
											partitionColumn))
			{
				qualFound = true;
				break;
			}
		}

		/* once a single relation lacks the qual the overall answer stays false */
		if (!qualFound)
		{
			everyRelationHasQual = false;
		}
	}

	return everyRelationHasQual;
}
/*
 * HasUninstantiatedQualWalker is an expression tree walker that returns true
 * if the given expression contains a two-argument operator expression with a
 * parameter whose id is UNINSTANTIATED_PARAMETER_ID.
 *
 * The context is expected to be the relation's partition column (a Var), or
 * NULL. When both the context and a Var operand next to the parameter are
 * present, the Var's attribute number must match the partition column's
 * attribute number; otherwise the expression is not counted.
 *
 * Note that a two-argument operator expression which has a Param operand with
 * any other parameter id ends the search of that subtree with false. The left
 * operand takes precedence when deciding which operand is the parameter.
 */
static bool
HasUninstantiatedQualWalker(Node *node, void *context)
{
	Var *partitionColumn = (Var *) context;
	OpExpr *operatorExpression = NULL;
	Node *leftOperand = NULL;
	Node *rightOperand = NULL;
	Node *otherOperand = NULL;
	Param *parameter = NULL;
	Var *column = NULL;

	if (node == NULL)
	{
		return false;
	}

	/* anything other than a binary operator expression is simply descended into */
	if (!IsA(node, OpExpr) || list_length(((OpExpr *) node)->args) != 2)
	{
		return expression_tree_walker(node, HasUninstantiatedQualWalker, context);
	}

	operatorExpression = (OpExpr *) node;
	leftOperand = get_leftop((Expr *) operatorExpression);
	rightOperand = get_rightop((Expr *) operatorExpression);

	/* figure out which side, if any, is the parameter */
	if (IsA(leftOperand, Param))
	{
		parameter = (Param *) leftOperand;
		otherOperand = rightOperand;
	}
	else if (IsA(rightOperand, Param))
	{
		parameter = (Param *) rightOperand;
		otherOperand = leftOperand;
	}
	else
	{
		/* no parameter at this level, keep looking further down the tree */
		return expression_tree_walker(node, HasUninstantiatedQualWalker, context);
	}

	/* remember the column the parameter is compared with, if it is a plain Var */
	if (IsA(otherOperand, Var))
	{
		column = (Var *) otherOperand;
	}

	/* only the artificially added uninstantiated parameter is of interest */
	if (parameter->paramid != UNINSTANTIATED_PARAMETER_ID)
	{
		return false;
	}

	/* when a column is known, it has to be the relation's partition column */
	if (partitionColumn != NULL && column != NULL &&
		column->varattno != partitionColumn->varattno)
	{
		return false;
	}

	/*
	 * Finding the parameter is the actual goal of the walker, so true is
	 * returned even when no column could be compared. Per the original note,
	 * this is reached once the query includes (partitionColumn = Const) and
	 * the uninstantiated parameter was artificially added to the query.
	 */
	return true;
}
/* /*
* MultiTaskRouterSelectQuerySupported returns NULL if the query may be used * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used
* as the source for an INSERT ... SELECT or returns a description why not. * as the source for an INSERT ... SELECT or returns a description why not.
@ -2409,7 +2569,9 @@ TargetShardIntervalsForSelect(Query *query,
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
int shardCount = cacheEntry->shardIntervalArrayLength; int shardCount = cacheEntry->shardIntervalArrayLength;
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); List *baseRestrictClauseList = get_all_actual_clauses(baseRestrictionList);
List *joinInfoRestrictionClauseList =
GetHashedJoinInfoRestrictions(relationRestriction->relOptInfo);
List *prunedShardList = NIL; List *prunedShardList = NIL;
int shardIndex = 0; int shardIndex = 0;
List *joinInfoList = relationRestriction->relOptInfo->joininfo; List *joinInfoList = relationRestriction->relOptInfo->joininfo;
@ -2428,6 +2590,7 @@ TargetShardIntervalsForSelect(Query *query,
if (!whereFalseQuery && shardCount > 0) if (!whereFalseQuery && shardCount > 0)
{ {
List *shardIntervalList = NIL; List *shardIntervalList = NIL;
List *allRestrictions = NIL;
for (shardIndex = 0; shardIndex < shardCount; shardIndex++) for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
{ {
@ -2436,8 +2599,11 @@ TargetShardIntervalsForSelect(Query *query,
shardIntervalList = lappend(shardIntervalList, shardInterval); shardIntervalList = lappend(shardIntervalList, shardInterval);
} }
allRestrictions = list_concat(baseRestrictClauseList,
joinInfoRestrictionClauseList);
prunedShardList = PruneShardList(relationId, tableId, prunedShardList = PruneShardList(relationId, tableId,
restrictClauseList, allRestrictions,
shardIntervalList); shardIntervalList);
/* /*
@ -2460,6 +2626,71 @@ TargetShardIntervalsForSelect(Query *query,
} }
/*
 * GetHashedJoinInfoRestrictions iterates over the joininfo list of the given
 * relInfo and returns the actual clauses of the restrictions which include a
 * hashed column generated by the InstantiatePartitionQual() function.
 *
 * A join restriction is picked up only when all of the following hold:
 *  - its clause is a List (the hashed operator list);
 *  - the first element of that list is a simple operator expression;
 *  - that operator implements the btree >= or <= strategy;
 *  - that operator expression contains the int4 column returned by
 *    MakeInt4Column().
 *
 * Returns NIL when relInfo has no such join restriction.
 */
static List *
GetHashedJoinInfoRestrictions(RelOptInfo *relInfo)
{
	List *hashedJoinInfoRestrictions = NIL;
	List *joinInfoClauses = NIL;
	ListCell *joinInfoCell = NULL;

	/*
	 * The column that candidate clauses are compared with does not depend on
	 * the restriction at hand, so build it once instead of once per iteration.
	 */
	Var *hashedColumn = MakeInt4Column();

	/* scan the rel's join clauses and get the necessary ones */
	foreach(joinInfoCell, relInfo->joininfo)
	{
		RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(joinInfoCell);
		Expr *restrictionExpression = NULL;
		OpExpr *restrictionOpExpression = NULL;

		/*
		 * We're looking for the hashedOperatorList that is returned by
		 * InstantiatePartitionQual()
		 */
		if (!IsA(restrictInfo->clause, List))
		{
			continue;
		}

		/*
		 * Expected expression is in the form of (hashedCol >= val) or
		 * (hashedCol <= val). So, the following checks aim to filter
		 * such operator expressions. Only the first element of the list
		 * is inspected.
		 */
		restrictionExpression = (Expr *) linitial((List *) restrictInfo->clause);
		if (!SimpleOpExpression(restrictionExpression))
		{
			continue;
		}

		restrictionOpExpression = (OpExpr *) restrictionExpression;
		if (!(OperatorImplementsStrategy(restrictionOpExpression->opno,
										 BTGreaterEqualStrategyNumber) ||
			  OperatorImplementsStrategy(restrictionOpExpression->opno,
										 BTLessEqualStrategyNumber)))
		{
			continue;
		}

		if (!OpExpressionContainsColumn(restrictionOpExpression, hashedColumn))
		{
			continue;
		}

		hashedJoinInfoRestrictions = lappend(hashedJoinInfoRestrictions,
											 restrictInfo);
	}

	/* finally get the actual clauses from the restrict infos */
	joinInfoClauses = get_all_actual_clauses(hashedJoinInfoRestrictions);

	return joinInfoClauses;
}
/* /*
* RelationPrunesToMultipleShards returns true if the given list of * RelationPrunesToMultipleShards returns true if the given list of
* relation-to-shard mappings contains at least two mappings with * relation-to-shard mappings contains at least two mappings with
@ -3047,6 +3278,13 @@ InstantiatePartitionQualWalker(Node *node, void *context)
return node; return node;
} }
/* if the qual is not on the partition column, do not instantiate */
if (relationPartitionColumn && currentColumn &&
currentColumn->varattno != relationPartitionColumn->varattno)
{
return node;
}
/* get the integer >=, <= operators from the catalog */ /* get the integer >=, <= operators from the catalog */
integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID, integer4GEoperatorId = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID,
INT4OID, INT4OID,

View File

@ -53,7 +53,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
RaiseDeferredErrorInternal(error, elevel); \ RaiseDeferredErrorInternal(error, elevel); \
if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \ if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \
pg_unreachable(); } \ pg_unreachable(); } \
} while (0) } \
while (0)
void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel); void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel);

View File

@ -204,6 +204,7 @@ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern List * pull_var_clause_default(Node *node); extern List * pull_var_clause_default(Node *node);
extern bool OperatorImplementsEquality(Oid opno); extern bool OperatorImplementsEquality(Oid opno);
extern bool OperatorImplementsStrategy(Oid opno, int strategy);
#endif /* MULTI_LOGICAL_PLANNER_H */ #endif /* MULTI_LOGICAL_PLANNER_H */

View File

@ -613,24 +613,30 @@ WHERE user_id IN (SELECT user_id
DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) DEBUG: predicate pruning for shardId 13300007
DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away
DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) DEBUG: predicate pruning for shardId 13300007
DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away
DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) DEBUG: predicate pruning for shardId 13300007
DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away
DEBUG: predicate pruning for shardId 13300000 DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
@ -639,6 +645,90 @@ DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id = 2))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647)))
DEBUG: Plan is router executable DEBUG: Plan is router executable
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second
WHERE user_id != 2 AND value_1 = 2000)
ON conflict (user_id, value_1) DO NOTHING;
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer))) ON CONFLICT(user_id, value_1) DO NOTHING
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300005 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer))) ON CONFLICT(user_id, value_1) DO NOTHING
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823))) ON CONFLICT(user_id, value_1) DO NOTHING
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE ((raw_events_second.user_id <> 2) AND (raw_events_second.value_1 = 2000)))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647))) ON CONFLICT(user_id, value_1) DO NOTHING
DEBUG: Plan is router executable
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second WHERE false);
DEBUG: Skipping target shard interval 13300004 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300005 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300006 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300007 since SELECT query for it pruned away
DEBUG: Plan is router executable
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second
WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000);
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300000 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= '-2147483648'::integer) AND (hashint4(user_id) <= '-1073741825'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300005 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= '-1073741824'::integer) AND (hashint4(user_id) <= '-1'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300002 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= 0) AND (hashint4(user_id) <= 1073741823)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id IN (SELECT raw_events_second.user_id FROM public.raw_events_second_13300007 raw_events_second WHERE ((raw_events_second.value_1 = 1000) OR (raw_events_second.value_1 = 2000) OR (raw_events_second.value_1 = 3000)))) AND ((hashint4(user_id) >= 1073741824) AND (hashint4(user_id) <= 2147483647)))
DEBUG: Plan is router executable
-- some UPSERTS -- some UPSERTS
INSERT INTO agg_events AS ae INSERT INTO agg_events AS ae
( (
@ -1043,6 +1133,7 @@ DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
@ -1050,7 +1141,8 @@ DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) DEBUG: Skipping target shard interval 13300009 since SELECT query for it pruned away
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
@ -1058,7 +1150,8 @@ DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) DEBUG: Skipping target shard interval 13300010 since SELECT query for it pruned away
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
@ -1066,7 +1159,7 @@ DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN (SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_second(user_id, "time", value_1, value_2, value_3, value_4) ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: Skipping target shard interval 13300011 since SELECT query for it pruned away
DEBUG: Plan is router executable DEBUG: Plan is router executable
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
@ -1213,11 +1306,7 @@ DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300003 raw_events_first JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.user_id))) WHERE ((raw_events_second.user_id = ANY (ARRAY[19, 20, 21])) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
DEBUG: Plan is router executable DEBUG: Plan is router executable
-- the following is a very tricky query for Citus -- not supported given that the join is not on the partition column
-- although we do not support pushing down JOINs on non-partition
-- columns here it is safe to push it down given that we're looking for
-- a specific value (i.e., value_1 = 12) on the joining column.
-- Note that the query always hits the same shard on raw_events_second
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -1225,35 +1314,7 @@ FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_1 = 12; AND raw_events_first.value_1 = 12;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
DEBUG: Plan is router executable
-- some unsupported LEFT/INNER JOINs -- some unsupported LEFT/INNER JOINs
-- JOIN on one table with partition column other is not -- JOIN on one table with partition column other is not
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
@ -1261,22 +1322,14 @@ SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- a not meaningful query -- a not meaningful query
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
@ -1284,30 +1337,21 @@ SELECT raw_events_second.user_id
FROM raw_events_first, FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_first.user_id = raw_events_first.value_1; WHERE raw_events_first.user_id = raw_events_first.value_1;
ERROR: cannot perform distributed planning for the given modification ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DETAIL: Select query cannot be pushed down to the worker.
-- both tables joined on non-partition columns -- both tables joined on non-partition columns
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1; raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- although we do not support pushing down JOINs on non-partition -- although we do not support pushing down JOINs on non-partition
-- columns here it is safe to push it down given that we're looking for -- columns here it is safe to push it down given that we're looking for
-- a specific value (i.e., user_id = 10) on the joining column. -- a specific value (i.e., user_id = 10) on the joining column.
@ -1326,33 +1370,32 @@ DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300007 DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004 DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005 DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006 DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) LEFT JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
DEBUG: Plan is router executable DEBUG: Plan is router executable
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
-- however this time query is not pushed down
-- to the worker. This is related to how we process
-- restriction infos, which we're considering to
-- improve
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
@ -1362,8 +1405,35 @@ WHERE raw_events_first.user_id = 10;
DEBUG: predicate pruning for shardId 13300001 DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002 DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003 DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification DEBUG: predicate pruning for shardId 13300005
DETAIL: Select query cannot be pushed down to the worker. DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM (public.raw_events_first_13300000 raw_events_first JOIN public.raw_events_second_13300004 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300005 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300007
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300006 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823)))
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300004
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM ((SELECT NULL::integer AS user_id, NULL::timestamp without time zone AS "time", NULL::integer AS value_1, NULL::integer AS value_2, NULL::double precision AS value_3, NULL::bigint AS value_4 WHERE false) raw_events_first(user_id, "time", value_1, value_2, value_3, value_4) JOIN public.raw_events_second_13300007 raw_events_second ON ((raw_events_first.user_id = raw_events_second.value_1))) WHERE ((raw_events_first.user_id = 10) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
DEBUG: Plan is router executable
-- make things a bit more complicate with IN clauses -- make things a bit more complicate with IN clauses
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
@ -1371,11 +1441,7 @@ SELECT
FROM FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1 raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4); WHERE raw_events_first.value_1 IN (10, 11,12) OR raw_events_second.user_id IN (1,2,3,4);
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- implicit join on non partition column should also not be pushed down -- implicit join on non partition column should also not be pushed down
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
@ -1383,11 +1449,7 @@ SELECT raw_events_first.user_id
FROM raw_events_first, FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1; WHERE raw_events_second.user_id = raw_events_first.value_1;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- the following is again a very tricky query for Citus -- the following is again a very tricky query for Citus
-- if the given filter was on value_1 as shown in the above, Citus could -- if the given filter was on value_1 as shown in the above, Citus could
-- push it down. But here the query is refused -- push it down. But here the query is refused
@ -1398,11 +1460,7 @@ FROM raw_events_first,
raw_events_second raw_events_second
WHERE raw_events_second.user_id = raw_events_first.value_1 WHERE raw_events_second.user_id = raw_events_first.value_1
AND raw_events_first.value_2 = 12; AND raw_events_first.value_2 = 12;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- foo is not joined on the partition key so the query is not -- foo is not joined on the partition key so the query is not
-- pushed down -- pushed down
INSERT INTO agg_events INSERT INTO agg_events
@ -1433,14 +1491,7 @@ FROM
ON (f.id = f2.id)) as outer_most ON (f.id = f2.id)) as outer_most
GROUP BY GROUP BY
outer_most.id; outer_most.id;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: predicate pruning for shardId 13300005
DEBUG: predicate pruning for shardId 13300006
DEBUG: predicate pruning for shardId 13300007
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- not equals on the partition column cannot be pushed down -- not equals on the partition column cannot be pushed down
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
@ -1456,8 +1507,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second raw_events_second
WHERE raw_events_first.user_id != raw_events_second.user_id WHERE raw_events_first.user_id != raw_events_second.user_id
GROUP BY raw_events_second.user_id) AS foo; GROUP BY raw_events_second.user_id) AS foo;
ERROR: cannot perform distributed planning for the given modification ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DETAIL: Select query cannot be pushed down to the worker.
-- INSERT partition column does not match with SELECT partition column -- INSERT partition column does not match with SELECT partition column
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
@ -1600,8 +1650,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
GROUP BY raw_events_second.value_1 GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id); ON (f.id = f2.id);
ERROR: cannot perform distributed planning for the given modification ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DETAIL: Select query cannot be pushed down to the worker.
-- cannot pushdown the query since the JOIN is not equi JOIN -- cannot pushdown the query since the JOIN is not equi JOIN
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)
@ -1630,14 +1679,59 @@ outer_most.id, max(outer_most.value)
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id != f2.id)) as outer_most ON (f.id != f2.id)) as outer_most
GROUP BY outer_most.id; GROUP BY outer_most.id;
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002 -- some unsupported LATERAL JOINs
DEBUG: predicate pruning for shardId 13300003 INSERT INTO agg_events (user_id, value_4_agg)
DEBUG: predicate pruning for shardId 13300005 SELECT
DEBUG: predicate pruning for shardId 13300006 averages.user_id, avg(averages.value_4)
DEBUG: predicate pruning for shardId 13300007 FROM
ERROR: cannot perform distributed planning for the given modification (SELECT
DETAIL: Select query cannot be pushed down to the worker. raw_events_second.user_id
FROM
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
) reference_ids
JOIN LATERAL
(SELECT
user_id, value_4
FROM
raw_events_first WHERE
value_4 = reference_ids.user_id) as averages ON true
GROUP BY averages.user_id;
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
INSERT INTO agg_events (user_id, value_4_agg)
SELECT
averages.user_id, avg(averages.value_4)
FROM
(SELECT
raw_events_second.user_id
FROM
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
) reference_ids
JOIN LATERAL
(SELECT
user_id, value_4
FROM
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
GROUP BY averages.user_id;
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
INSERT INTO agg_events (user_id, value_4_agg)
SELECT
averages.user_id, avg(averages.value_4)
FROM
(SELECT
raw_events_second.user_id
FROM
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
) reference_ids
JOIN LATERAL
(SELECT
user_id, value_4
FROM
raw_events_first) as averages ON averages.user_id = reference_ids.user_id
JOIN LATERAL
(SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id)
GROUP BY averages.user_id;
ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
-- cannot pushdown since subquery returns another column than partition key -- cannot pushdown since subquery returns another column than partition key
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1645,11 +1739,7 @@ SELECT user_id
FROM raw_events_first FROM raw_events_first
WHERE user_id IN (SELECT value_2 WHERE user_id IN (SELECT value_2
FROM raw_events_second); FROM raw_events_second);
DEBUG: predicate pruning for shardId 13300001 ERROR: cannot plan distributed query since all join conditions in the query need include two distribution keys using an equality operator
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- we currently do not support grouping sets -- we currently do not support grouping sets
INSERT INTO agg_events INSERT INTO agg_events
(user_id, (user_id,

View File

@ -326,6 +326,30 @@ WHERE user_id IN (SELECT user_id
FROM raw_events_second FROM raw_events_second
WHERE user_id = 2); WHERE user_id = 2);
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second
WHERE user_id != 2 AND value_1 = 2000)
ON conflict (user_id, value_1) DO NOTHING;
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second WHERE false);
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second
WHERE value_1 = 1000 OR value_1 = 2000 OR value_1 = 3000);
-- some UPSERTS -- some UPSERTS
INSERT INTO agg_events AS ae INSERT INTO agg_events AS ae
( (
@ -551,11 +575,7 @@ FROM
raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id raw_events_first INNER JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.user_id
WHERE raw_events_second.user_id IN (19, 20, 21); WHERE raw_events_second.user_id IN (19, 20, 21);
-- the following is a very tricky query for Citus -- not supported given that the join is not on the partition column
-- although we do not support pushing down JOINs on non-partition
-- columns here it is safe to push it down given that we're looking for
-- a specific value (i.e., value_1 = 12) on the joining column.
-- Note that the query always hits the same shard on raw_events_second
INSERT INTO agg_events INSERT INTO agg_events
(user_id) (user_id)
SELECT raw_events_first.user_id SELECT raw_events_first.user_id
@ -614,10 +634,6 @@ WHERE
raw_events_first.user_id = 10; raw_events_first.user_id = 10;
-- same as the above with INNER JOIN -- same as the above with INNER JOIN
-- however this time query is not pushed down
-- to the worker. This is related to how we process
-- restriction infos, which we're considering to
-- improve
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
SELECT SELECT
raw_events_first.user_id raw_events_first.user_id
@ -859,6 +875,60 @@ outer_most.id, max(outer_most.value)
ON (f.id != f2.id)) as outer_most ON (f.id != f2.id)) as outer_most
GROUP BY outer_most.id; GROUP BY outer_most.id;
-- some unsupported LATERAL JOINs
INSERT INTO agg_events (user_id, value_4_agg)
SELECT
averages.user_id, avg(averages.value_4)
FROM
(SELECT
raw_events_second.user_id
FROM
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
) reference_ids
JOIN LATERAL
(SELECT
user_id, value_4
FROM
raw_events_first WHERE
value_4 = reference_ids.user_id) as averages ON true
GROUP BY averages.user_id;
INSERT INTO agg_events (user_id, value_4_agg)
SELECT
averages.user_id, avg(averages.value_4)
FROM
(SELECT
raw_events_second.user_id
FROM
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
) reference_ids
JOIN LATERAL
(SELECT
user_id, value_4
FROM
raw_events_first) as averages ON averages.value_4 = reference_ids.user_id
GROUP BY averages.user_id;
INSERT INTO agg_events (user_id, value_4_agg)
SELECT
averages.user_id, avg(averages.value_4)
FROM
(SELECT
raw_events_second.user_id
FROM
reference_table JOIN raw_events_second on (reference_table.user_id = raw_events_second.user_id)
) reference_ids
JOIN LATERAL
(SELECT
user_id, value_4
FROM
raw_events_first) as averages ON averages.user_id = reference_ids.user_id
JOIN LATERAL
(SELECT user_id, value_4 FROM agg_events WHERE user_id = 15) as agg_ids ON (agg_ids.value_4 = averages.user_id)
GROUP BY averages.user_id;
-- cannot push down since the subquery returns a column other than the partition key -- cannot push down since the subquery returns a column other than the partition key
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)