mirror of https://github.com/citusdata/citus.git
Enable citus-local distributed table joins
Check equality in quals We want to recursively plan distributed tables only if they have an equality filter on a unique column. So '>' and '<' operators will not trigger recursive planning of distributed tables in local-distributed table joins. Recursively plan distributed table only if the filter is constant If the filter is not a constant then the join might return multiple rows and there is a chance that the distributed table will return huge data. Hence if the filter is not constant we choose to recursively plan the local table.pull/4358/head
parent
f3d55448b3
commit
44953579cf
|
@ -54,20 +54,34 @@
|
|||
#include "utils/guc.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList);
|
||||
typedef struct RTEToSubqueryConverterReference {
|
||||
RangeTblEntry* rangeTableEntry;
|
||||
Index rteIndex;
|
||||
List* restrictionList;
|
||||
List* requiredAttributeNumbers;
|
||||
} RTEToSubqueryConverterReference;
|
||||
|
||||
typedef struct RTEToSubqueryConverterContext{
|
||||
List* distributedTableList; /* reference or distributed table */
|
||||
List* localTableList;
|
||||
List* citusLocalTableList;
|
||||
bool hasSubqueryRTE;
|
||||
}RTEToSubqueryConverterContext;
|
||||
|
||||
static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId);
|
||||
static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE);
|
||||
static void AutoConvertLocalTableJoinToSubquery(RangeTblEntry* localRTE, RangeTblEntry* distRTE,
|
||||
List* localRTERestrictionList, List* distRTERestrictionList,
|
||||
List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE);
|
||||
static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree,
|
||||
RTEToSubqueryConverterReference* distRTEContext);
|
||||
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
|
||||
RecursivePlanningContext *planningContext);
|
||||
static RangeTblEntry * FindNextRTECandidate(PlannerRestrictionContext *
|
||||
plannerRestrictionContext,
|
||||
List *rangeTableList, List **restrictionList,
|
||||
bool localTable);
|
||||
static bool AllDataLocallyAccessible(List *rangeTableList);
|
||||
static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context,
|
||||
List *rangeTableList);
|
||||
static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes);
|
||||
|
||||
static RTEToSubqueryConverterReference*
|
||||
GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext,
|
||||
PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation);
|
||||
static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext,
|
||||
bool isCitusLocalTable);
|
||||
/*
|
||||
* ConvertLocalTableJoinsToSubqueries gets a query and the planner
|
||||
* restrictions. As long as there is a join between a local table
|
||||
|
@ -83,70 +97,109 @@ ConvertLocalTableJoinsToSubqueries(Query *query,
|
|||
RecursivePlanningContext *context)
|
||||
{
|
||||
List *rangeTableList = query->rtable;
|
||||
if(!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList)) {
|
||||
RangeTblEntry *resultRelation = ExtractResultRelationRTE(query);
|
||||
Oid resultRelationId = InvalidOid;
|
||||
if (resultRelation) {
|
||||
resultRelationId = resultRelation->relid;
|
||||
}
|
||||
if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) {
|
||||
return;
|
||||
}
|
||||
RTEToSubqueryConverterContext* rteToSubqueryConverterContext = CreateRTEToSubqueryConverterContext(
|
||||
context, rangeTableList);
|
||||
|
||||
RangeTblEntry *resultRelation = ExtractResultRelationRTE(query);
|
||||
|
||||
while (ContainsLocalTableDistributedTableJoin(rangeTableList))
|
||||
RTEToSubqueryConverterReference* rteToSubqueryConverterReference =
|
||||
GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext,
|
||||
context->plannerRestrictionContext, resultRelation);
|
||||
while (rteToSubqueryConverterReference)
|
||||
{
|
||||
List *localTableRestrictList = NIL;
|
||||
List *distributedTableRestrictList = NIL;
|
||||
ReplaceRTERelationWithRteSubquery(rteToSubqueryConverterReference->rangeTableEntry,
|
||||
rteToSubqueryConverterReference->restrictionList,
|
||||
rteToSubqueryConverterReference->requiredAttributeNumbers);
|
||||
rteToSubqueryConverterReference =
|
||||
GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext,
|
||||
context->plannerRestrictionContext, resultRelation);
|
||||
}
|
||||
}
|
||||
|
||||
bool localTable = true;
|
||||
static RTEToSubqueryConverterReference*
|
||||
GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext,
|
||||
PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation) {
|
||||
|
||||
PlannerRestrictionContext *plannerRestrictionContext =
|
||||
context->plannerRestrictionContext;
|
||||
RangeTblEntry *localRTECandidate =
|
||||
FindNextRTECandidate(plannerRestrictionContext, rangeTableList,
|
||||
&localTableRestrictList, localTable);
|
||||
RangeTblEntry *distributedRTECandidate =
|
||||
FindNextRTECandidate(plannerRestrictionContext, rangeTableList,
|
||||
&distributedTableRestrictList, !localTable);
|
||||
RTEToSubqueryConverterReference* localRTECandidate = NULL;
|
||||
RTEToSubqueryConverterReference* nonLocalRTECandidate = NULL;
|
||||
bool citusLocalTableChosen = false;
|
||||
|
||||
List *requiredAttrNumbersForLocalRte =
|
||||
RequiredAttrNumbersForRelation(localRTECandidate, context);
|
||||
List *requiredAttrNumbersForDistributedRte =
|
||||
RequiredAttrNumbersForRelation(distributedRTECandidate, context);
|
||||
if (list_length(rteToSubqueryConverterContext->localTableList) > 0) {
|
||||
localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList);
|
||||
}else if (list_length(rteToSubqueryConverterContext->citusLocalTableList) > 0) {
|
||||
localRTECandidate = linitial(rteToSubqueryConverterContext->citusLocalTableList);
|
||||
citusLocalTableChosen = true;
|
||||
}
|
||||
if (localRTECandidate == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (resultRelation) {
|
||||
if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) {
|
||||
nonLocalRTECandidate = linitial(rteToSubqueryConverterContext->distributedTableList);
|
||||
}
|
||||
if (nonLocalRTECandidate == NULL && !rteToSubqueryConverterContext->hasSubqueryRTE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (resultRelation->relid == localRTECandidate->relid) {
|
||||
ReplaceRTERelationWithRteSubquery(distributedRTECandidate,
|
||||
distributedTableRestrictList,
|
||||
requiredAttrNumbersForDistributedRte);
|
||||
continue;
|
||||
}else if (resultRelation->relid == distributedRTECandidate->relid) {
|
||||
ReplaceRTERelationWithRteSubquery(localRTECandidate,
|
||||
localTableRestrictList,
|
||||
requiredAttrNumbersForLocalRte);
|
||||
continue;
|
||||
}
|
||||
if (resultRelation) {
|
||||
if(resultRelation == localRTECandidate->rangeTableEntry) {
|
||||
rteToSubqueryConverterContext->distributedTableList = list_delete_first(
|
||||
rteToSubqueryConverterContext->distributedTableList
|
||||
);
|
||||
return nonLocalRTECandidate;
|
||||
}
|
||||
if (resultRelation == nonLocalRTECandidate->rangeTableEntry) {
|
||||
PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen);
|
||||
return localRTECandidate;
|
||||
}
|
||||
}
|
||||
|
||||
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) {
|
||||
PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen);
|
||||
return localRTECandidate;
|
||||
}else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) {
|
||||
if (nonLocalRTECandidate) {
|
||||
rteToSubqueryConverterContext->distributedTableList = list_delete_first(
|
||||
rteToSubqueryConverterContext->distributedTableList
|
||||
);
|
||||
return nonLocalRTECandidate;
|
||||
}else {
|
||||
PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen);
|
||||
return localRTECandidate;
|
||||
}
|
||||
|
||||
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
|
||||
{
|
||||
ReplaceRTERelationWithRteSubquery(localRTECandidate,
|
||||
localTableRestrictList,
|
||||
requiredAttrNumbersForLocalRte);
|
||||
}
|
||||
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
|
||||
{
|
||||
ReplaceRTERelationWithRteSubquery(distributedRTECandidate,
|
||||
distributedTableRestrictList,
|
||||
requiredAttrNumbersForDistributedRte);
|
||||
}
|
||||
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO)
|
||||
{
|
||||
AutoConvertLocalTableJoinToSubquery(localRTECandidate, distributedRTECandidate,
|
||||
localTableRestrictList, distributedTableRestrictList,
|
||||
requiredAttrNumbersForLocalRte, requiredAttrNumbersForDistributedRte);
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy);
|
||||
}else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) {
|
||||
bool shouldConvertNonLocalTable = AutoConvertLocalTableJoinToSubquery(joinTree, nonLocalRTECandidate);
|
||||
if (shouldConvertNonLocalTable) {
|
||||
rteToSubqueryConverterContext->distributedTableList = list_delete_first(
|
||||
rteToSubqueryConverterContext->distributedTableList
|
||||
);
|
||||
return nonLocalRTECandidate;
|
||||
}else {
|
||||
PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen);
|
||||
return localRTECandidate;
|
||||
}
|
||||
}else {
|
||||
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext,
|
||||
bool isCitusLocalTable) {
|
||||
if (isCitusLocalTable) {
|
||||
rteToSubqueryConverterContext->citusLocalTableList =
|
||||
list_delete_first(rteToSubqueryConverterContext->citusLocalTableList);
|
||||
}else {
|
||||
rteToSubqueryConverterContext->localTableList =
|
||||
list_delete_first(rteToSubqueryConverterContext->localTableList);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,46 +207,42 @@ ConvertLocalTableJoinsToSubqueries(Query *query,
|
|||
* ShouldConvertLocalTableJoinsToSubqueries returns true if we should
|
||||
* convert local-dist table joins to subqueries.
|
||||
*/
|
||||
static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList) {
|
||||
static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId) {
|
||||
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
|
||||
{
|
||||
/* user doesn't want Citus to enable local table joins */
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
|
||||
{
|
||||
/* nothing to do as there are no relevant joins */
|
||||
return false;
|
||||
}
|
||||
|
||||
if (AllDataLocallyAccessible(rangeTableList))
|
||||
{
|
||||
/* recursively planning is overkill, router planner can already handle this */
|
||||
if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void AutoConvertLocalTableJoinToSubquery(RangeTblEntry* localRTE, RangeTblEntry* distRTE,
|
||||
List* localRTERestrictionList, List* distRTERestrictionList,
|
||||
List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE) {
|
||||
|
||||
bool hasUniqueFilter = HasUniqueFilter(distRTE, distRTERestrictionList, requiredAttrNumbersForDistRTE);
|
||||
if (hasUniqueFilter) {
|
||||
ReplaceRTERelationWithRteSubquery(distRTE,
|
||||
distRTERestrictionList,
|
||||
requiredAttrNumbersForDistRTE);
|
||||
}else {
|
||||
ReplaceRTERelationWithRteSubquery(localRTE,
|
||||
localRTERestrictionList,
|
||||
requiredAttrNumbersForLocalRTE);
|
||||
static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree,
|
||||
RTEToSubqueryConverterReference* rteToSubqueryConverterReference) {
|
||||
if (rteToSubqueryConverterReference == NULL) {
|
||||
return false;
|
||||
}
|
||||
List* distRTEEqualityQuals =
|
||||
FetchAttributeNumsForRTEFromQuals(joinTree->quals, rteToSubqueryConverterReference->rteIndex);
|
||||
|
||||
Node* join = NULL;
|
||||
foreach_ptr(join, joinTree->fromlist) {
|
||||
if (IsA(join, JoinExpr)) {
|
||||
JoinExpr* joinExpr = (JoinExpr*) join;
|
||||
distRTEEqualityQuals = list_concat(distRTEEqualityQuals,
|
||||
FetchAttributeNumsForRTEFromQuals(joinExpr->quals, rteToSubqueryConverterReference->rteIndex)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
bool hasUniqueFilter = HasUniqueFilter(rteToSubqueryConverterReference->rangeTableEntry,
|
||||
rteToSubqueryConverterReference->restrictionList, distRTEEqualityQuals);
|
||||
return hasUniqueFilter;
|
||||
|
||||
}
|
||||
|
||||
// TODO:: This function should only consider equality,
|
||||
// currently it will return true for dist.a > 5. We should check this from join->quals.
|
||||
static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE) {
|
||||
List* uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, GetAllUniqueIndexes);
|
||||
int columnNumber = 0;
|
||||
|
@ -237,6 +286,10 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
|
|||
filteredPlannerRestrictionContext->relationRestrictionContext;
|
||||
List *filteredRelationRestrictionList =
|
||||
relationRestrictionContext->relationRestrictionList;
|
||||
|
||||
if (list_length(filteredRelationRestrictionList) == 0) {
|
||||
return NIL;
|
||||
}
|
||||
RelationRestriction *relationRestriction =
|
||||
(RelationRestriction *) linitial(filteredRelationRestrictionList);
|
||||
|
||||
|
@ -265,23 +318,27 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
|
|||
|
||||
|
||||
/*
|
||||
* FindNextRTECandidate returns a range table entry which has the most filters
|
||||
* CreateRTEToSubqueryConverterContext returns a range table entry which has the most filters
|
||||
* on it along with the restrictions (e.g., fills **restrictionList).
|
||||
*
|
||||
* The function also gets a boolean localTable parameter, so the caller
|
||||
* can choose to run the function for only local tables or distributed tables.
|
||||
*/
|
||||
static RangeTblEntry *
|
||||
FindNextRTECandidate(PlannerRestrictionContext *plannerRestrictionContext,
|
||||
List *rangeTableList, List **restrictionList,
|
||||
bool localTable)
|
||||
static RTEToSubqueryConverterContext*
|
||||
CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context,
|
||||
List *rangeTableList)
|
||||
{
|
||||
ListCell *rangeTableCell = NULL;
|
||||
|
||||
RTEToSubqueryConverterContext* rteToSubqueryConverterContext = palloc0(sizeof(RTEToSubqueryConverterContext));
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
int rteIndex = 0;
|
||||
RangeTblEntry* rangeTableEntry = NULL;
|
||||
foreach_ptr(rangeTableEntry, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
rteIndex++;
|
||||
if (rangeTableEntry->rtekind == RTE_SUBQUERY) {
|
||||
rteToSubqueryConverterContext->hasSubqueryRTE = true;
|
||||
}
|
||||
/* we're only interested in tables */
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
|
@ -289,73 +346,25 @@ FindNextRTECandidate(PlannerRestrictionContext *plannerRestrictionContext,
|
|||
continue;
|
||||
}
|
||||
|
||||
if (localTable && IsCitusTable(rangeTableEntry->relid))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
RTEToSubqueryConverterReference* rteToSubqueryConverter = palloc(sizeof(RTEToSubqueryConverterReference));
|
||||
rteToSubqueryConverter->rangeTableEntry = rangeTableEntry;
|
||||
rteToSubqueryConverter->rteIndex = rteIndex;
|
||||
rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation(rangeTableEntry,
|
||||
context->plannerRestrictionContext, 1);
|
||||
rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation(rangeTableEntry, context);
|
||||
|
||||
if (!localTable && !IsCitusTable(rangeTableEntry->relid))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
List *currentRestrictionList =
|
||||
GetRestrictInfoListForRelation(rangeTableEntry,
|
||||
plannerRestrictionContext, 1);
|
||||
|
||||
*restrictionList = currentRestrictionList;
|
||||
return rangeTableEntry;
|
||||
}
|
||||
// TODO:: Put Illegal state error code
|
||||
ereport(ERROR, (errmsg("unexpected state: could not find any RTE to convert to subquery in range table list")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* AllDataLocallyAccessible return true if all data for the relations in the
|
||||
* rangeTableList is locally accessible.
|
||||
*/
|
||||
static bool
|
||||
AllDataLocallyAccessible(List *rangeTableList)
|
||||
{
|
||||
ListCell *rangeTableCell = NULL;
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
/* we're only interested in tables */
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
/* local tables are locally accessible */
|
||||
continue;
|
||||
}
|
||||
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
if (list_length(shardIntervalList) > 1)
|
||||
{
|
||||
/* we currently only consider single placement tables */
|
||||
return false;
|
||||
}
|
||||
|
||||
ShardInterval *shardInterval = linitial(shardIntervalList);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
ShardPlacement *localShardPlacement =
|
||||
ShardPlacementOnGroup(shardId, GetLocalGroupId());
|
||||
if (localShardPlacement == NULL)
|
||||
{
|
||||
/* the table doesn't have a placement on this node */
|
||||
return false;
|
||||
bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) ||
|
||||
IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE);
|
||||
if (referenceOrDistributedTable) {
|
||||
rteToSubqueryConverterContext->distributedTableList =
|
||||
lappend(rteToSubqueryConverterContext->distributedTableList, rteToSubqueryConverter);
|
||||
}else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) {
|
||||
rteToSubqueryConverterContext->citusLocalTableList =
|
||||
lappend(rteToSubqueryConverterContext->citusLocalTableList, rteToSubqueryConverter);
|
||||
}else {
|
||||
rteToSubqueryConverterContext->localTableList =
|
||||
lappend(rteToSubqueryConverterContext->localTableList, rteToSubqueryConverter);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
return rteToSubqueryConverterContext;
|
||||
}
|
|
@ -3588,6 +3588,56 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList)
|
|||
return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE);
|
||||
}
|
||||
|
||||
List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex) {
|
||||
List* attributeNums = NIL;
|
||||
if (quals == NULL)
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
if (IsA(quals, OpExpr))
|
||||
{
|
||||
if (!NodeIsEqualsOpExpr(quals))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
OpExpr *nextJoinClauseOpExpr = castNode(OpExpr, quals);
|
||||
|
||||
Var* var = NULL;
|
||||
if (VarConstOpExprClause(nextJoinClauseOpExpr, &var, NULL)) {
|
||||
attributeNums = lappend_int(attributeNums, var->varattno);
|
||||
return attributeNums;
|
||||
}
|
||||
|
||||
}
|
||||
else if (IsA(quals, BoolExpr))
|
||||
{
|
||||
BoolExpr *boolExpr = (BoolExpr *) quals;
|
||||
|
||||
if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) {
|
||||
return attributeNums;
|
||||
}
|
||||
|
||||
bool hasEquality = true;
|
||||
Node* arg = NULL;
|
||||
foreach_ptr(arg, boolExpr->args)
|
||||
{
|
||||
List* attributeNumsInSubExpression = FetchAttributeNumsForRTEFromQuals(arg, rteIndex);
|
||||
if (boolExpr->boolop == AND_EXPR)
|
||||
{
|
||||
hasEquality |= list_length(attributeNumsInSubExpression) > 0;
|
||||
}else if (boolExpr->boolop == OR_EXPR){
|
||||
hasEquality &= list_length(attributeNumsInSubExpression) > 0;
|
||||
}
|
||||
attributeNums = list_concat(attributeNums, attributeNumsInSubExpression);
|
||||
|
||||
}
|
||||
if (hasEquality) {
|
||||
return attributeNums;
|
||||
}
|
||||
}
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* JoinSequenceArray walks over the join nodes in the job query and constructs a join
|
||||
|
|
|
@ -181,7 +181,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
|||
TaskAssignmentPolicyType
|
||||
taskAssignmentPolicy,
|
||||
List *placementList);
|
||||
|
||||
static bool IsLocalOrCitusLocalTable(Oid relationId);
|
||||
|
||||
/*
|
||||
* CreateRouterPlan attempts to create a router executor plan for the given
|
||||
|
@ -530,11 +530,11 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
|||
|
||||
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
|
||||
*distributedTableIdOutput = distributedTableId;
|
||||
if (ContainsLocalTableDistributedTableJoin(queryTree->rtable))
|
||||
if (ContainsTableToBeConvertedToSubquery(queryTree->rtable, distributedTableId))
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
|
||||
|
||||
Var *partitionColumn = NULL;
|
||||
|
||||
if (IsCitusTable(distributedTableId))
|
||||
|
@ -773,6 +773,13 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static bool IsLocalOrCitusLocalTable(Oid relationId) {
|
||||
if (!IsCitusTable(relationId)) {
|
||||
return true;
|
||||
}
|
||||
return IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NodeIsFieldStore returns true if given Node is a FieldStore object.
|
||||
|
@ -896,7 +903,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
|||
List *rangeTableList = NIL;
|
||||
uint32 queryTableCount = 0;
|
||||
CmdType commandType = queryTree->commandType;
|
||||
bool fastPathRouterQuery =
|
||||
bool fastPathRouterQuery =
|
||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
|
||||
|
||||
/*
|
||||
|
@ -958,13 +965,24 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
|||
/* for other kinds of relations, check if its distributed */
|
||||
else
|
||||
{
|
||||
if (ContainsLocalTableDistributedTableJoin(queryTree->rtable))
|
||||
{
|
||||
RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree);
|
||||
Oid resultRelationId = InvalidOid;
|
||||
if (resultRte) {
|
||||
resultRelationId = resultRte->relid;
|
||||
}
|
||||
if (IsLocalOrCitusLocalTable(rangeTableEntry->relid) &&
|
||||
ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId)
|
||||
)
|
||||
{
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
char *relationName = get_rel_name(rangeTableEntry->relid);
|
||||
|
||||
appendStringInfo(errorMessage, "relation %s is not distributed",
|
||||
if (IsCitusTable(rangeTableEntry->relid)) {
|
||||
appendStringInfo(errorMessage, "citus local table %s cannot be used in this join",
|
||||
relationName);
|
||||
}else {
|
||||
appendStringInfo(errorMessage, "relation %s is not distributed",
|
||||
relationName);
|
||||
}
|
||||
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
errorMessage->data, NULL, NULL);
|
||||
|
|
|
@ -163,6 +163,7 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery,
|
|||
static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery,
|
||||
PlannerRestrictionContext *
|
||||
restrictionContext);
|
||||
static bool AllDataLocallyAccessible(List *rangeTableList);
|
||||
static bool ShouldRecursivelyPlanSetOperation(Query *query,
|
||||
RecursivePlanningContext *context);
|
||||
static void RecursivelyPlanSetOperations(Query *query, Node *node,
|
||||
|
@ -184,6 +185,7 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
|
|||
List *columnAliasList,
|
||||
Const *resultIdConst, Oid functionOid,
|
||||
bool useBinaryCopyFormat);
|
||||
|
||||
/*
|
||||
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
|
||||
* The function returns the subplans if necessary. For the details of when/how subplans are
|
||||
|
@ -1383,6 +1385,64 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
|||
}
|
||||
}
|
||||
|
||||
bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId) {
|
||||
if (AllDataLocallyAccessible(rangeTableList)) {
|
||||
return false;
|
||||
}
|
||||
return ContainsLocalTableDistributedTableJoin(rangeTableList) ||
|
||||
ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId);
|
||||
}
|
||||
|
||||
/*
|
||||
* AllDataLocallyAccessible return true if all data for the relations in the
|
||||
* rangeTableList is locally accessible.
|
||||
*/
|
||||
static bool
|
||||
AllDataLocallyAccessible(List *rangeTableList)
|
||||
{
|
||||
RangeTblEntry* rangeTableEntry = NULL;
|
||||
foreach_ptr(rangeTableEntry, rangeTableList)
|
||||
{
|
||||
if (rangeTableEntry->rtekind == RTE_SUBQUERY) {
|
||||
// TODO:: check if it has distributed table
|
||||
return false;
|
||||
}
|
||||
/* we're only interested in tables */
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
/* local tables are locally accessible */
|
||||
continue;
|
||||
}
|
||||
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
if (list_length(shardIntervalList) != 1)
|
||||
{
|
||||
/* we currently only consider single placement tables */
|
||||
return false;
|
||||
}
|
||||
|
||||
ShardInterval *shardInterval = linitial(shardIntervalList);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
ShardPlacement *localShardPlacement =
|
||||
ShardPlacementOnGroup(shardId, GetLocalGroupId());
|
||||
if (localShardPlacement == NULL)
|
||||
{
|
||||
/* the table doesn't have a placement on this node */
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* ContainsLocalTableDistributedTableJoin returns true if the input range table list
|
||||
|
@ -1400,19 +1460,20 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
|
|||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
/* we're only interested in tables */
|
||||
if (rangeTableEntry->rtekind != RTE_RELATION ||
|
||||
rangeTableEntry->relkind != RELKIND_RELATION)
|
||||
/* TODO:: What about partitioned tables? */
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* TODO: do NOT forget Citus local tables */
|
||||
if (IsCitusTable(rangeTableEntry->relid))
|
||||
if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE))
|
||||
{
|
||||
containsDistributedTable = true;
|
||||
}
|
||||
else
|
||||
else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE) || !IsCitusTable(rangeTableEntry->relid))
|
||||
{
|
||||
/* we consider citus local tables as local table */
|
||||
containsLocalTable = true;
|
||||
}
|
||||
}
|
||||
|
@ -1421,6 +1482,41 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ContainsLocalTableSubqueryJoin returns true if the input range table list
|
||||
* contains a direct join between local table/citus local table and subquery.
|
||||
*/
|
||||
bool
|
||||
ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId)
|
||||
{
|
||||
bool containsLocalTable = false;
|
||||
bool containsSubquery = false;
|
||||
|
||||
ListCell *rangeTableCell = NULL;
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
if (rangeTableEntry->rtekind == RTE_SUBQUERY) {
|
||||
containsSubquery = true;
|
||||
}
|
||||
/* we're only interested in tables */
|
||||
/* TODO:: What about partitioned tables? */
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!IsCitusTable(rangeTableEntry->relid) && rangeTableEntry->relid != resultRelationId)
|
||||
{
|
||||
containsLocalTable = true;
|
||||
}
|
||||
}
|
||||
|
||||
return containsLocalTable && containsSubquery;
|
||||
}
|
||||
|
||||
/*
|
||||
* WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
|
||||
* of a query and wraps the functions inside (SELECT * FROM fnc() f)
|
||||
|
|
|
@ -254,8 +254,6 @@ static bool IsValidPartitionKeyRestriction(OpExpr *opClause);
|
|||
static void AddPartitionKeyRestrictionToInstance(ClauseWalkerContext *context,
|
||||
OpExpr *opClause, Var *varClause,
|
||||
Const *constantClause);
|
||||
static bool VarConstOpExprClause(OpExpr *opClause, Var **varClause,
|
||||
Const **constantClause);
|
||||
static void AddSAOPartitionKeyRestrictionToInstance(ClauseWalkerContext *context,
|
||||
ScalarArrayOpExpr *
|
||||
arrayOperatorExpression);
|
||||
|
|
|
@ -589,5 +589,6 @@ extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column
|
|||
List *funcColumnTypeMods,
|
||||
List *funcCollations);
|
||||
|
||||
extern List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex);
|
||||
|
||||
#endif /* MULTI_PHYSICAL_PLANNER_H */
|
||||
|
|
|
@ -63,6 +63,8 @@ extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
|
|||
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
|
||||
List *restrictionList,
|
||||
List *requiredAttrNumbers);
|
||||
|
||||
extern bool
|
||||
ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId);
|
||||
extern bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId);
|
||||
|
||||
#endif /* RECURSIVE_PLANNING_H */
|
||||
|
|
|
@ -24,4 +24,6 @@ extern List * get_all_actual_clauses(List *restrictinfo_list);
|
|||
extern Const * TransformPartitionRestrictionValue(Var *partitionColumn,
|
||||
Const *restrictionValue,
|
||||
bool missingOk);
|
||||
bool VarConstOpExprClause(OpExpr *opClause, Var **varClause, Const **constantClause);
|
||||
|
||||
#endif /* SHARD_PRUNING_H_ */
|
||||
|
|
|
@ -114,9 +114,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
|||
|
||||
-- a unique index on key so dist table should be recursively planned
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey USING (key))
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
|
@ -132,9 +132,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_pkey.key)))
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_pkey.key)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
|
@ -149,11 +149,141 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- it should favor distributed table only if it has equality on the unique column
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key > 10;
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.>) 10)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key < 10;
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.<) 10)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 AND postgres_table.key = 5;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10) AND (postgres_table.key OPERATOR(pg_catalog.=) 5))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key > 10;
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20)) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20)) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20 OR distributed_table_pkey.key = 30;
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 30)) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 30)) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) 30))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = (
|
||||
SELECT count(*) FROM distributed_table_pkey
|
||||
);
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.distributed_table_pkey
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint))))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 5 and distributed_table_pkey.key > 15);
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.=) 5) AND (key OPERATOR(pg_catalog.>) 15))) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.=) 5) AND (key OPERATOR(pg_catalog.>) 15))) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 15)))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.key > 15);
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 15)))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.value = 'notext');
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.>) 10) AND (value OPERATOR(pg_catalog.=) 'notext'::text))) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.>) 10) AND (value OPERATOR(pg_catalog.=) 'notext'::text))) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_pkey.value OPERATOR(pg_catalog.=) 'notext'::text)))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 10 and distributed_table_pkey.value = 'notext');
|
||||
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.value OPERATOR(pg_catalog.=) 'notext'::text)))))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- a unique index on key so dist table should be recursively planned
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key);
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (key))
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_windex USING (key))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
|
@ -169,9 +299,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON postgres_table.key = distributed_table_windex.key;
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_windex.key)))
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_windex ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_windex.key)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
|
@ -449,7 +579,157 @@ DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, N
|
|||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d2 WHERE ((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.=) d2.key))
|
||||
\set VERBOSITY terse
|
||||
-- currently can't plan subquery-local table join
|
||||
SELECT count(*)
|
||||
FROM
|
||||
(SELECT * FROM (SELECT * FROM distributed_table) d1) d2
|
||||
JOIN postgres_table
|
||||
USING(key);
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT d1.key, d1.value, d1.value_2 FROM (SELECT distributed_table.key, distributed_table.value, distributed_table.value_2 FROM local_table_join.distributed_table) d1) d2 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
---------------------------------------------------------------------
|
||||
SET client_min_messages to ERROR;
|
||||
SELECT master_add_node('localhost', :master_port, groupId => 0);
|
||||
master_add_node
|
||||
---------------------------------------------------------------------
|
||||
30
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE citus_local(key int, value text);
|
||||
SELECT create_citus_local_table('citus_local');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
-- same for citus local table - distributed table joins
|
||||
-- a unique index on key so dist table should be recursively planned
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(key);
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (key))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(value);
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (value))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex ON citus_local.key = distributed_table_windex.key;
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((citus_local.key OPERATOR(pg_catalog.=) distributed_table_windex.key)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex ON distributed_table_windex.key = 10;
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((distributed_table_windex.key OPERATOR(pg_catalog.=) 10)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- no unique index, citus local table should be recursively planned
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table USING(key);
|
||||
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table USING(value);
|
||||
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (value))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table ON citus_local.key = distributed_table.key;
|
||||
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table ON ((citus_local.key OPERATOR(pg_catalog.=) distributed_table.key)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table ON distributed_table.key = 10;
|
||||
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table ON ((distributed_table.key OPERATOR(pg_catalog.=) 10)))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key);
|
||||
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- update
|
||||
UPDATE
|
||||
distributed_table_windex
|
||||
SET
|
||||
value = 'test'
|
||||
FROM
|
||||
citus_local
|
||||
WHERE
|
||||
distributed_table_windex.key = citus_local.key;
|
||||
DEBUG: Wrapping local relation "citus_local" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.citus_local WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.citus_local WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) citus_local WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key)
|
||||
UPDATE
|
||||
citus_local
|
||||
SET
|
||||
value = 'test'
|
||||
FROM
|
||||
distributed_table_windex
|
||||
WHERE
|
||||
distributed_table_windex.key = citus_local.key;
|
||||
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.citus_local SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key)
|
||||
DROP TABLE citus_local;
|
||||
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
|
||||
PL/pgSQL function citus_drop_trigger() line 15 at PERFORM
|
||||
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
|
||||
PL/pgSQL function citus_drop_trigger() line 15 at PERFORM
|
||||
RESET client_min_messages;
|
||||
SELECT master_remove_node('localhost', :master_port);
|
||||
master_remove_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA local_table_join CASCADE;
|
||||
NOTICE: drop cascades to 5 other objects
|
||||
NOTICE: drop cascades to 6 other objects
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
CREATE SCHEMA local_table_join;
|
||||
SET search_path TO local_table_join;
|
||||
|
||||
|
||||
CREATE TABLE postgres_table (key int, value text, value_2 jsonb);
|
||||
CREATE TABLE reference_table (key int, value text, value_2 jsonb);
|
||||
SELECT create_reference_table('reference_table');
|
||||
|
@ -52,6 +51,24 @@ SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
|
|||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10;
|
||||
-- it should favor distributed table only if it has equality on the unique column
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key > 10;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key < 10;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 AND postgres_table.key = 5;
|
||||
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key > 10;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20 OR distributed_table_pkey.key = 30;
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = (
|
||||
SELECT count(*) FROM distributed_table_pkey
|
||||
);
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 5 and distributed_table_pkey.key > 15);
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.key > 15);
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.value = 'notext');
|
||||
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 10 and distributed_table_pkey.value = 'notext');
|
||||
|
||||
|
||||
-- a unique index on key so dist table should be recursively planned
|
||||
|
@ -246,7 +263,61 @@ FROM
|
|||
WHERE
|
||||
postgres_table.key = d1.key AND d1.key = d2.key;
|
||||
|
||||
-- currently can't plan subquery-local table join
|
||||
SELECT count(*)
|
||||
FROM
|
||||
(SELECT * FROM (SELECT * FROM distributed_table) d1) d2
|
||||
JOIN postgres_table
|
||||
USING(key);
|
||||
|
||||
\set VERBOSITY terse
|
||||
|
||||
|
||||
---------------------------------------------------------
|
||||
|
||||
SET client_min_messages to ERROR;
|
||||
SELECT master_add_node('localhost', :master_port, groupId => 0);
|
||||
|
||||
|
||||
CREATE TABLE citus_local(key int, value text);
|
||||
SELECT create_citus_local_table('citus_local');
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
-- same for citus local table - distributed table joins
|
||||
-- a unique index on key so dist table should be recursively planned
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(key);
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(value);
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex ON citus_local.key = distributed_table_windex.key;
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table_windex ON distributed_table_windex.key = 10;
|
||||
|
||||
-- no unique index, citus local table should be recursively planned
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table USING(key);
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table USING(value);
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table ON citus_local.key = distributed_table.key;
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table ON distributed_table.key = 10;
|
||||
|
||||
SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key);
|
||||
|
||||
-- update
|
||||
UPDATE
|
||||
distributed_table_windex
|
||||
SET
|
||||
value = 'test'
|
||||
FROM
|
||||
citus_local
|
||||
WHERE
|
||||
distributed_table_windex.key = citus_local.key;
|
||||
|
||||
UPDATE
|
||||
citus_local
|
||||
SET
|
||||
value = 'test'
|
||||
FROM
|
||||
distributed_table_windex
|
||||
WHERE
|
||||
distributed_table_windex.key = citus_local.key;
|
||||
|
||||
DROP TABLE citus_local;
|
||||
RESET client_min_messages;
|
||||
SELECT master_remove_node('localhost', :master_port);
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA local_table_join CASCADE;
|
||||
|
|
|
@ -9,9 +9,9 @@ SET client_min_messages TO DEBUG1;
|
|||
|
||||
CREATE TABLE users_table_local AS SELECT * FROM users_table;
|
||||
|
||||
-- we don't support subqueries with local tables when they are not leaf queries
|
||||
-- TODO:: Move this out of this file
|
||||
SELECT
|
||||
*
|
||||
COUNT(*)
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
|
@ -23,7 +23,7 @@ FROM
|
|||
|
||||
RESET client_min_messages;
|
||||
-- we don't support subqueries with local tables when they are not leaf queries
|
||||
SELECT user_id FROM users_table WHERE user_id IN
|
||||
SELECT COUNT(user_id) FROM users_table WHERE user_id IN
|
||||
(SELECT
|
||||
user_id
|
||||
FROM
|
||||
|
|
Loading…
Reference in New Issue