Add some more tests

pull/4358/head
Sait Talha Nisanci 2020-11-27 19:02:51 +03:00
parent 5693cabc41
commit eebcd995b3
8 changed files with 256 additions and 206 deletions

View File

@ -54,41 +54,41 @@
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
typedef struct RTEToSubqueryConverterReference typedef struct RangeTableEntryDetails
{ {
RangeTblEntry *rangeTableEntry; RangeTblEntry *rangeTableEntry;
Index rteIndex; Index rteIndex;
List *restrictionList; List *restrictionList;
List *requiredAttributeNumbers; List *requiredAttributeNumbers;
} RTEToSubqueryConverterReference; } RangeTableEntryDetails;
typedef struct RTEToSubqueryConverterContext typedef struct ConversionCandidates
{ {
List *distributedTableList; /* reference or distributed table */ List *distributedTableList; /* reference or distributed table */
List *localTableList; /* local or citus local table */ List *localTableList; /* local or citus local table */
bool hasSubqueryRTE; bool hasSubqueryRTE;
}RTEToSubqueryConverterContext; }ConversionCandidates;
static Oid GetResultRelationId(Query *query); static Oid GetResultRelationId(Query *query);
static Oid GetRTEToSubqueryConverterReferenceRelId( static Oid GetRTEToSubqueryConverterReferenceRelId(
RTEToSubqueryConverterReference *rteToSubqueryConverterReference); RangeTableEntryDetails *rangeTableEntryDetails);
static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid
resultRelationId); resultRelationId);
static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList,
List *requiredAttrNumbersForDistRTE); List *requiredAttrNumbersForDistRTE);
static bool ShouldConvertDistributedTable(FromExpr *joinTree, static bool ShouldConvertDistributedTable(FromExpr *joinTree,
RTEToSubqueryConverterReference *distRTEContext); RangeTableEntryDetails *distRTEContext);
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext); RecursivePlanningContext *planningContext);
static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext( static ConversionCandidates * CreateConversionCandidates(
RecursivePlanningContext *context, RecursivePlanningContext *context,
List * List *
rangeTableList); rangeTableList);
static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes);
static RTEToSubqueryConverterReference * GetNextRTEToConvertToSubquery(FromExpr *joinTree, static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree,
RTEToSubqueryConverterContext ConversionCandidates
* *
rteToSubqueryConverterContext, conversionCandidates,
PlannerRestrictionContext PlannerRestrictionContext
* *
plannerRestrictionContext, plannerRestrictionContext,
@ -96,13 +96,13 @@ static RTEToSubqueryConverterReference * GetNextRTEToConvertToSubquery(FromExpr
resultRelationId); resultRelationId);
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
List **joinRangeTableEntries); List **joinRangeTableEntries);
static void RemoveFromRTEToSubqueryConverterContext( static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid
RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId); relationId);
static bool FillLocalAndDistributedRTECandidates( static bool FillLocalAndDistributedRTECandidates(
RTEToSubqueryConverterContext *rteToSubqueryConverterContext, ConversionCandidates *conversionCandidates,
RTEToSubqueryConverterReference ** RangeTableEntryDetails **
localRTECandidate, localRTECandidate,
RTEToSubqueryConverterReference ** RangeTableEntryDetails **
distributedRTECandidate); distributedRTECandidate);
/* /*
@ -123,31 +123,31 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
return; return;
} }
RTEToSubqueryConverterContext *rteToSubqueryConverterContext = ConversionCandidates *conversionCandidates =
CreateRTEToSubqueryConverterContext( CreateConversionCandidates(
context, rangeTableList); context, rangeTableList);
RTEToSubqueryConverterReference *rteToSubqueryConverterReference = RangeTableEntryDetails *rangeTableEntryDetails =
GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
context->plannerRestrictionContext, context->plannerRestrictionContext,
resultRelationId); resultRelationId);
PlannerRestrictionContext *plannerRestrictionContext = PlannerRestrictionContext *plannerRestrictionContext =
FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
while (rteToSubqueryConverterReference && !IsRouterPlannable(query, while (rangeTableEntryDetails && !IsRouterPlannable(query,
plannerRestrictionContext)) plannerRestrictionContext))
{ {
ReplaceRTERelationWithRteSubquery( ReplaceRTERelationWithRteSubquery(
rteToSubqueryConverterReference->rangeTableEntry, rangeTableEntryDetails->rangeTableEntry,
rteToSubqueryConverterReference->restrictionList, rangeTableEntryDetails->restrictionList,
rteToSubqueryConverterReference-> rangeTableEntryDetails->
requiredAttributeNumbers, requiredAttributeNumbers,
context); context);
RemoveFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext, RemoveFromConversionCandidates(conversionCandidates,
rteToSubqueryConverterReference-> rangeTableEntryDetails->
rangeTableEntry->relid); rangeTableEntry->relid);
rteToSubqueryConverterReference = rangeTableEntryDetails =
GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
context->plannerRestrictionContext, context->plannerRestrictionContext,
resultRelationId); resultRelationId);
} }
@ -220,15 +220,15 @@ GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
* which should be converted to a subquery. It considers the local join policy * which should be converted to a subquery. It considers the local join policy
* and result relation. * and result relation.
*/ */
static RTEToSubqueryConverterReference * static RangeTableEntryDetails *
GetNextRTEToConvertToSubquery(FromExpr *joinTree, GetNextRTEToConvertToSubquery(FromExpr *joinTree,
RTEToSubqueryConverterContext *rteToSubqueryConverterContext, ConversionCandidates *conversionCandidates,
PlannerRestrictionContext *plannerRestrictionContext, Oid PlannerRestrictionContext *plannerRestrictionContext, Oid
resultRelationId) resultRelationId)
{ {
RTEToSubqueryConverterReference *localRTECandidate = NULL; RangeTableEntryDetails *localRTECandidate = NULL;
RTEToSubqueryConverterReference *distributedRTECandidate = NULL; RangeTableEntryDetails *distributedRTECandidate = NULL;
if (!FillLocalAndDistributedRTECandidates(rteToSubqueryConverterContext, if (!FillLocalAndDistributedRTECandidates(conversionCandidates,
&localRTECandidate, &localRTECandidate,
&distributedRTECandidate)) &distributedRTECandidate))
{ {
@ -283,28 +283,27 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
* It returns true if we should continue converting tables to subqueries. * It returns true if we should continue converting tables to subqueries.
*/ */
static bool static bool
FillLocalAndDistributedRTECandidates( FillLocalAndDistributedRTECandidates(ConversionCandidates *conversionCandidates,
RTEToSubqueryConverterContext *rteToSubqueryConverterContext, RangeTableEntryDetails **localRTECandidate,
RTEToSubqueryConverterReference **localRTECandidate, RangeTableEntryDetails **
RTEToSubqueryConverterReference **
distributedRTECandidate) distributedRTECandidate)
{ {
if (list_length(rteToSubqueryConverterContext->localTableList) > 0) if (list_length(conversionCandidates->localTableList) > 0)
{ {
*localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); *localRTECandidate = linitial(conversionCandidates->localTableList);
} }
if (*localRTECandidate == NULL) if (*localRTECandidate == NULL)
{ {
return false; return false;
} }
if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) if (list_length(conversionCandidates->distributedTableList) > 0)
{ {
*distributedRTECandidate = linitial( *distributedRTECandidate = linitial(
rteToSubqueryConverterContext->distributedTableList); conversionCandidates->distributedTableList);
} }
return *distributedRTECandidate != NULL || return *distributedRTECandidate != NULL ||
rteToSubqueryConverterContext->hasSubqueryRTE; conversionCandidates->hasSubqueryRTE;
} }
@ -313,35 +312,33 @@ FillLocalAndDistributedRTECandidates(
* if it is a valid one. * if it is a valid one.
*/ */
static Oid static Oid
GetRTEToSubqueryConverterReferenceRelId( GetRTEToSubqueryConverterReferenceRelId(RangeTableEntryDetails *rangeTableEntryDetails)
RTEToSubqueryConverterReference *rteToSubqueryConverterReference)
{ {
if (rteToSubqueryConverterReference && if (rangeTableEntryDetails &&
rteToSubqueryConverterReference->rangeTableEntry) rangeTableEntryDetails->rangeTableEntry)
{ {
return rteToSubqueryConverterReference->rangeTableEntry->relid; return rangeTableEntryDetails->rangeTableEntry->relid;
} }
return InvalidOid; return InvalidOid;
} }
/* /*
* RemoveFromRTEToSubqueryConverterContext removes an element from * RemoveFromConversionCandidates removes an element from
* the relevant list based on the relation id. * the relevant list based on the relation id.
*/ */
static void static void
RemoveFromRTEToSubqueryConverterContext( RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid relationId)
RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId)
{ {
if (IsLocalOrCitusLocalTable(relationId)) if (IsLocalOrCitusLocalTable(relationId))
{ {
rteToSubqueryConverterContext->localTableList = conversionCandidates->localTableList =
list_delete_first(rteToSubqueryConverterContext->localTableList); list_delete_first(conversionCandidates->localTableList);
} }
else else
{ {
rteToSubqueryConverterContext->distributedTableList = conversionCandidates->distributedTableList =
list_delete_first(rteToSubqueryConverterContext->distributedTableList); list_delete_first(conversionCandidates->distributedTableList);
} }
} }
@ -373,16 +370,16 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelatio
*/ */
static bool static bool
ShouldConvertDistributedTable(FromExpr *joinTree, ShouldConvertDistributedTable(FromExpr *joinTree,
RTEToSubqueryConverterReference * RangeTableEntryDetails *
rteToSubqueryConverterReference) rangeTableEntryDetails)
{ {
if (rteToSubqueryConverterReference == NULL) if (rangeTableEntryDetails == NULL)
{ {
return false; return false;
} }
List *distRTEEqualityQuals = List *distRTEEqualityQuals =
FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals,
rteToSubqueryConverterReference->rteIndex); rangeTableEntryDetails->rteIndex);
Node *join = NULL; Node *join = NULL;
foreach_ptr(join, joinTree->fromlist) foreach_ptr(join, joinTree->fromlist)
@ -393,15 +390,15 @@ ShouldConvertDistributedTable(FromExpr *joinTree,
distRTEEqualityQuals = list_concat(distRTEEqualityQuals, distRTEEqualityQuals = list_concat(distRTEEqualityQuals,
FetchEqualityAttrNumsForRTEFromQuals( FetchEqualityAttrNumsForRTEFromQuals(
joinExpr->quals, joinExpr->quals,
rteToSubqueryConverterReference-> rangeTableEntryDetails->
rteIndex) rteIndex)
); );
} }
} }
bool hasUniqueFilter = HasUniqueFilter( bool hasUniqueFilter = HasUniqueFilter(
rteToSubqueryConverterReference->rangeTableEntry, rangeTableEntryDetails->rangeTableEntry,
rteToSubqueryConverterReference-> rangeTableEntryDetails->
restrictionList, distRTEEqualityQuals); restrictionList, distRTEEqualityQuals);
return hasUniqueFilter; return hasUniqueFilter;
} }
@ -505,18 +502,15 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
/* /*
* CreateRTEToSubqueryConverterContext returns a range table entry which has the most filters * CreateConversionCandidates creates the conversion candidates that might
* on it along with the restrictions (e.g., fills **restrictionList). * be converted to a subquery so that citus planners can work.
*
* The function also gets a boolean localTable parameter, so the caller
* can choose to run the function for only local tables or distributed tables.
*/ */
static RTEToSubqueryConverterContext * static ConversionCandidates *
CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, CreateConversionCandidates(RecursivePlanningContext *context,
List *rangeTableList) List *rangeTableList)
{ {
RTEToSubqueryConverterContext *rteToSubqueryConverterContext = palloc0( ConversionCandidates *conversionCandidates = palloc0(
sizeof(RTEToSubqueryConverterContext)); sizeof(ConversionCandidates));
int rteIndex = 0; int rteIndex = 0;
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;
@ -525,24 +519,24 @@ CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context,
rteIndex++; rteIndex++;
if (rangeTableEntry->rtekind == RTE_SUBQUERY) if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{ {
rteToSubqueryConverterContext->hasSubqueryRTE = true; conversionCandidates->hasSubqueryRTE = true;
} }
/* we're only interested in tables */ /* we're only interested in tables */
if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{ {
continue; continue;
} }
RTEToSubqueryConverterReference *rteToSubqueryConverter = palloc( RangeTableEntryDetails *rangeTableEntryDetails = palloc0(
sizeof(RTEToSubqueryConverterReference)); sizeof(RangeTableEntryDetails));
rteToSubqueryConverter->rangeTableEntry = rangeTableEntry; rangeTableEntryDetails->rangeTableEntry = rangeTableEntry;
rteToSubqueryConverter->rteIndex = rteIndex; rangeTableEntryDetails->rteIndex = rteIndex;
rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation( rangeTableEntryDetails->restrictionList = GetRestrictInfoListForRelation(
rangeTableEntry, rangeTableEntry,
context-> context->
plannerRestrictionContext, 1); plannerRestrictionContext, 1);
rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation( rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation(
rangeTableEntry, context); rangeTableEntry, context);
bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid,
@ -551,16 +545,16 @@ CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context,
DISTRIBUTED_TABLE); DISTRIBUTED_TABLE);
if (referenceOrDistributedTable) if (referenceOrDistributedTable)
{ {
rteToSubqueryConverterContext->distributedTableList = conversionCandidates->distributedTableList =
lappend(rteToSubqueryConverterContext->distributedTableList, lappend(conversionCandidates->distributedTableList,
rteToSubqueryConverter); rangeTableEntryDetails);
} }
else else
{ {
rteToSubqueryConverterContext->localTableList = conversionCandidates->localTableList =
lappend(rteToSubqueryConverterContext->localTableList, lappend(conversionCandidates->localTableList,
rteToSubqueryConverter); rangeTableEntryDetails);
} }
} }
return rteToSubqueryConverterContext; return conversionCandidates;
} }

