mirror of https://github.com/citusdata/citus.git
Check if the outer table has distribution column, wip, still need to check the using clause or join constraint
parent
a375543ca8
commit
851386f94a
|
@ -291,6 +291,7 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
|
|||
* distributed tables in the join clause.
|
||||
*/
|
||||
RangeTblEntry *innerRte = NULL;
|
||||
RangeTblEntry *outerRte = NULL;
|
||||
int outerRtIndex = -1;
|
||||
bool result = ExtractIndexesForConstaints(fromExpr->fromlist, query->rtable, &outerRtIndex, &innerRte);
|
||||
if (!result)
|
||||
|
@ -310,11 +311,25 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
|
|||
Var *partitionColumnVar = cacheEntry->partitionColumn;
|
||||
|
||||
/*
|
||||
* we will add constraints for the outer table, so we need to set the varno
|
||||
* TODO: this only works when the outer table has the distribution column,
|
||||
* we shoul not end up here if this is not the case.
|
||||
* we will add constraints for the outer table, we need to find the column in the outer
|
||||
* table that is comparable to the partition column of the inner table.
|
||||
* If the column does not exist, we return without modifying the query.
|
||||
* If the column exists, we create a Var node for the outer table's partition column.
|
||||
*/
|
||||
partitionColumnVar->varno = outerRtIndex;
|
||||
outerRte = rt_fetch(outerRtIndex, query->rtable);
|
||||
AttrNumber attnum = GetAttrNumForMatchingColumn(outerRte, relationId, partitionColumnVar);
|
||||
// TODO: we also have to check that the tables are joined on the partition column.
|
||||
if( attnum == InvalidAttrNumber)
|
||||
{
|
||||
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
|
||||
}
|
||||
|
||||
Var* outerTablePartitionColumnVar = makeVar(
|
||||
outerRtIndex, attnum, partitionColumnVar->vartype,
|
||||
partitionColumnVar->vartypmod,
|
||||
partitionColumnVar->varcollid,
|
||||
0);
|
||||
|
||||
bool isFirstShard = IsFirstShard(cacheEntry, shardId);
|
||||
|
||||
/* load the interval for the shard and create constant nodes for the upper/lower bounds */
|
||||
|
@ -327,7 +342,7 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
|
|||
/* create a function expression node for the hash partition column */
|
||||
FuncExpr *hashFunction = makeNode(FuncExpr);
|
||||
hashFunction->funcid = cacheEntry->hashFunction->fn_oid;
|
||||
hashFunction->args = list_make1(partitionColumnVar);
|
||||
hashFunction->args = list_make1(outerTablePartitionColumnVar);
|
||||
hashFunction->funcresulttype = get_func_rettype(cacheEntry->hashFunction->fn_oid);
|
||||
hashFunction->funcretset = false;
|
||||
|
||||
|
|
|
@ -756,6 +756,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
|
|||
int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex;
|
||||
RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable);
|
||||
RangeTblEntry *innerRte = NULL;
|
||||
bool planned = false;
|
||||
if (!IsPushdownSafeForRTEInLeftJoin(rte))
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("recursively planning right side of "
|
||||
|
@ -763,6 +764,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
|
|||
"is a recurring rel that is not an RTE")));
|
||||
RecursivelyPlanDistributedJoinNode(rightNode, query,
|
||||
recursivePlanningContext);
|
||||
planned = true;
|
||||
}
|
||||
else if (!CheckIfAllCitusRTEsAreColocated(rightNode, query->rtable, &innerRte))
|
||||
{
|
||||
|
@ -770,8 +772,27 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
|
|||
"since tables in the inner side of the left "
|
||||
"join are not colocated")));
|
||||
RecursivelyPlanDistributedJoinNode(rightNode, query,
|
||||
recursivePlanningContext);
|
||||
recursivePlanningContext);
|
||||
planned = true;
|
||||
}
|
||||
|
||||
if(!planned)
|
||||
{
|
||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(innerRte->relid)
|
||||
if(GetAttrNumForMatchingColumn(rte, innerRte->relid, cacheEntry->partitionColumn) == InvalidAttrNumber)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
|
||||
"since the outer side does not have the distribution column")));
|
||||
RecursivelyPlanDistributedJoinNode(rightNode, query, recursivePlanningContext);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("not recursively planning right side of the left join "
|
||||
"since the outer side is a RangeTblRef and "
|
||||
"the inner side is colocated with it")));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
/*
|
||||
* rightNodeRecurs if there is a recurring table in the right side. However, if the right side
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "distributed/query_utils.h"
|
||||
#include "distributed/relation_restriction_equivalence.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
|
||||
static bool CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry);
|
||||
|
@ -239,7 +240,8 @@ ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context)
|
|||
* given node are colocated. If they are, it sets the value of rte to a
|
||||
* representative table.
|
||||
*/
|
||||
bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
|
||||
bool
|
||||
CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
|
||||
{
|
||||
ExtractRangeTableIdsContext context;
|
||||
List *idList = NIL;
|
||||
|
@ -268,5 +270,29 @@ bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **r
|
|||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
/*
|
||||
* GetAttrNumForMatchingColumn returns the attribute number for the column
|
||||
* in the target relation that matches the given Var. If the column does not
|
||||
* exist or is not comparable, it returns InvalidAttrNumber.
|
||||
*/
|
||||
AttrNumber
|
||||
GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var)
|
||||
{
|
||||
char *targetColumnName = get_attname(relid, var->varattno, false);
|
||||
AttrNumber attnum = get_attnum(rteTarget->relid, targetColumnName);
|
||||
if (attnum == InvalidAttrNumber)
|
||||
{
|
||||
ereport(DEBUG5, (errmsg("Column %s does not exist in relation %s",
|
||||
targetColumnName, rteTarget->eref->aliasname)));
|
||||
return InvalidAttrNumber;
|
||||
}
|
||||
if(var->vartype != get_atttype(rteTarget->relid, attnum))
|
||||
{
|
||||
ereport(DEBUG5, (errmsg("Column %s is not comparable for tables with relids %d and %d",
|
||||
targetColumnName, rteTarget->relid, relid)));
|
||||
return InvalidAttrNumber;
|
||||
}
|
||||
return attnum;
|
||||
}
|
||||
|
|
|
@ -47,4 +47,5 @@ extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
|
|||
extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList);
|
||||
extern bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context);
|
||||
extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte);
|
||||
AttrNumber GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var);
|
||||
#endif /* QUERY_UTILS_H */
|
||||
|
|
Loading…
Reference in New Issue