Recursively plan distributed tables only if all have unique filters

The previous algorithm was not consistent: it could convert different
RTEs depending on the order of the tables in the query. Now we convert
the local tables if there is a distributed table which doesn't have a
unique index. So if there are 4 tables, local1, local2, dist1,
dist2_with_pkey, then we will convert local1 and local2 in `auto` mode.
Converting the distributed table with a pkey would be redundant in that
case: since there is a distributed table without a unique index, we
need to convert the local tables anyway.
pull/4358/head
Sait Talha Nisanci 2020-12-01 23:44:06 +03:00
parent a008fc611c
commit 0689f2ac1a
5 changed files with 131 additions and 78 deletions

View File

@ -60,6 +60,7 @@ typedef struct RangeTableEntryDetails
Index rteIndex;
List *restrictionList;
List *requiredAttributeNumbers;
bool hasUniqueIndex;
} RangeTableEntryDetails;
typedef struct ConversionCandidates
@ -71,26 +72,30 @@ typedef struct ConversionCandidates
static Oid GetResultRelationId(Query *query);
static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
Oid resultRelationId,
PlannerRestrictionContext *plannerRestrictionContext);
static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList,
List *requiredAttrNumbersForDistRTE);
static bool ShouldConvertDistributedTable(FromExpr *joinTree,
RangeTableEntryDetails *distRTEContext);
PlannerRestrictionContext *
plannerRestrictionContext);
static bool HasUniqueIndex(FromExpr *joinTree,
RangeTblEntry *rangeTableEntry, Index rteIndex);
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
PlannerRestrictionContext *plannerRestrictionContext);
static ConversionCandidates * CreateConversionCandidates(
PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList,
Oid resultRelationId);
PlannerRestrictionContext *
plannerRestrictionContext);
static ConversionCandidates * CreateConversionCandidates(FromExpr *joinTree,
PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList,
Oid resultRelationId);
static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes);
static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree,
ConversionCandidates *conversionCandidates,
PlannerRestrictionContext* plannerRestrictionContext,
ConversionCandidates *
conversionCandidates,
PlannerRestrictionContext *
plannerRestrictionContext,
Oid resultRelationId);
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
List **joinRangeTableEntries);
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid
relationId);
static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
/*
* ConvertUnplannableTableJoinsToSubqueries gets a query and the planner
@ -101,18 +106,22 @@ void
ConvertUnplannableTableJoinsToSubqueries(Query *query,
RecursivePlanningContext *context)
{
PlannerRestrictionContext* plannerRestrictionContext = context->plannerRestrictionContext;
PlannerRestrictionContext *plannerRestrictionContext =
context->plannerRestrictionContext;
List *rangeTableList = NIL;
GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable,
&rangeTableList);
Oid resultRelationId = GetResultRelationId(query);
if (!ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, resultRelationId, plannerRestrictionContext)) {
if (!ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, resultRelationId,
plannerRestrictionContext))
{
return;
}
ConversionCandidates *conversionCandidates =
CreateConversionCandidates(plannerRestrictionContext, rangeTableList, resultRelationId);
CreateConversionCandidates(query->jointree, plannerRestrictionContext,
rangeTableList, resultRelationId);
RangeTableEntryDetails *rangeTableEntryDetails =
GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
@ -120,7 +129,8 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
resultRelationId);
while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
resultRelationId, plannerRestrictionContext))
resultRelationId,
plannerRestrictionContext))
{
ReplaceRTERelationWithRteSubquery(
rangeTableEntryDetails->rangeTableEntry,
@ -219,7 +229,6 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
{
localRTECandidate = linitial(conversionCandidates->localTableList);
}
if (list_length(conversionCandidates->distributedTableList) > 0)
{
distributedRTECandidate = linitial(conversionCandidates->distributedTableList);
@ -227,41 +236,48 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
{
if (localRTECandidate)
{
return localRTECandidate;
}
else
{
return distributedRTECandidate;
}
return localRTECandidate ? localRTECandidate : distributedRTECandidate;
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
{
if (distributedRTECandidate)
{
return distributedRTECandidate;
}
else
{
return localRTECandidate;
}
return distributedRTECandidate ? distributedRTECandidate : localRTECandidate;
}
else
{
if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate) ||
localRTECandidate == NULL)
bool allRangeTableEntriesHaveUniqueIndex = AllRangeTableEntriesHaveUniqueIndex(
conversionCandidates->distributedTableList);
if (allRangeTableEntriesHaveUniqueIndex)
{
return distributedRTECandidate;
return distributedRTECandidate ? distributedRTECandidate : localRTECandidate;
}
else
{
return localRTECandidate;
return localRTECandidate ? localRTECandidate : distributedRTECandidate;
}
}
}
/*
 * AllRangeTableEntriesHaveUniqueIndex checks the hasUniqueIndex flag of every
 * RangeTableEntryDetails in the given list. It stops at the first entry whose
 * flag is not set and returns false in that case; otherwise it returns true.
 * A list with no entries therefore yields true.
 */
