Add more tests

pull/4358/head
Sait Talha Nisanci 2020-12-10 21:31:26 +03:00
parent f7c1509fed
commit f5dd5379b2
13 changed files with 364 additions and 89 deletions

View File

@ -125,6 +125,7 @@
#include "utils/guc.h"
#include "utils/lsyscache.h"
#define INVALID_RTE_IDENTITY -1
/*
* Managed via a GUC
@ -158,8 +159,8 @@ static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList,
Oid resultRelationId);
static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes);
int resultRTEIdentity);
static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes);
static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(ConversionCandidates *
conversionCandidates,
PlannerRestrictionContext *
@ -167,7 +168,7 @@ static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(ConversionCandidat
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates,
int rteIdentity);
static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
static bool FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList);
static bool FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList);
/*
* RecursivelyPlanLocalTableJoins gets a query and the planner
@ -181,14 +182,15 @@ RecursivelyPlanLocalTableJoins(Query *query,
PlannerRestrictionContext *plannerRestrictionContext =
GetPlannerRestrictionContext(context);
Oid resultRelationId = InvalidOid;
int resultRTEIdentity = INVALID_RTE_IDENTITY;
if (IsModifyCommand(query))
{
resultRelationId = ModifyQueryResultRelationId(query);
RangeTblEntry *resultRTE = ExtractResultRelationRTE(query);
resultRTEIdentity = GetRTEIdentity(resultRTE);
}
ConversionCandidates *conversionCandidates =
CreateConversionCandidates(plannerRestrictionContext,
rangeTableList, resultRelationId);
rangeTableList, resultRTEIdentity);
while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
plannerRestrictionContext))
@ -362,12 +364,12 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
FetchEqualityAttrNumsForRTE((Node *) restrictClauseList);
List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
GetAllUniqueIndexes);
AppendUniqueIndexColumnsToList);
IndexColumns *indexColumns = NULL;
foreach_ptr(indexColumns, uniqueIndexColumnsList)
{
List *uniqueIndexColumnNos = indexColumns->indexColumnNos;
if (FirstIntListContainsSecondIntList(rteEqualityColumnsNos,
if (FirstIsSuperSetOfSecond(rteEqualityColumnsNos,
uniqueIndexColumnNos))
{
return true;
@ -378,11 +380,11 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
/*
* FirstIntListContainsSecondIntList returns true if the first int List
* FirstIsSuperSetOfSecond returns true if the first int List
* contains every element of the second int List.
*/
static bool
FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList)
FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList)
{
int curInt = 0;
foreach_int(curInt, secondIntList)
@ -397,13 +399,11 @@ FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList)
/*
* GetAllUniqueIndexes adds the given index's column numbers if it is a
* AppendUniqueIndexColumnsToList adds the given index's column numbers if it is a
* unique index.
* TODO:: if there is a unique index on a multiple column, then we should
* probably return true only if all the columns in the index exist in the filter.
*/
static void
GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexGroups)
AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups)
{
if (indexForm->indisunique || indexForm->indisprimary)
{
@ -471,22 +471,25 @@ RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
*/
static ConversionCandidates *
CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, Oid resultRelationId)
List *rangeTableList, int resultRTEIdentity)
{
ConversionCandidates *conversionCandidates =
palloc0(sizeof(ConversionCandidates));
RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
/* we're only interested in tables */
if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{
continue;
}
int rteIdentity = GetRTEIdentity(rangeTableEntry);
/* result relation cannot converted to a subquery */
if (resultRelationId == rangeTableEntry->relid)
if (resultRTEIdentity == rteIdentity)
{
continue;
}
@ -497,7 +500,7 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
{
continue;
}
int rteIdentity = GetRTEIdentity(rangeTableEntry);
RangeTableEntryDetails *rangeTableEntryDetails =
palloc0(sizeof(RangeTableEntryDetails));

View File

@ -3639,9 +3639,7 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList)
/*
* FetchEqualityAttrNumsForRTE fetches the attribute numbers from quals
* which:
* - has equality operator
* - belongs to rangeTableEntry with rteIndex
* which has equality operator
*/
List *
FetchEqualityAttrNumsForRTE(Node *node)
@ -3699,9 +3697,7 @@ FetchEqualityAttrNumsForList(List *nodeList)
/*
* FetchEqualityAttrNumsForRTEOpExpr fetches the attribute numbers from opExpr
* which:
* - has equality operator
* - belongs to rangeTableEntry with rteIndex
* which has equality operator.
*/
static List *
FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr)
@ -3723,9 +3719,7 @@ FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr)
/*
* FetchEqualityAttrNumsForRTEBoolExpr fetches the attribute numbers from boolExpr
* which:
* - has equality operator
* - belongs to rangeTableEntry with rteIndex
* which has equality operator
*/
static List *
FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr)

