mirror of https://github.com/citusdata/citus.git
push down left join, wip
parent
37e23f44b4
commit
31b319d84f
|
@ -520,6 +520,30 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
|
||||||
tableEntry->colocationId, tableType);
|
tableEntry->colocationId, tableType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsFirstShard returns true if the given shardId is the first shard.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsFirstShard(CitusTableCacheEntry *tableEntry, uint64 shardId)
|
||||||
|
{
|
||||||
|
if (tableEntry == NULL || tableEntry->sortedShardIntervalArray == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (tableEntry->sortedShardIntervalArray[0]->shardId == INVALID_SHARD_ID)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shardId == tableEntry->sortedShardIntervalArray[0]->shardId)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* HasDistributionKey returns true if given Citus table has a distribution key.
|
* HasDistributionKey returns true if given Citus table has a distribution key.
|
||||||
|
|
|
@ -316,6 +316,36 @@ BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetLis
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindCitusExtradataContainerRTE is a helper function that finds the
|
||||||
|
* citus_extradata_container in range table entry.
|
||||||
|
*
|
||||||
|
* The function returns true if it finds the RTE, and false otherwise.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ExtractCitusExtradataContainerRTE(RangeTblEntry *rangeTblEntry, RangeTblEntry **result)
|
||||||
|
{
|
||||||
|
if (rangeTblEntry->rtekind == RTE_FUNCTION &&
|
||||||
|
list_length(rangeTblEntry->functions) == 1)
|
||||||
|
{
|
||||||
|
RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial(
|
||||||
|
rangeTblEntry->functions);
|
||||||
|
if (!IsA(rangeTblFunction->funcexpr, FuncExpr))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr);
|
||||||
|
if (funcExpr->funcid == CitusExtraDataContainerFuncId())
|
||||||
|
{
|
||||||
|
*result = rangeTblEntry;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Finds the rangetable entry in the query that refers to the citus_extradata_container
|
* Finds the rangetable entry in the query that refers to the citus_extradata_container
|
||||||
* and stores the pointer in result.
|
* and stores the pointer in result.
|
||||||
|
@ -331,25 +361,7 @@ FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result)
|
||||||
if (IsA(node, RangeTblEntry))
|
if (IsA(node, RangeTblEntry))
|
||||||
{
|
{
|
||||||
RangeTblEntry *rangeTblEntry = castNode(RangeTblEntry, node);
|
RangeTblEntry *rangeTblEntry = castNode(RangeTblEntry, node);
|
||||||
if (rangeTblEntry->rtekind == RTE_FUNCTION &&
|
return ExtractCitusExtradataContainerRTE(rangeTblEntry, result);
|
||||||
list_length(rangeTblEntry->functions) == 1)
|
|
||||||
{
|
|
||||||
RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial(
|
|
||||||
rangeTblEntry->functions);
|
|
||||||
if (!IsA(rangeTblFunction->funcexpr, FuncExpr))
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr);
|
|
||||||
if (funcExpr->funcid == CitusExtraDataContainerFuncId())
|
|
||||||
{
|
|
||||||
*result = rangeTblEntry;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* query_tree_walker descends into RTEs */
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
else if (IsA(node, Query))
|
else if (IsA(node, Query))
|
||||||
{
|
{
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "access/heapam.h"
|
#include "access/heapam.h"
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
#include "catalog/pg_constraint.h"
|
#include "catalog/pg_constraint.h"
|
||||||
|
#include "catalog/pg_operator.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "nodes/nodeFuncs.h"
|
#include "nodes/nodeFuncs.h"
|
||||||
|
@ -204,6 +205,299 @@ UpdateTaskQueryString(Query *query, Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExtractRangeTableIds walks over the given node, and finds all range
|
||||||
|
* table entries.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context)
|
||||||
|
{
|
||||||
|
List **rangeTableList = context->result;
|
||||||
|
List *rtable = context->rtable;
|
||||||
|
|
||||||
|
if (node == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(node, RangeTblRef))
|
||||||
|
{
|
||||||
|
int rangeTableIndex = ((RangeTblRef *) node)->rtindex;
|
||||||
|
|
||||||
|
RangeTblEntry *rte = rt_fetch(rangeTableIndex, rtable);
|
||||||
|
if (rte->rtekind == RTE_SUBQUERY)
|
||||||
|
{
|
||||||
|
Query *subquery = rte->subquery;
|
||||||
|
context->rtable = subquery->rtable;
|
||||||
|
ExtractRangeTableIds((Node *) subquery, context);
|
||||||
|
context->rtable = rtable; /* restore original rtable */
|
||||||
|
}
|
||||||
|
else if (rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION)
|
||||||
|
{
|
||||||
|
(*rangeTableList) = lappend(*rangeTableList, rte);
|
||||||
|
ereport(DEBUG4, (errmsg("ExtractRangeTableIds: found range table id %d", rte->relid)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(DEBUG4, (errmsg("Unsupported RTE kind in ExtractRangeTableIds %d", rte->rtekind)));
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (IsA(node, Query))
|
||||||
|
{
|
||||||
|
context->rtable = ((Query *) node)->rtable;
|
||||||
|
query_tree_walker((Query *) node, ExtractRangeTableIds, context, 0);
|
||||||
|
context->rtable = rtable; /* restore original rtable */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return expression_tree_walker(node, ExtractRangeTableIds, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Iterates through the FROM clause of the query and checks if there is a join
|
||||||
|
* clause with a reference and distributed table.
|
||||||
|
* If there is, it returns the index of the range table entry of the outer
|
||||||
|
* table in the join clause. It also sets the innerRte to point to the
|
||||||
|
* range table entry inner table. If there is no join clause with a distributed
|
||||||
|
* table, it returns -1.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntry **innerRte)
|
||||||
|
{
|
||||||
|
ListCell *fromExprCell;
|
||||||
|
|
||||||
|
/* TODO: is this case even possible | fromlist | > 1, no test cases yet */
|
||||||
|
if(list_length(fromlist) > 1)
|
||||||
|
{
|
||||||
|
ereport(DEBUG5, (errmsg("GetRepresentativeTablesFromJoinClause: Fromlist length > 1")));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
foreach(fromExprCell, fromlist)
|
||||||
|
{
|
||||||
|
Node *fromElement = (Node *) lfirst(fromExprCell);
|
||||||
|
if (IsA(fromElement, JoinExpr))
|
||||||
|
{
|
||||||
|
JoinExpr *joinExpr = (JoinExpr *) fromElement;
|
||||||
|
if(!IS_OUTER_JOIN(joinExpr->jointype))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// TODO: this path should not be active when the conditions are not met.
|
||||||
|
|
||||||
|
int outerRtIndex = ((RangeTblRef *)joinExpr->larg)->rtindex;
|
||||||
|
RangeTblEntry *outerRte = rt_fetch(outerRtIndex, rtable);
|
||||||
|
|
||||||
|
/* the outer table is a reference table */
|
||||||
|
if (outerRte->rtekind == RTE_FUNCTION)
|
||||||
|
{
|
||||||
|
RangeTblEntry *newRte = NULL;
|
||||||
|
if (!ExtractCitusExtradataContainerRTE(outerRte, &newRte))
|
||||||
|
{
|
||||||
|
/* RTE does not contain citus_extradata_container */
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (outerRte->rtekind == RTE_RELATION)
|
||||||
|
{
|
||||||
|
/* OK */
|
||||||
|
ereport(DEBUG5, (errmsg("\t\t outerRte: is RTE_RELATION")));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(DEBUG5, (errmsg("\t\t not supported RTE kind %d", outerRte->rtekind)));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(DEBUG5, (errmsg("\t OK outerRte: %s", outerRte->eref->aliasname)));
|
||||||
|
|
||||||
|
int innerRelid = InvalidOid;
|
||||||
|
ExtractRangeTableIdsContext context;
|
||||||
|
List *innerRteList = NIL;
|
||||||
|
context.result = &innerRteList;
|
||||||
|
context.rtable = rtable;
|
||||||
|
/* TODO: what if we call this also for LHS? */
|
||||||
|
ExtractRangeTableIds((Node *)joinExpr->rarg, &context);
|
||||||
|
|
||||||
|
List *citusRelids = NIL;
|
||||||
|
RangeTblEntry *rte = NIL;
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
foreach(lc, innerRteList)
|
||||||
|
{
|
||||||
|
rte = (RangeTblEntry *) lfirst(lc);
|
||||||
|
if (IsCitusTable(rte->relid))
|
||||||
|
{
|
||||||
|
citusRelids = lappend_int(citusRelids, rte->relid);
|
||||||
|
*innerRte = rte; // set the value of innerRte
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!AllDistributedRelationsInListColocated(citusRelids))
|
||||||
|
{
|
||||||
|
ereport(DEBUG5, (errmsg("The distributed tables are not colocated")));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return outerRtIndex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateWhereClauseForOuterJoin walks over the query tree and appends quals
|
||||||
|
* to the WHERE clause to filter w.r.to the distribution column of the corresponding shard.
|
||||||
|
* TODO:
|
||||||
|
* - Not supported cases should not call this function.
|
||||||
|
* - Remove the excessive debug messages.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
|
||||||
|
{
|
||||||
|
if (node == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!IsA(node, Query))
|
||||||
|
{
|
||||||
|
return expression_tree_walker(node, UpdateWhereClauseForOuterJoin, relationShardList);
|
||||||
|
}
|
||||||
|
|
||||||
|
Query *query = (Query *) node;
|
||||||
|
if (query->jointree == NULL)
|
||||||
|
{
|
||||||
|
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
FromExpr *fromExpr = query->jointree;
|
||||||
|
if(fromExpr == NULL)
|
||||||
|
{
|
||||||
|
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We need to find the outer table in the join clause to add the constraints w.r.to the shard
|
||||||
|
* intervals of the inner table.
|
||||||
|
* A representative inner table is sufficient as long as it is colocated with all other
|
||||||
|
* distributed tables in the join clause.
|
||||||
|
*/
|
||||||
|
RangeTblEntry *innerRte = NULL;
|
||||||
|
int outerRtIndex = GetRepresentativeTablesFromJoinClause(fromExpr->fromlist, query->rtable, &innerRte);
|
||||||
|
if (outerRtIndex < 0 || innerRte == NULL)
|
||||||
|
{
|
||||||
|
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(DEBUG5, (errmsg("\t innerRte: %s", innerRte->eref->aliasname)));
|
||||||
|
|
||||||
|
RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList);
|
||||||
|
uint64 shardId = relationShard->shardId;
|
||||||
|
Oid relationId = relationShard->relationId;
|
||||||
|
|
||||||
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
Var *partitionColumnVar = cacheEntry->partitionColumn;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* we will add constraints for the outer table, so we need to set the varno
|
||||||
|
* TODO: this only works when the outer table has the distribution column,
|
||||||
|
* we shoul not end up here if this is not the case.
|
||||||
|
*/
|
||||||
|
partitionColumnVar->varno = outerRtIndex;
|
||||||
|
bool isFirstShard = IsFirstShard(cacheEntry, shardId);
|
||||||
|
|
||||||
|
/* load the interval for the shard and create constant nodes for the upper/lower bounds */
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
Const *constNodeLowerBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->minValue, false, true);
|
||||||
|
Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->maxValue, false, true);
|
||||||
|
Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), Int32GetDatum(0), false, true);
|
||||||
|
|
||||||
|
// TOOD: the following is only for hash partitioned tables
|
||||||
|
/* create a function expression node for the hash partition column */
|
||||||
|
FuncExpr *hashFunction = makeNode(FuncExpr);
|
||||||
|
hashFunction->funcid = cacheEntry->hashFunction->fn_oid;
|
||||||
|
hashFunction->args = list_make1(partitionColumnVar);
|
||||||
|
hashFunction->funcresulttype = get_func_rettype(cacheEntry->hashFunction->fn_oid);
|
||||||
|
hashFunction->funcretset = false;
|
||||||
|
|
||||||
|
/* create a function expression for the lower bound of the shard interval */
|
||||||
|
Oid resultTypeOid = get_func_rettype(cacheEntry->shardIntervalCompareFunction->fn_oid);
|
||||||
|
FuncExpr* lowerBoundFuncExpr = makeNode(FuncExpr);
|
||||||
|
lowerBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid;
|
||||||
|
lowerBoundFuncExpr->args = list_make2((Node *) constNodeLowerBound, (Node *) hashFunction);
|
||||||
|
lowerBoundFuncExpr->funcresulttype = resultTypeOid;
|
||||||
|
lowerBoundFuncExpr->funcretset = false;
|
||||||
|
|
||||||
|
Oid lessThan = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<"),
|
||||||
|
resultTypeOid, resultTypeOid, ObjectIdGetDatum(11));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Finally, check if the comparison result is less than 0, i.e.,
|
||||||
|
* shardInterval->minValue < hash(partitionColumn)
|
||||||
|
* See SearchCachedShardInterval for the behavior at the boundaries.
|
||||||
|
*/
|
||||||
|
Expr *lowerBoundExpr = make_opclause(lessThan, BOOLOID, false, (Expr *) lowerBoundFuncExpr,
|
||||||
|
(Expr *) constNodeZero, InvalidOid, InvalidOid);
|
||||||
|
|
||||||
|
/* create a function expression for the upper bound of the shard interval */
|
||||||
|
FuncExpr* upperBoundFuncExpr = makeNode(FuncExpr);
|
||||||
|
upperBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid;
|
||||||
|
upperBoundFuncExpr->args = list_make2((Node *) hashFunction, (Expr *) constNodeUpperBound);
|
||||||
|
upperBoundFuncExpr->funcresulttype = resultTypeOid;
|
||||||
|
upperBoundFuncExpr->funcretset = false;
|
||||||
|
|
||||||
|
Oid lessThanOrEqualTo = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<="),
|
||||||
|
resultTypeOid, resultTypeOid, ObjectIdGetDatum(11));
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Finally, check if the comparison result is less than or equal to 0, i.e.,
|
||||||
|
* hash(partitionColumn) <= shardInterval->maxValue
|
||||||
|
* See SearchCachedShardInterval for the behavior at the boundaries.
|
||||||
|
*/
|
||||||
|
Expr *upperBoundExpr = make_opclause(lessThanOrEqualTo, BOOLOID, false, (Expr *) upperBoundFuncExpr,
|
||||||
|
(Expr *) constNodeZero, InvalidOid, InvalidOid);
|
||||||
|
|
||||||
|
|
||||||
|
/* create a node for both upper and lower bound */
|
||||||
|
Node *shardIntervalBoundQuals = make_and_qual((Node *) lowerBoundExpr, (Node *) upperBoundExpr);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Add a null test for the partition column for the first shard.
|
||||||
|
* This is because we need to include the null values in exactly one of the shard queries.
|
||||||
|
* The null test is added as an OR clause to the existing AND clause.
|
||||||
|
*/
|
||||||
|
if (isFirstShard)
|
||||||
|
{
|
||||||
|
/* null test for the first shard */
|
||||||
|
NullTest *nullTest = makeNode(NullTest);
|
||||||
|
nullTest->nulltesttype = IS_NULL; /* Check for IS NULL */
|
||||||
|
nullTest->arg = (Expr *) partitionColumnVar; /* The variable to check */
|
||||||
|
nullTest->argisrow = false;
|
||||||
|
shardIntervalBoundQuals = (Node *) make_orclause(list_make2(nullTest, shardIntervalBoundQuals));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fromExpr->quals == NULL)
|
||||||
|
{
|
||||||
|
fromExpr->quals = (Node *) shardIntervalBoundQuals;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: verify this, do we need the recursive call for all nodes?
|
||||||
|
/* We need to continue the recursive walk for the nested join statements.*/
|
||||||
|
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateRelationToShardNames walks over the query tree and appends shard ids to
|
* UpdateRelationToShardNames walks over the query tree and appends shard ids to
|
||||||
* relations. It uses unique identity value to establish connection between a
|
* relations. It uses unique identity value to establish connection between a
|
||||||
|
|
|
@ -174,6 +174,7 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
uint32 taskId,
|
uint32 taskId,
|
||||||
TaskType taskType,
|
TaskType taskType,
|
||||||
bool modifyRequiresCoordinatorEvaluation,
|
bool modifyRequiresCoordinatorEvaluation,
|
||||||
|
bool innerTableOfOuterJoin,
|
||||||
DeferredErrorMessage **planningError);
|
DeferredErrorMessage **planningError);
|
||||||
static List * SqlTaskList(Job *job);
|
static List * SqlTaskList(Job *job);
|
||||||
static bool DependsOnHashPartitionJob(Job *job);
|
static bool DependsOnHashPartitionJob(Job *job);
|
||||||
|
@ -2183,6 +2184,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
int minShardOffset = INT_MAX;
|
int minShardOffset = INT_MAX;
|
||||||
int prevShardCount = 0;
|
int prevShardCount = 0;
|
||||||
Bitmapset *taskRequiredForShardIndex = NULL;
|
Bitmapset *taskRequiredForShardIndex = NULL;
|
||||||
|
Bitmapset *innerTableOfOuterJoinSet = NULL;
|
||||||
|
bool innerTableOfOuterJoin = false;
|
||||||
|
|
||||||
/* error if shards are not co-partitioned */
|
/* error if shards are not co-partitioned */
|
||||||
ErrorIfUnsupportedShardDistribution(query);
|
ErrorIfUnsupportedShardDistribution(query);
|
||||||
|
@ -2220,20 +2223,17 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
prevShardCount = cacheEntry->shardIntervalArrayLength;
|
prevShardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
|
innerTableOfOuterJoin = false;
|
||||||
/*
|
/*
|
||||||
* For left joins we don't care about the shards pruned for the right hand side.
|
* For left outer joins, we need to check if the table is in the inner
|
||||||
* If the right hand side would prune to a smaller set we should still send it to
|
* part of the join. If it is, we need to mark this shard and add interval
|
||||||
* all tables of the left hand side. However if the right hand side is bigger than
|
* constraints to the join.
|
||||||
* the left hand side we don't have to send the query to any shard that is not
|
|
||||||
* matching anything on the left hand side.
|
|
||||||
*
|
|
||||||
* Instead we will simply skip any RelationRestriction if it is an OUTER join and
|
|
||||||
* the table is part of the non-outer side of the join.
|
|
||||||
*/
|
*/
|
||||||
if (IsInnerTableOfOuterJoin(relationRestriction))
|
if (IsInnerTableOfOuterJoin(relationRestriction))
|
||||||
{
|
{
|
||||||
continue;
|
ereport(DEBUG1, errmsg("Inner Table of Outer Join %d",
|
||||||
|
relationRestriction->relationId));
|
||||||
|
innerTableOfOuterJoin = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
@ -2244,6 +2244,15 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
taskRequiredForShardIndex =
|
taskRequiredForShardIndex =
|
||||||
bms_add_member(taskRequiredForShardIndex, shardIndex);
|
bms_add_member(taskRequiredForShardIndex, shardIndex);
|
||||||
minShardOffset = Min(minShardOffset, shardIndex);
|
minShardOffset = Min(minShardOffset, shardIndex);
|
||||||
|
if (innerTableOfOuterJoin)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We need to keep track of the inner table of outer join
|
||||||
|
* shards so that we can process them later.
|
||||||
|
*/
|
||||||
|
innerTableOfOuterJoinSet =
|
||||||
|
bms_add_member(innerTableOfOuterJoinSet, shardIndex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2261,11 +2270,13 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
int shardOffset = minShardOffset - 1;
|
int shardOffset = minShardOffset - 1;
|
||||||
while ((shardOffset = bms_next_member(taskRequiredForShardIndex, shardOffset)) >= 0)
|
while ((shardOffset = bms_next_member(taskRequiredForShardIndex, shardOffset)) >= 0)
|
||||||
{
|
{
|
||||||
|
innerTableOfOuterJoin = bms_is_member(shardOffset, innerTableOfOuterJoinSet);
|
||||||
Task *subqueryTask = QueryPushdownTaskCreate(query, shardOffset,
|
Task *subqueryTask = QueryPushdownTaskCreate(query, shardOffset,
|
||||||
relationRestrictionContext,
|
relationRestrictionContext,
|
||||||
taskIdIndex,
|
taskIdIndex,
|
||||||
taskType,
|
taskType,
|
||||||
modifyRequiresCoordinatorEvaluation,
|
modifyRequiresCoordinatorEvaluation,
|
||||||
|
innerTableOfOuterJoin,
|
||||||
planningError);
|
planningError);
|
||||||
if (*planningError != NULL)
|
if (*planningError != NULL)
|
||||||
{
|
{
|
||||||
|
@ -2439,6 +2450,7 @@ static Task *
|
||||||
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
RelationRestrictionContext *restrictionContext, uint32 taskId,
|
RelationRestrictionContext *restrictionContext, uint32 taskId,
|
||||||
TaskType taskType, bool modifyRequiresCoordinatorEvaluation,
|
TaskType taskType, bool modifyRequiresCoordinatorEvaluation,
|
||||||
|
bool innerTableOfOuterJoin,
|
||||||
DeferredErrorMessage **planningError)
|
DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Query *taskQuery = copyObject(originalQuery);
|
Query *taskQuery = copyObject(originalQuery);
|
||||||
|
@ -2531,11 +2543,21 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
*/
|
*/
|
||||||
UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
|
UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Augment the where clause with the shard intervals for inner table of outer
|
||||||
|
* joins.
|
||||||
|
*/
|
||||||
|
if (innerTableOfOuterJoin)
|
||||||
|
{
|
||||||
|
UpdateWhereClauseForOuterJoin((Node *) taskQuery, relationShardList);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Ands are made implicit during shard pruning, as predicate comparison and
|
* Ands are made implicit during shard pruning, as predicate comparison and
|
||||||
* refutation depend on it being so. We need to make them explicit again so
|
* refutation depend on it being so. We need to make them explicit again so
|
||||||
* that the query string is generated as (...) AND (...) as opposed to
|
* that the query string is generated as (...) AND (...) as opposed to
|
||||||
* (...), (...).
|
* (...), (...).
|
||||||
|
* TODO: do we need to run this before adding quals?
|
||||||
*/
|
*/
|
||||||
if (taskQuery->jointree->quals != NULL && IsA(taskQuery->jointree->quals, List))
|
if (taskQuery->jointree->quals != NULL && IsA(taskQuery->jointree->quals, List))
|
||||||
{
|
{
|
||||||
|
|
|
@ -834,6 +834,11 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
|
||||||
* contains recurring rels, must be an unsupported lateral outer
|
* contains recurring rels, must be an unsupported lateral outer
|
||||||
* join.
|
* join.
|
||||||
*/
|
*/
|
||||||
|
/*
|
||||||
|
* For now only stop returning an error here.
|
||||||
|
* TODO: later add all required checks to push down the query here
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
|
recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -708,7 +708,12 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
|
||||||
case JOIN_LEFT:
|
case JOIN_LEFT:
|
||||||
{
|
{
|
||||||
/* <recurring> left join <distributed> */
|
/* <recurring> left join <distributed> */
|
||||||
if (leftNodeRecurs && !rightNodeRecurs)
|
|
||||||
|
/* TODO: For now, just disable the recursive planning here.
|
||||||
|
* However, we should add further checks, i.e., left node is a subquery
|
||||||
|
* that can not be pushed down with additional constrains.
|
||||||
|
*/
|
||||||
|
if (leftNodeRecurs && !rightNodeRecurs && false)
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("recursively planning right side of "
|
ereport(DEBUG1, (errmsg("recursively planning right side of "
|
||||||
"the left join since the outer side "
|
"the left join since the outer side "
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern Path * CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInf
|
||||||
CustomScan *remoteScan);
|
CustomScan *remoteScan);
|
||||||
extern PlannedStmt * PlanCombineQuery(struct DistributedPlan *distributedPlan,
|
extern PlannedStmt * PlanCombineQuery(struct DistributedPlan *distributedPlan,
|
||||||
struct CustomScan *dataScan);
|
struct CustomScan *dataScan);
|
||||||
|
extern bool ExtractCitusExtradataContainerRTE(RangeTblEntry *rangeTblEntry, RangeTblEntry **result);
|
||||||
extern bool FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result);
|
extern bool FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result);
|
||||||
extern bool ReplaceCitusExtraDataContainer;
|
extern bool ReplaceCitusExtraDataContainer;
|
||||||
extern CustomScan *ReplaceCitusExtraDataContainerWithCustomScan;
|
extern CustomScan *ReplaceCitusExtraDataContainerWithCustomScan;
|
||||||
|
|
|
@ -20,10 +20,21 @@
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
|
#include "distributed/query_utils.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* Struct to pass rtable list and the result list to walker */
|
||||||
|
typedef struct ExtractRangeTableIdsContext
|
||||||
|
{
|
||||||
|
List **result;
|
||||||
|
List *rtable;
|
||||||
|
} ExtractRangeTableIdsContext;
|
||||||
|
|
||||||
|
int GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntry **innerRte);
|
||||||
|
bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context);
|
||||||
extern void RebuildQueryStrings(Job *workerJob);
|
extern void RebuildQueryStrings(Job *workerJob);
|
||||||
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
||||||
|
extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList);
|
||||||
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
|
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
|
||||||
extern void SetTaskQueryString(Task *task, char *queryString);
|
extern void SetTaskQueryString(Task *task, char *queryString);
|
||||||
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
||||||
|
|
|
@ -146,6 +146,7 @@ extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
|
||||||
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);
|
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);
|
||||||
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
|
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
|
||||||
CitusTableType tableType);
|
CitusTableType tableType);
|
||||||
|
extern bool IsFirstShard(CitusTableCacheEntry *tableEntry, uint64 shardId);
|
||||||
bool HasDistributionKey(Oid relationId);
|
bool HasDistributionKey(Oid relationId);
|
||||||
bool HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry);
|
bool HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry);
|
||||||
extern char * GetTableTypeName(Oid tableId);
|
extern char * GetTableTypeName(Oid tableId);
|
||||||
|
|
Loading…
Reference in New Issue