static bool
AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList)
{
	bool everyEntryHasUniqueIndex = true;
	RangeTableEntryDetails *candidateDetails = NULL;

	foreach_ptr(candidateDetails, rangeTableEntryDetailsList)
	{
		if (candidateDetails->hasUniqueIndex)
		{
			/* this entry qualifies, keep scanning the rest */
			continue;
		}

		/* a single entry without a unique index decides the answer */
		everyEntryHasUniqueIndex = false;
		break;
	}

	return everyEntryHasUniqueIndex;
}
/*
* RemoveFromConversionCandidates removes an element from
* the relevant list based on the relation id.
@ -289,7 +305,8 @@ RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid r
static bool
ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
Oid resultRelationId,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *
plannerRestrictionContext)
{
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
{
@ -301,7 +318,8 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
return false;
}
plannerRestrictionContext = FilterPlannerRestrictionForQuery(plannerRestrictionContext, query);
plannerRestrictionContext = FilterPlannerRestrictionForQuery(
plannerRestrictionContext, query);
if (IsRouterPlannable(query, plannerRestrictionContext))
{
return false;
@ -311,22 +329,18 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
/*
* ShouldConvertDistributedTable returns true if we should convert the
* distributed table rte to a subquery. This will be the case if the distributed
* table has a unique index on a column that appears in filter.
* HasUniqueIndex returns true if the given rangeTableEntry has a constant
* filter on a unique column.
*/
static bool
ShouldConvertDistributedTable(FromExpr *joinTree,
RangeTableEntryDetails *
rangeTableEntryDetails)
HasUniqueIndex(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index rteIndex)
{
if (rangeTableEntryDetails == NULL)
if (rangeTableEntry == NULL)
{
return false;
}
List *distRTEEqualityQuals =
FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals,
rangeTableEntryDetails->rteIndex);
List *rteEqualityQuals =
FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, rteIndex);
Node *join = NULL;
foreach_ptr(join, joinTree->fromlist)
@ -334,37 +348,18 @@ ShouldConvertDistributedTable(FromExpr *joinTree,
if (IsA(join, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) join;
distRTEEqualityQuals = list_concat(distRTEEqualityQuals,
FetchEqualityAttrNumsForRTEFromQuals(
joinExpr->quals,
rangeTableEntryDetails->
rteIndex)
);
List *joinExprEqualityQuals = FetchEqualityAttrNumsForRTEFromQuals(
joinExpr->quals, rteIndex);
rteEqualityQuals = list_concat(rteEqualityQuals, joinExprEqualityQuals);
}
}
bool hasUniqueFilter = HasUniqueFilter(
rangeTableEntryDetails->rangeTableEntry,
rangeTableEntryDetails->
restrictionList, distRTEEqualityQuals);
return hasUniqueFilter;
}
/*
* HasUniqueFilter returns true if the given RTE has a unique filter
* on a column, which is a member of the given requiredAttrNumbersForDistRTE.
*/
static bool
HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList,
List *requiredAttrNumbersForDistRTE)
{
List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid,
List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
GetAllUniqueIndexes);
int columnNumber = 0;
foreach_int(columnNumber, uniqueIndexes)
{
if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber))
if (list_member_int(rteEqualityQuals, columnNumber))
{
return true;
}
@ -450,7 +445,8 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
* be converted to a subquery so that citus planners can work.
*/
static ConversionCandidates *
CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
CreateConversionCandidates(FromExpr *joinTree,
PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, Oid resultRelationId)
{
ConversionCandidates *conversionCandidates = palloc0(
@ -487,6 +483,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
rangeTableEntry, plannerRestrictionContext, 1);
rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation(
rangeTableEntry, plannerRestrictionContext);
rangeTableEntryDetails->hasUniqueIndex = HasUniqueIndex(joinTree,
rangeTableEntry,
rteIndex);
if (referenceOrDistributedTable)
{

View File

@ -221,8 +221,8 @@ CreateModifyPlan(Query *originalQuery, Query *query,
distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = ModifyQuerySupported(query, originalQuery,
multiShardQuery,
plannerRestrictionContext);
multiShardQuery,
plannerRestrictionContext);
if (distributedPlan->planningError != NULL)
{

View File

@ -1506,7 +1506,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
{
containsDistributedTable = true;
}
else if (IsLocalTableRteOrMatView((Node*) rangeTableEntry))
else if (IsLocalTableRteOrMatView((Node *) rangeTableEntry))
{
/* we consider citus local tables as local table */
containsLocalTable = true;
@ -1542,7 +1542,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId)
continue;
}
if (IsLocalTableRteOrMatView((Node*) rangeTableEntry) && rangeTableEntry->relid != resultRelationId)
if (IsLocalTableRteOrMatView((Node *) rangeTableEntry) &&
rangeTableEntry->relid != resultRelationId)
{
containsLocalTable = true;
}
@ -1675,6 +1676,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
}
/*
* If tupleDesc is NULL we have 2 different cases:
*
@ -1724,6 +1726,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
columnType = list_nth_oid(rangeTblFunction->funccoltypes,
targetColumnIndex);
}
/* use the types in the function definition otherwise */
else
{

View File

@ -289,6 +289,51 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
-- the conversions should be independent from the order of table entries in the query
SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
count
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join distributed_table_pkey using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table_pkey USING (key)) JOIN local_table_join.distributed_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
count
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) FROM postgres_table join distributed_table using(key) join local_partitioned_table using(key) join distributed_table_pkey using(key) where distributed_table_pkey.key = 5;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table_pkey USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
count
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) FROM distributed_table_pkey join distributed_table using(key) join postgres_table using(key) join local_partitioned_table using(key) where distributed_table_pkey.key = 5;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0
DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table_pkey JOIN local_table_join.distributed_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
count
---------------------------------------------------------------------
0
(1 row)
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
DEBUG: Wrapping local relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0

View File

@ -85,6 +85,12 @@ SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
-- the conversions should be independent from the order of table entries in the query
SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join distributed_table_pkey using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
SELECT COUNT(*) FROM postgres_table join distributed_table using(key) join local_partitioned_table using(key) join distributed_table_pkey using(key) where distributed_table_pkey.key = 5;
SELECT COUNT(*) FROM distributed_table_pkey join distributed_table using(key) join postgres_table using(key) join local_partitioned_table using(key) where distributed_table_pkey.key = 5;
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;