View File

@ -520,19 +520,8 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
{
return deferredError;
}
uint32 rangeTableId = 1;
CmdType commandType = queryTree->commandType;
Oid resultRelationId = ModifyQueryResultRelationId(queryTree);
*distributedTableIdOutput = resultRelationId;
Var *partitionColumn = NULL;
if (IsCitusTable(resultRelationId))
{
partitionColumn = PartitionColumn(resultRelationId, rangeTableId);
}
deferredError = DeferErrorIfModifyView(queryTree);
if (deferredError != NULL)
{
@ -624,9 +613,13 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
}
}
resultRelationId = ModifyQueryResultRelationId(queryTree);
rangeTableId = 1;
Oid resultRelationId = ModifyQueryResultRelationId(queryTree);
*distributedTableIdOutput = resultRelationId;
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
if (IsCitusTable(resultRelationId))
{
partitionColumn = PartitionColumn(resultRelationId, rangeTableId);
@ -2265,6 +2258,10 @@ PlanRouterQuery(Query *originalQuery,
}
/*
* ContainsOnlyLocalTables returns true if there is only
* local tables and not any distributed or reference table.
*/
static bool
ContainsOnlyLocalTables(RTEListProperties *rteProperties)
{

View File

@ -47,6 +47,7 @@
static RangeTblEntry * AnchorRte(Query *subquery);
static List * UnionRelationRestrictionLists(List *firstRelationList,
List *secondRelationList);
static void MakeVarAttNosSequential(List *targetList);
/*
@ -261,6 +262,9 @@ SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker)
* Note that the query returned by this function does not contain any filters or
* projections. The returned query should be used cautiosly and it is mostly
* designed for generating a stub query.
*
* allTargetList will contain all columns for the given rteRelation but for the ones
* that are not required, it will have NULL entries.
*/
Query *
WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes,
@ -300,6 +304,11 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes
if (shouldAssignDummyNullColumn && !assignedDummyNullColumn)
{
/*
* in case there is no required column, we assign one dummy NULL target entry
* to the subquery targetList so that it has at least one target.
* (targetlist should have at least one element)
*/
subquery->targetList = lappend(subquery->targetList, targetEntry);
assignedDummyNullColumn = true;
}
@ -329,12 +338,38 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
}
MakeVarAttNosSequential(*allTargetList);
relation_close(relation, NoLock);
return subquery;
}
/*
* MakeVarAttNosSequential changes the attribute numbers of the given targetList
* to sequential numbers, [1, 2, 3] ...
*/
static void
MakeVarAttNosSequential(List *targetList)
{
TargetEntry *entry = NULL;
int attrNo = 1;
foreach_ptr(entry, targetList)
{
if (IsA(entry->expr, Var))
{
Var *var = (Var *) entry->expr;
/*
* the inner subquery is an intermediate result hence
* the attribute no's should be in ordinal order. [1, 2, 3...]
*/
var->varattno = attrNo++;
}
}
}
/*
* UnionRelationRestrictionLists merges two relation restriction lists

View File

@ -197,8 +197,7 @@ static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
List **joinRangeTableEntries);
static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
List *allTargetList, List *requiredAttrNumbers);
static void MakeVarAttNosSequential(List *targetList);
List *outerSubqueryTargetList);
static List * GenerateRequiredColNamesFromTargetList(List *targetList);
static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTablentry);
@ -371,8 +370,6 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
* Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
* a query with local table/citus local table and subquery. We convert local/citus local
* tables to a subquery until they can be planned.
* This is the last call in this function since we want the other calls to be finished
* so that we can check if the current plan is router plannable at any step within this function.
*/
RecursivelyPlanLocalTableJoins(query, context, rangeTableList);
}
@ -1462,15 +1459,24 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *requiredAttrNumbers,
RecursivePlanningContext *context)
{
List *allTargetList = NIL;
List *outerQueryTargetList = NIL;
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers,
&allTargetList);
&outerQueryTargetList);
List *restrictionList =
GetRestrictInfoListForRelation(rangeTableEntry,
context->plannerRestrictionContext);
List *copyRestrictionList = copyObject(restrictionList);
Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList);
subquery->jointree->quals = (Node *) andedBoundExpressions;
/*
* Originally the quals were pointing to the RTE and its varno
* was pointing to its index in rtable. However now we converted the RTE
* to a subquery and the quals should be pointing to that subquery, which
* is the only RTE in its rtable, hence we update the varnos so that they
* point to the subquery RTE.
* Originally: rtable: [rte1, current_rte, rte3...]
* Now: rtable: [rte1, subquery[current_rte], rte3...] --subquery[current_rte] refers to its rtable.
*/
UpdateVarNosInNode(subquery, SINGLE_RTE_INDEX);
/* replace the function with the constructed subquery */
@ -1499,8 +1505,7 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
"unexpected state: query should have been recursively planned")));
}
Query *outerSubquery = CreateOuterSubquery(rangeTableEntry, allTargetList,
requiredAttrNumbers);
Query *outerSubquery = CreateOuterSubquery(rangeTableEntry, outerQueryTargetList);
rangeTableEntry->subquery = outerSubquery;
}
@ -1534,11 +1539,9 @@ GetRelationNameAndAliasName(RangeTblEntry *rangeTableEntry)
* the given range table entry in its rtable.
*/
static Query *
CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *allTargetList,
List *requiredAttrNumbers)
CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList)
{
MakeVarAttNosSequential(allTargetList);
List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(allTargetList);
List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(outerSubqueryTargetList);
Query *outerSubquery = makeNode(Query);
outerSubquery->commandType = CMD_SELECT;
@ -1554,36 +1557,11 @@ CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *allTargetList,
newRangeTableRef->rtindex = 1;
outerSubquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
outerSubquery->targetList = allTargetList;
outerSubquery->targetList = outerSubqueryTargetList;
return outerSubquery;
}
/*
* MakeVarAttNosSequential changes the attribute numbers of the given targetList
* to sequential numbers, [1, 2, 3] ...
*/
static void
MakeVarAttNosSequential(List *targetList)
{
TargetEntry *entry = NULL;
int attrNo = 1;
foreach_ptr(entry, targetList)
{
if (IsA(entry->expr, Var))
{
Var *var = (Var *) entry->expr;
/*
* the inner subquery is an intermediate result hence
* the attribute no's should be in ordinal order. [1, 2, 3...]
*/
var->varattno = attrNo++;
}
}
}
/*
* GenerateRequiredColNamesFromTargetList generates the required colnames
* from the given target list.
@ -1612,7 +1590,7 @@ GenerateRequiredColNamesFromTargetList(List *targetList)
/*
* UpdateVarNosInNode iterates the Vars in the
* given node and updates the varno's as the newVarNo.
* given node's join tree quals and updates the varno's as the newVarNo.
*/
static void
UpdateVarNosInNode(Query *query, Index newVarNo)