View File

@ -132,12 +132,6 @@ static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
multiShardQuery, multiShardQuery,
Oid *distributedTableId); Oid *distributedTableId);
static bool NodeIsFieldStore(Node *node); static bool NodeIsFieldStore(Node *node);
static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithLocalTable(
Query *query);
static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId);
static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId);
static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery, static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
@ -307,7 +301,8 @@ IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionCon
DeferredErrorMessage *deferredErrorMessage = NULL; DeferredErrorMessage *deferredErrorMessage = NULL;
if (copyQuery->commandType == CMD_SELECT) if (copyQuery->commandType == CMD_SELECT)
{ {
deferredErrorMessage = MultiRouterPlannableQuery(copyQuery); deferredErrorMessage = DeferErrorIfUnsupportedRouterPlannableSelectQuery(
copyQuery);
} }
if (deferredErrorMessage) if (deferredErrorMessage)
{ {
@ -809,93 +804,6 @@ NodeIsFieldStore(Node *node)
} }
/*
* DeferErrorIfUnsupportedModifyQueryWithLocalTable returns DeferredErrorMessage
* for unsupported modify queries that cannot be planned by router planner due to
* unsupported usage of postgres local or citus local tables.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedModifyQueryWithLocalTable(Query *query)
{
RTEListProperties *rteListProperties = GetRTEListPropertiesForQuery(query);
Oid targetRelationId = ModifyQueryResultRelationId(query);
DeferredErrorMessage *deferredErrorMessage =
DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(rteListProperties,
targetRelationId);
if (deferredErrorMessage)
{
return deferredErrorMessage;
}
deferredErrorMessage = DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable(
rteListProperties,
targetRelationId);
return deferredErrorMessage;
}
/*
* DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable is a helper function
* that takes RTEListProperties & targetRelationId and returns deferred error
* if query is not supported due to unsupported usage of citus local tables.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId)
{
if (rteListProperties->hasDistributedTable && rteListProperties->hasCitusLocalTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications with citus local tables and "
"distributed tables", NULL,
LOCAL_TABLE_SUBQUERY_CTE_HINT);
}
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE) &&
rteListProperties->hasCitusLocalTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications of reference tables with citus "
"local tables", NULL,
LOCAL_TABLE_SUBQUERY_CTE_HINT);
}
return NULL;
}
/*
* DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable is a helper
* function that takes RTEListProperties & targetRelationId and returns
* deferred error if query is not supported due to unsupported usage of
* postgres local tables.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId)
{
if (rteListProperties->hasPostgresLocalTable &&
rteListProperties->hasCitusTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications with local tables involving "
"citus tables", NULL,
LOCAL_TABLE_SUBQUERY_CTE_HINT);
}
if (!IsCitusTable(targetRelationId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications of local tables involving "
"distributed tables",
NULL, NULL);
}
return NULL;
}
/* /*
* ModifyQuerySupported returns NULL if the query only contains supported * ModifyQuerySupported returns NULL if the query only contains supported
* features, otherwise it returns an error description. * features, otherwise it returns an error description.

View File

@ -1400,8 +1400,16 @@ ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId)
{ {
return false; return false;
} }
return ContainsLocalTableDistributedTableJoin(rangeTableList) || if (ContainsLocalTableDistributedTableJoin(rangeTableList))
ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); {
return true;
}
if (ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId))
{
return true;
}
return false;
} }
@ -1420,7 +1428,7 @@ AllDataLocallyAccessible(List *rangeTableList)
/* TODO:: check if it has distributed table */ /* TODO:: check if it has distributed table */
return false; return false;
} }
if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{ {
continue; continue;
} }
@ -1457,11 +1465,11 @@ AllDataLocallyAccessible(List *rangeTableList)
/* /*
* SubqueryConvertableRelationForJoin returns true if the given range table entry * IsRecursivelyPlannableRelation returns true if the given range table entry
* is a relation type that can be converted to a subquery. * is a relation type that can be converted to a subquery.
*/ */
bool bool
SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry) IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry)
{ {
if (rangeTableEntry->rtekind != RTE_RELATION) if (rangeTableEntry->rtekind != RTE_RELATION)
{ {
@ -1487,7 +1495,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
{ {
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{ {
continue; continue;
} }
@ -1529,7 +1537,7 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId)
containsSubquery = true; containsSubquery = true;
} }
if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{ {
continue; continue;
} }

View File

@ -1910,7 +1910,6 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
*/ */
Expr *copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause); Expr *copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause);
List *varClauses = pull_var_clause_default((Node *) copyOfRestrictClause); List *varClauses = pull_var_clause_default((Node *) copyOfRestrictClause);
ListCell *varClauseCell = NULL;
Var *column = NULL; Var *column = NULL;
foreach_ptr(column, varClauses) foreach_ptr(column, varClauses)
{ {

View File

@ -894,7 +894,7 @@ PrunableExpressionsWalker(PruningTreeNode *node, ClauseWalkerContext *context)
* VarConstOpExprClause check whether an expression is a valid comparison of a Var to a Const. * VarConstOpExprClause check whether an expression is a valid comparison of a Var to a Const.
* Also obtaining the var with constant when valid. * Also obtaining the var with constant when valid.
*/ */
static bool bool
VarConstOpExprClause(OpExpr *opClause, Var **varClause, Const **constantClause) VarConstOpExprClause(OpExpr *opClause, Var **varClause, Const **constantClause)
{ {
Var *foundVarClause = NULL; Var *foundVarClause = NULL;

View File

@ -67,6 +67,6 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId);
extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid
resultRelationId); resultRelationId);
extern bool SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry); extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry);
#endif /* RECURSIVE_PLANNING_H */ #endif /* RECURSIVE_PLANNING_H */