View File

@ -818,7 +818,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT co
-- view treated as subquery, so should work
SELECT count(*) FROM view_3, distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM (citus_local_table_queries.citus_local_table_2_1509001 citus_local_table_2(a, b) JOIN citus_local_table_queries.reference_table_1509002 reference_table(a, b) USING (a))
NOTICE: executing the command locally: SELECT a FROM citus_local_table_queries.citus_local_table_2_1509001 citus_local_table_2 WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((SELECT citus_local_table_2_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_2_1) citus_local_table_2 JOIN citus_local_table_queries.reference_table_1509002 reference_table(a, b) USING (a))
count
---------------------------------------------------------------------
6

View File

@ -438,7 +438,7 @@ WITH cte_1 AS (SELECT * FROM dist_table ORDER BY 1 LIMIT 1)
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true ORDER BY a LIMIT '1'::bigint
NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503020 dist_table WHERE true ORDER BY a LIMIT '1'::bigint
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a)))
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN (SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a)))
a | b | x | y | a
---------------------------------------------------------------------
1 | 2 | 1 | 2 | 1

View File

@ -316,6 +316,147 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
100
(1 row)
-- similar tests in transaction block should work fine
BEGIN;
-- materialized views should work too
SELECT count(*) FROM distributed_table JOIN mv1 USING(key);
DEBUG: Wrapping relation "mv1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT mv1_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv1_1) mv1 USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM (SELECT * FROM distributed_table) d1 JOIN mv1 USING(key);
DEBUG: Wrapping relation "mv1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table.key, distributed_table.value, distributed_table.value_2 FROM local_table_join.distributed_table) d1 JOIN (SELECT mv1_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv1_1) mv1 USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM reference_table JOIN mv1 USING(key);
DEBUG: Wrapping relation "mv1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.reference_table JOIN (SELECT mv1_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv1_1) mv1 USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_table JOIN mv1 USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping relation "mv1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_table JOIN (SELECT mv1_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv1_1) mv1 USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_table JOIN mv2 USING(key);
DEBUG: Wrapping relation "mv2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv2 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT mv2_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv2_1) mv2 USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM (SELECT * FROM distributed_table) d1 JOIN mv2 USING(key);
DEBUG: Wrapping relation "mv2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv2 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table.key, distributed_table.value, distributed_table.value_2 FROM local_table_join.distributed_table) d1 JOIN (SELECT mv2_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv2_1) mv2 USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM reference_table JOIN mv2 USING(key);
DEBUG: Wrapping relation "mv2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv2 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.reference_table JOIN (SELECT mv2_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv2_1) mv2 USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_table JOIN mv2 USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping relation "mv2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.mv2 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_table JOIN (SELECT mv2_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) mv2_1) mv2 USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
100
(1 row)
-- foreign tables should work too
SELECT count(*) FROM foreign_table JOIN distributed_table USING(key);
DEBUG: Wrapping relation "foreign_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.foreign_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT foreign_table_1.key, NULL::text AS value FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foreign_table_1) foreign_table JOIN local_table_join.distributed_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
-- partitioned tables should work as well
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key);
DEBUG: Wrapping relation "postgres_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10;
DEBUG: Wrapping relation "postgres_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping relation "postgres_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key);
DEBUG: Wrapping relation "local_partitioned_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.local_partitioned_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT local_partitioned_table_1.key, NULL::text AS value FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) local_partitioned_table_1) local_partitioned_table USING (key))
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10;
DEBUG: Wrapping relation "local_partitioned_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT local_partitioned_table_1.key, NULL::text AS value FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) local_partitioned_table_1) local_partitioned_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping relation "local_partitioned_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.local_partitioned_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT local_partitioned_table_1.key, NULL::text AS value FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) local_partitioned_table_1) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
100
(1 row)
ROLLBACK;
-- the conversions should be independent from the order of table entries in the query
SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
DEBUG: Wrapping relation "postgres_table" to a subquery
@ -397,6 +538,17 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
-- different column names
SELECT a FROM postgres_table foo (a,b,c) JOIN distributed_table ON (distributed_table.key = foo.a) ORDER BY 1 LIMIT 1;
DEBUG: Wrapping relation "postgres_table" "foo" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a AS key FROM local_table_join.postgres_table foo(a, b, c) WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.a FROM ((SELECT foo_1.a AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo_1(a)) foo(a, b, c) JOIN local_table_join.distributed_table ON ((distributed_table.key OPERATOR(pg_catalog.=) foo.a))) ORDER BY foo.a LIMIT 1
DEBUG: push down of limit count: 1
a
---------------------------------------------------------------------
1
(1 row)
-- We will plan postgres table as the index is on key,value not just key
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
DEBUG: Wrapping relation "postgres_table" to a subquery
@ -478,6 +630,15 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
1
(1 row)
-- Known bug: #4269
SELECT count(*) FROM distributed_table_composite foo(a,b,c) JOIN postgres_table ON(foo.a > 1)
WHERE foo.a IN (SELECT COUNT(*) FROM local_partitioned_table) AND (foo.a = 10 OR foo.b ='text');
DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.local_partitioned_table
DEBUG: Wrapping relation "distributed_table_composite" "foo" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT a AS key, b AS value FROM local_table_join.distributed_table_composite foo(a, b, c) WHERE ((a OPERATOR(pg_catalog.>) 1) AND ((a OPERATOR(pg_catalog.=) 10) OR (b OPERATOR(pg_catalog.=) 'text'::text)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT foo_1.a AS key, foo_1.b AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) foo_1(a, b)) foo(a, b, c) JOIN local_table_join.postgres_table ON ((foo.a OPERATOR(pg_catalog.>) 1))) WHERE ((foo.a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint))) AND ((foo.a OPERATOR(pg_catalog.=) 10) OR (foo.b OPERATOR(pg_catalog.=) 'text'::text)))
ERROR: column "a" does not exist
CONTEXT: while executing command on localhost:xxxxx
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
DEBUG: Wrapping relation "postgres_table" to a subquery

View File

@ -21,6 +21,15 @@ SELECT create_distributed_table('distributed_table', 'key');
(1 row)
CREATE TYPE new_type AS (n int, m text);
CREATE TABLE local_table_type (key int, value new_type, value_2 jsonb);
CREATE TABLE distributed_table_type (key int, value new_type, value_2 jsonb);
SELECT create_distributed_table('distributed_table_type', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Setting the debug level so that filters can be observed
SET client_min_messages TO DEBUG1;
-- for the purposes of these tests, we always want to recursively
@ -39,6 +48,45 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
-- composite types can be pushed down
SELECT count(*)
FROM distributed_table d1
JOIN local_table_type d2 using(key)
WHERE d2.value = (83, 'citus8.3')::new_type;
DEBUG: Wrapping relation "local_table_type" "d2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table_type d2 WHERE (value OPERATOR(pg_catalog.=) '(83,citus8.3)'::push_down_filters.new_type)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table d1 JOIN (SELECT d2_1.key, d2_1.value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(key integer, value push_down_filters.new_type)) d2_1) d2 USING (key)) WHERE (d2.value OPERATOR(pg_catalog.=) ROW(83, 'citus8.3'::text)::push_down_filters.new_type)
count
---------------------------------------------------------------------
0
(1 row)
-- composite types can be pushed down
SELECT count(*)
FROM distributed_table d1
JOIN local_table_type d2 using(key)
WHERE d2.value = (83, 'citus8.3')::new_type
AND d2.key = 10;
DEBUG: Wrapping relation "local_table_type" "d2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table_type d2 WHERE ((key OPERATOR(pg_catalog.=) 10) AND (value OPERATOR(pg_catalog.=) '(83,citus8.3)'::push_down_filters.new_type))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table d1 JOIN (SELECT d2_1.key, d2_1.value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(key integer, value push_down_filters.new_type)) d2_1) d2 USING (key)) WHERE ((d2.value OPERATOR(pg_catalog.=) ROW(83, 'citus8.3'::text)::push_down_filters.new_type) AND (d2.key OPERATOR(pg_catalog.=) 10))
count
---------------------------------------------------------------------
0
(1 row)
-- join on a composite type works
SELECT count(*)
FROM distributed_table_type d1
JOIN local_table_type d2 USING(value);
DEBUG: Wrapping relation "local_table_type" "d2" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table_type d2 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table_type d1 JOIN (SELECT NULL::integer AS key, d2_1.value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value push_down_filters.new_type)) d2_1) d2 USING (value))
count
---------------------------------------------------------------------
0
(1 row)
-- scalar array expressions can be pushed down
SELECT count(*)
FROM distributed_table u1
@ -425,4 +473,4 @@ ERROR: complex joins are only supported when all distributed tables are co-loca
\set VERBOSITY terse
RESET client_min_messages;
DROP SCHEMA push_down_filters CASCADE;
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 5 other objects