View File

@ -39,6 +39,16 @@ SELECT create_distributed_table('distributed_partitioned_table', 'key');
(1 row) (1 row)
CREATE TABLE local_partitioned_table(key int, value text) PARTITION BY RANGE (key);
CREATE TABLE local_partitioned_table_1 PARTITION OF local_partitioned_table FOR VALUES FROM (0) TO (10);
CREATE TABLE local_partitioned_table_2 PARTITION OF local_partitioned_table FOR VALUES FROM (10) TO (20);
CREATE TABLE distributed_table_composite (key int, value text, value_2 jsonb, primary key (key, value));
SELECT create_distributed_table('distributed_table_composite', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
-- the user doesn't allow local / distributed table joinn -- the user doesn't allow local / distributed table joinn
SET citus.local_table_join_policy TO 'never'; SET citus.local_table_join_policy TO 'never';
@ -121,6 +131,34 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0 0
(1 row) (1 row)
-- partititoned local tables should work as well
SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key);
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM reference_table JOIN local_partitioned_table USING(key);
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.reference_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
-- partitioned tables should work as well -- partitioned tables should work as well
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key); SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
@ -149,6 +187,43 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0 0
(1 row) (1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key);
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10;
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
DEBUG: Wrapping local relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_composite.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
0
(1 row)
-- a unique index on key so dist table should be recursively planned -- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
@ -815,11 +890,47 @@ WHERE
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.citus_local SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.citus_local SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key)
-- complex queries
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1
DEBUG: generating subplan XXX_2 for subquery SELECT key, value FROM local_table_join.citus_local
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((local_table_join.postgres_table JOIN (SELECT d1.key, d1.value, d1.value_2 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1) d2 USING (key)) JOIN local_table_join.reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) c1 USING (key)) WHERE ((d2.key OPERATOR(pg_catalog.>) 10) AND (d2.key OPERATOR(pg_catalog.=) 10))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1
DEBUG: generating subplan XXX_2 for subquery SELECT key, value FROM local_table_join.citus_local
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((local_table_join.postgres_table JOIN (SELECT d1.key, d1.value, d1.value_2 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1) d2 USING (key)) JOIN local_table_join.reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) c1 USING (key)) WHERE ((d2.key OPERATOR(pg_catalog.>) 10) AND (d2.key OPERATOR(pg_catalog.=) 10))
count
---------------------------------------------------------------------
0
(1 row)
-- TODO:: we should support this?
UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key = 10;
ERROR: relation postgres_table is not distributed
UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10;
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.reference_table SET key = 1 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) l WHERE (l.key OPERATOR(pg_catalog.=) 10)
-- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE?
-- though then the planner could give an error
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE;
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE false OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE false OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) WHERE false
count
---------------------------------------------------------------------
0
(1 row)
DROP TABLE citus_local; DROP TABLE citus_local;
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 15 at PERFORM PL/pgSQL function citus_drop_trigger() line 15 at PERFORM
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 15 at PERFORM
RESET client_min_messages; RESET client_min_messages;
SELECT master_remove_node('localhost', :master_port); SELECT master_remove_node('localhost', :master_port);
master_remove_node master_remove_node
@ -829,4 +940,4 @@ SELECT master_remove_node('localhost', :master_port);
\set VERBOSITY terse \set VERBOSITY terse
DROP SCHEMA local_table_join CASCADE; DROP SCHEMA local_table_join CASCADE;
NOTICE: drop cascades to 7 other objects NOTICE: drop cascades to 9 other objects