View File

@ -283,20 +283,20 @@ SELECT test_reference_local_join_func();
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table;
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (1) RETURNING a
NOTICE: executing the command locally: SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table
NOTICE: executing the command locally: SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, (SELECT local_table_1.a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) local_table_1) local_table
a | a
---------------------------------------------------------------------
20 | 2
20 | 4
20 | 7
20 | 20
21 | 2
21 | 4
21 | 7
21 | 20
2 | 2
20 | 4
21 | 4
2 | 4
20 | 7
21 | 7
2 | 7
20 | 20
21 | 20
2 | 20
(12 rows)

View File

@ -110,7 +110,7 @@ test: multi_average_expression multi_working_columns multi_having_pushdown havin
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns
test: ch_bench_subquery_repartition
test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restirction_pushdown
test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown
test: multi_partition_pruning single_hash_repartition_join
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
test: multi_null_minmax_value_pruning cursors

View File

@ -108,6 +108,32 @@ SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
-- similar tests in transaction block should work fine
BEGIN;
-- materialized views should work too
SELECT count(*) FROM distributed_table JOIN mv1 USING(key);
SELECT count(*) FROM (SELECT * FROM distributed_table) d1 JOIN mv1 USING(key);
SELECT count(*) FROM reference_table JOIN mv1 USING(key);
SELECT count(*) FROM distributed_table JOIN mv1 USING(key) JOIN reference_table USING (key);
SELECT count(*) FROM distributed_table JOIN mv2 USING(key);
SELECT count(*) FROM (SELECT * FROM distributed_table) d1 JOIN mv2 USING(key);
SELECT count(*) FROM reference_table JOIN mv2 USING(key);
SELECT count(*) FROM distributed_table JOIN mv2 USING(key) JOIN reference_table USING (key);
-- foreign tables should work too
SELECT count(*) FROM foreign_table JOIN distributed_table USING(key);
-- partitioned tables should work as well
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key);
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key);
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key);
ROLLBACK;
-- the conversions should be independent from the order of table entries in the query
SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join distributed_table_pkey using(key) join distributed_table using(key) where distributed_table_pkey.key = 5;
@ -119,6 +145,10 @@ SELECT count(*) FROM (SELECT *, random() FROM distributed_table_pkey) as d1 JOI
SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key::int = d1.key::int AND d1.key < postgres_table.key) WHERE d1.key::int = 1 AND false;
-- different column names
SELECT a FROM postgres_table foo (a,b,c) JOIN distributed_table ON (distributed_table.key = foo.a) ORDER BY 1 LIMIT 1;
-- We will plan postgres table as the index is on key,value not just key
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 OR distributed_table_composite.key = 20;
@ -137,6 +167,10 @@ SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.key = 20)
OR (distributed_table_composite.value = 'text' AND distributed_table_composite.value = 'text');
-- Known bug: #4269
SELECT count(*) FROM distributed_table_composite foo(a,b,c) JOIN postgres_table ON(foo.a > 1)
WHERE foo.a IN (SELECT COUNT(*) FROM local_partitioned_table) AND (foo.a = 10 OR foo.b ='text');
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);

View File

@ -21,6 +21,12 @@ CREATE TABLE local_table (key int, value int, time timestamptz);
CREATE TABLE distributed_table (key int, value int, metadata jsonb);
SELECT create_distributed_table('distributed_table', 'key');
CREATE TYPE new_type AS (n int, m text);
CREATE TABLE local_table_type (key int, value new_type, value_2 jsonb);
CREATE TABLE distributed_table_type (key int, value new_type, value_2 jsonb);
SELECT create_distributed_table('distributed_table_type', 'key');
-- Setting the debug level so that filters can be observed
SET client_min_messages TO DEBUG1;
@ -35,6 +41,24 @@ FROM distributed_table u1
JOIN distributed_table u2 USING(key)
JOIN local_table USING (key);
-- composite types can be pushed down
SELECT count(*)
FROM distributed_table d1
JOIN local_table_type d2 using(key)
WHERE d2.value = (83, 'citus8.3')::new_type;
-- composite types can be pushed down
SELECT count(*)
FROM distributed_table d1
JOIN local_table_type d2 using(key)
WHERE d2.value = (83, 'citus8.3')::new_type
AND d2.key = 10;
-- join on a composite type works
SELECT count(*)
FROM distributed_table_type d1
JOIN local_table_type d2 USING(value);
-- scalar array expressions can be pushed down
SELECT count(*)
FROM distributed_table u1