View File

@ -19,6 +19,12 @@ CREATE TABLE distributed_partitioned_table_1 PARTITION OF distributed_partitione
CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20); CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20);
SELECT create_distributed_table('distributed_partitioned_table', 'key'); SELECT create_distributed_table('distributed_partitioned_table', 'key');
CREATE TABLE local_partitioned_table(key int, value text) PARTITION BY RANGE (key);
CREATE TABLE local_partitioned_table_1 PARTITION OF local_partitioned_table FOR VALUES FROM (0) TO (10);
CREATE TABLE local_partitioned_table_2 PARTITION OF local_partitioned_table FOR VALUES FROM (10) TO (20);
CREATE TABLE distributed_table_composite (key int, value text, value_2 jsonb, primary key (key, value));
SELECT create_distributed_table('distributed_table_composite', 'key');
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
@ -51,11 +57,23 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key);
SELECT count(*) FROM reference_table JOIN postgres_table USING(key); SELECT count(*) FROM reference_table JOIN postgres_table USING(key);
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key); SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
-- partititoned local tables should work as well
SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key);
SELECT count(*) FROM reference_table JOIN local_partitioned_table USING(key);
SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
-- partitioned tables should work as well -- partitioned tables should work as well
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key); SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key);
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10; SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key); SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key);
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
-- a unique index on key so dist table should be recursively planned -- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);
@ -352,6 +370,18 @@ FROM
WHERE WHERE
distributed_table_windex.key = citus_local.key; distributed_table_windex.key = citus_local.key;
-- complex queries
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
-- TODO:: we should support this?
UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key = 10;
UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10;
-- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE?
-- though then the planner could give an error
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE;
DROP TABLE citus_local; DROP TABLE citus_local;
RESET client_min_messages; RESET client_min_messages;
SELECT master_remove_node('localhost', :master_port); SELECT master_remove_node('localhost', :master_port);