Choose distributed table if it has a unique index on a filter column

When doing local-distributed table joins, we convert one of them to a
subquery. The policy is now that we convert the distributed table to a
subquery if it has a unique index on a column that appears in the filter
(a primary key also implies a unique index).
pull/4358/head
Sait Talha Nisanci 2020-11-18 17:38:42 +03:00
parent f0aef67ed2
commit f3d55448b3
14 changed files with 809 additions and 384 deletions
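To make the policy concrete, here is a short sketch of the intended 'auto' behavior, using the tables from the regression test in this commit (postgres_table is a plain local table; distributed_table_pkey has a primary key on "key"). Because the join column of the distributed table carries a unique index, the distributed side is the one wrapped into a subquery:

SET citus.local_table_join_policy TO 'auto';
-- "key" is the primary key of distributed_table_pkey, so the distributed
-- table, not the local one, is converted to a subquery
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);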

View File

@@ -249,6 +249,41 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement)
return namespaceId;
}
List *
ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor indexProcessor)
{
List *result = NIL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
PushOverrideEmptySearchPath(CurrentMemoryContext);
/* open system catalog and scan all indexes that belong to this table */
Relation pgIndex = table_open(IndexRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_index_indrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
SysScanDesc scanDescriptor = systable_beginscan(pgIndex,
IndexIndrelidIndexId, true, /* indexOK */
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple);
indexProcessor(indexForm, &result);
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
table_close(pgIndex, AccessShareLock);
/* revert back to original search_path */
PopOverrideSearchPath();
return result;
}
/*
* ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each
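For orientation, the pg_index scan above corresponds roughly to the following catalog query; this is only a sketch (the relation name is a placeholder), and the C version additionally feeds every row through the indexProcessor callback:

-- hypothetical relation name; GetAllUniqueIndexes later keeps only the
-- attribute numbers of indexes where indisunique or indisprimary is true
SELECT indisunique, indisprimary, indkey
FROM pg_index
WHERE indrelid = 'some_table'::regclass;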

View File

@@ -729,7 +729,6 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE
}
}
/*
* IndexImpliedByAConstraint is a helper function to be used while scanning
* pg_index. It returns true if the index identified by the given indexForm is

View File

@@ -0,0 +1,361 @@
#include "postgres.h"
#include "distributed/pg_version_constants.h"
#include "funcapi.h"
#include "catalog/pg_type.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/local_distributed_join_planner.h"
#include "distributed/listutils.h"
#include "distributed/log_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/query_colocation_checker.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/log_utils.h"
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "optimizer/clauses.h"
#if PG_VERSION_NUM >= PG_VERSION_12
#include "optimizer/optimizer.h"
#else
#include "optimizer/var.h"
#endif
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "parser/parsetree.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#if PG_VERSION_NUM >= PG_VERSION_12
#include "nodes/pathnodes.h"
#else
#include "nodes/relation.h"
#endif
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList);
static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList,
List *requiredAttrNumbersForDistRTE);
static void AutoConvertLocalTableJoinToSubquery(RangeTblEntry *localRTE,
RangeTblEntry *distRTE,
List *localRTERestrictionList, List *distRTERestrictionList,
List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE);
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext);
static RangeTblEntry * FindNextRTECandidate(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList, List **restrictionList,
bool localTable);
static bool AllDataLocallyAccessible(List *rangeTableList);
static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes);
/*
* ConvertLocalTableJoinsToSubqueries gets a query and the planner
* restrictions. As long as there is a join between a local table
* and distributed table, the function wraps one table in a
* subquery (by also pushing the filters on the table down
* to the subquery).
*
* Once this function returns, there are no direct joins between
* local and distributed tables.
*/
void
ConvertLocalTableJoinsToSubqueries(Query *query,
RecursivePlanningContext *context)
{
List *rangeTableList = query->rtable;
if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList))
{
return;
}
RangeTblEntry *resultRelation = ExtractResultRelationRTE(query);
while (ContainsLocalTableDistributedTableJoin(rangeTableList))
{
List *localTableRestrictList = NIL;
List *distributedTableRestrictList = NIL;
bool localTable = true;
PlannerRestrictionContext *plannerRestrictionContext =
context->plannerRestrictionContext;
RangeTblEntry *localRTECandidate =
FindNextRTECandidate(plannerRestrictionContext, rangeTableList,
&localTableRestrictList, localTable);
RangeTblEntry *distributedRTECandidate =
FindNextRTECandidate(plannerRestrictionContext, rangeTableList,
&distributedTableRestrictList, !localTable);
List *requiredAttrNumbersForLocalRte =
RequiredAttrNumbersForRelation(localRTECandidate, context);
List *requiredAttrNumbersForDistributedRte =
RequiredAttrNumbersForRelation(distributedRTECandidate, context);
if (resultRelation)
{
if (resultRelation->relid == localRTECandidate->relid)
{
ReplaceRTERelationWithRteSubquery(distributedRTECandidate,
distributedTableRestrictList,
requiredAttrNumbersForDistributedRte);
continue;
}
else if (resultRelation->relid == distributedRTECandidate->relid)
{
ReplaceRTERelationWithRteSubquery(localRTECandidate,
localTableRestrictList,
requiredAttrNumbersForLocalRte);
continue;
}
}
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
{
ReplaceRTERelationWithRteSubquery(localRTECandidate,
localTableRestrictList,
requiredAttrNumbersForLocalRte);
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
{
ReplaceRTERelationWithRteSubquery(distributedRTECandidate,
distributedTableRestrictList,
requiredAttrNumbersForDistributedRte);
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO)
{
AutoConvertLocalTableJoinToSubquery(localRTECandidate, distributedRTECandidate,
localTableRestrictList, distributedTableRestrictList,
requiredAttrNumbersForLocalRte, requiredAttrNumbersForDistributedRte);
}
else
{
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy);
}
}
}
/*
* ShouldConvertLocalTableJoinsToSubqueries returns true if we should
* convert local-dist table joins to subqueries.
*/
static bool
ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList)
{
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
{
/* user doesn't want Citus to enable local table joins */
return false;
}
if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
{
/* nothing to do as there are no relevant joins */
return false;
}
if (AllDataLocallyAccessible(rangeTableList))
{
/* recursive planning is overkill, router planner can already handle this */
return false;
}
return true;
}
/*
* AutoConvertLocalTableJoinToSubquery converts the distributed table to a
* subquery if it has a unique filter on it; otherwise it converts the local table.
*/
static void
AutoConvertLocalTableJoinToSubquery(RangeTblEntry *localRTE, RangeTblEntry *distRTE,
List *localRTERestrictionList, List *distRTERestrictionList,
List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE)
{
bool hasUniqueFilter = HasUniqueFilter(distRTE, distRTERestrictionList,
requiredAttrNumbersForDistRTE);
if (hasUniqueFilter)
{
ReplaceRTERelationWithRteSubquery(distRTE,
distRTERestrictionList,
requiredAttrNumbersForDistRTE);
}
else
{
ReplaceRTERelationWithRteSubquery(localRTE,
localRTERestrictionList,
requiredAttrNumbersForLocalRTE);
}
}
/*
* HasUniqueFilter returns true if the distributed table has a filter on a
* column with a unique index.
*
* TODO: This function should only consider equality filters; currently it
* also returns true for e.g. dist.a > 5. We should check this from join->quals.
*/
static bool
HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList,
List *requiredAttrNumbersForDistRTE)
{
List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid,
GetAllUniqueIndexes);
int columnNumber = 0;
foreach_int(columnNumber, uniqueIndexes)
{
if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber))
{
return true;
}
}
return false;
}
static void
GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes)
{
/* collect the attribute numbers covered by unique and primary key indexes */
if (indexForm->indisunique || indexForm->indisprimary)
{
for (int i = 0; i < indexForm->indkey.dim1; i++)
{
*uniqueIndexes = list_append_unique_int(*uniqueIndexes,
indexForm->indkey.values[i]);
}
}
}
/*
* RequiredAttrNumbersForRelation returns the required attribute numbers for
* the input RTE relation in order for the planning to succeed.
*
* The function could be optimized by not adding the columns that only appear
* in the WHERE clause as a filter (e.g., not a join clause).
*/
static List *
RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext)
{
PlannerRestrictionContext *plannerRestrictionContext =
planningContext->plannerRestrictionContext;
/* TODO: Get rid of this hack, find relation restriction information directly */
PlannerRestrictionContext *filteredPlannerRestrictionContext =
FilterPlannerRestrictionForQuery(plannerRestrictionContext,
WrapRteRelationIntoSubquery(relationRte, NIL));
RelationRestrictionContext *relationRestrictionContext =
filteredPlannerRestrictionContext->relationRestrictionContext;
List *filteredRelationRestrictionList =
relationRestrictionContext->relationRestrictionList;
RelationRestriction *relationRestriction =
(RelationRestriction *) linitial(filteredRelationRestrictionList);
PlannerInfo *plannerInfo = relationRestriction->plannerInfo;
Query *queryToProcess = plannerInfo->parse;
int rteIndex = relationRestriction->index;
List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0);
ListCell *varCell = NULL;
List *requiredAttrNumbers = NIL;
foreach(varCell, allVarsInQuery)
{
Var *var = (Var *) lfirst(varCell);
if (var->varno == rteIndex)
{
requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers,
var->varattno);
}
}
return requiredAttrNumbers;
}
/*
* FindNextRTECandidate returns the next local or distributed range table entry
* from the range table list, along with its restrictions (i.e., it fills
* **restrictionList).
*
* The function also gets a boolean localTable parameter, so the caller
* can choose to run the function for only local tables or distributed tables.
*/
static RangeTblEntry *
FindNextRTECandidate(PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, List **restrictionList,
bool localTable)
{
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
if (localTable && IsCitusTable(rangeTableEntry->relid))
{
continue;
}
if (!localTable && !IsCitusTable(rangeTableEntry->relid))
{
continue;
}
List *currentRestrictionList =
GetRestrictInfoListForRelation(rangeTableEntry,
plannerRestrictionContext, 1);
*restrictionList = currentRestrictionList;
return rangeTableEntry;
}
/* TODO: use a proper illegal-state error code here */
ereport(ERROR, (errmsg("unexpected state: could not find any RTE to "
"convert to subquery in range table list")));
return NULL;
}
/*
* AllDataLocallyAccessible returns true if all data for the relations in the
* rangeTableList is locally accessible.
*/
static bool
AllDataLocallyAccessible(List *rangeTableList)
{
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
Oid relationId = rangeTableEntry->relid;
if (!IsCitusTable(relationId))
{
/* local tables are locally accessible */
continue;
}
List *shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) > 1)
{
/* we currently only consider single placement tables */
return false;
}
ShardInterval *shardInterval = linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
ShardPlacement *localShardPlacement =
ShardPlacementOnGroup(shardId, GetLocalGroupId());
if (localShardPlacement == NULL)
{
/* the table doesn't have a placement on this node */
return false;
}
}
return true;
}
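As the TODO in HasUniqueFilter notes, the check is purely attribute-number based and does not yet inspect the operator in the qual. A hedged illustration of the false positive it describes, phrased with the regression-test tables:

-- "key" is uniquely indexed, but this is a range filter, not an equality;
-- HasUniqueFilter would currently still count it as a unique filter
SELECT count(*)
FROM postgres_table
JOIN distributed_table_pkey ON distributed_table_pkey.key > 5;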

View File

@@ -534,12 +534,7 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
{
return deferredError;
}
-deferredError = DeferErrorIfUnsupportedModifyQueryWithLocalTable(queryTree);
-if (deferredError != NULL)
-{
-return deferredError;
-}
Var *partitionColumn = NULL;
if (IsCitusTable(distributedTableId))
@@ -638,11 +633,14 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
}
}
-Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
-uint32 rangeTableId = 1;
-Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
-CmdType commandType = queryTree->commandType;
distributedTableId = ModifyQueryResultRelationId(queryTree);
rangeTableId = 1;
if (IsCitusTable(distributedTableId))
{
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
}
commandType = queryTree->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{

View File

@@ -59,6 +59,7 @@
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/local_distributed_join_planner.h"
#include "distributed/listutils.h"
#include "distributed/log_utils.h"
#include "distributed/metadata_cache.h"
@@ -109,21 +110,6 @@ int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO;
/* track depth of current recursive planner query */
static int recursivePlanningDepth = 0;
/*
* RecursivePlanningContext is used to recursively plan subqueries
* and CTEs, pull results to the coordinator, and push it back into
* the workers.
*/
typedef struct RecursivePlanningContext
{
int level;
uint64 planId;
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
} RecursivePlanningContext;
/*
* CteReferenceWalkerContext is used to collect CTE references in
* CteReferenceListWalker.
@@ -191,18 +177,6 @@ static bool ContainsReferencesToOuterQuery(Query *query);
static bool ContainsReferencesToOuterQueryWalker(Node *node,
VarLevelsUpWalkerContext *context);
static bool NodeContainsSubqueryReferencingOuterQuery(Node *node);
static void ConvertLocalTableJoinsToSubqueries(Query *query,
RecursivePlanningContext *planningContext);
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext);
static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList, List **restrictionList,
bool localTable);
static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *restrictionList,
List *requiredAttrNumbers);
static bool AllDataLocallyAccessible(List *rangeTableList);
static void WrapFunctionsInSubqueries(Query *query);
static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
@@ -210,7 +184,6 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
List *columnAliasList,
Const *resultIdConst, Oid functionOid,
bool useBinaryCopyFormat);
/*
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
* The function returns the subplans if necessary. For the details of when/how subplans are
@@ -1371,248 +1344,11 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node)
return false;
}
/*
* ConvertLocalTableJoinsToSubqueries gets a query and the planner
* restrictions. As long as there is a join between a local table
* and distributed table, the function wraps one table in a
* subquery (by also pushing the filters on the table down
* to the subquery).
*
* Once this function returns, there are no direct joins between
* local and distributed tables.
*/
static void
ConvertLocalTableJoinsToSubqueries(Query *query,
RecursivePlanningContext *context)
{
List *rangeTableList = query->rtable;
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
{
/* user doesn't want Citus to enable local table joins */
return;
}
if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
{
/* nothing to do as there are no relevant joins */
return;
}
if (AllDataLocallyAccessible(rangeTableList))
{
/* recursively planning is overkill, router planner can already handle this */
return;
}
RangeTblEntry *resultRelation = ExtractResultRelationRTE(query);
while (ContainsLocalTableDistributedTableJoin(rangeTableList))
{
List *localTableRestrictList = NIL;
List *distributedTableRestrictList = NIL;
bool localTable = true;
PlannerRestrictionContext *plannerRestrictionContext =
context->plannerRestrictionContext;
RangeTblEntry *mostFilteredLocalRte =
MostFilteredRte(plannerRestrictionContext, rangeTableList,
&localTableRestrictList, localTable);
RangeTblEntry *mostFilteredDistributedRte =
MostFilteredRte(plannerRestrictionContext, rangeTableList,
&distributedTableRestrictList, !localTable);
List *requiredAttrNumbersForLocalRte =
RequiredAttrNumbersForRelation(mostFilteredLocalRte, context);
List *requiredAttrNumbersForDistriutedRte =
RequiredAttrNumbersForRelation(mostFilteredDistributedRte, context);
elog(DEBUG4, "Local relation with the most number of filters "
"on it: \"%s\"", get_rel_name(mostFilteredLocalRte->relid));
elog(DEBUG4, "Distributed relation with the most number of filters "
"on it: \"%s\"", get_rel_name(mostFilteredDistributedRte->relid));
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL)
{
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
localTableRestrictList,
requiredAttrNumbersForLocalRte);
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_DISTRIBUTED)
{
ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
distributedTableRestrictList,
requiredAttrNumbersForDistriutedRte);
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO)
{
bool localTableHasFilter = list_length(localTableRestrictList) > 0;
bool distributedTableHasFilter =
list_length(distributedTableRestrictList) > 0;
if (resultRelation && resultRelation->relid == mostFilteredLocalRte->relid &&
!mostFilteredLocalRte->inFromCl)
{
/*
* We cannot recursively plan result relation, we have to
* recursively plan the distributed table.
*
* TODO: A future improvement could be to pick the next most filtered
* local relation, if exists.
*/
ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
distributedTableRestrictList,
requiredAttrNumbersForDistriutedRte);
}
else if (localTableHasFilter || !distributedTableHasFilter)
{
/*
* First, favor recursively planning local table when it has a filter.
* The rationale is that local tables are small, and at least one filter
* they become even smaller. On each iteration, we pick the local table
* with the most filters (e.g., WHERE clause entries). Note that the filters
* don't need to be directly on the table in the query tree, instead we use
* Postgres' filters where filters can be pushed down tables via filters.
*
* Second, if a distributed table doesn't have a filter, we do not ever
* prefer recursively planning that. Instead, we recursively plan the
* local table, assuming that it is smaller.
*
* TODO: If we have better statistics on how many tuples each table returns
* considering the filters on them, we should pick the table with least
* tuples. Today, we do not have such an infrastructure.
*/
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
localTableRestrictList,
requiredAttrNumbersForLocalRte);
}
else
{
ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
distributedTableRestrictList,
requiredAttrNumbersForDistriutedRte);
}
}
else
{
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy);
}
}
}
/*
* RequiredAttrNumbersForRelation returns the required attribute numbers for
* the input RTE relation in order for the planning to succeed.
*
* The function could be optimized by not adding the columns that only appear
* WHERE clause as a filter (e.g., not a join clause).
*/
static List *
RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext)
{
PlannerRestrictionContext *plannerRestrictionContext =
planningContext->plannerRestrictionContext;
/* TODO: Get rid of this hack, find relation restriction information directly */
PlannerRestrictionContext *filteredPlannerRestrictionContext =
FilterPlannerRestrictionForQuery(plannerRestrictionContext,
WrapRteRelationIntoSubquery(relationRte, NIL));
RelationRestrictionContext *relationRestrictionContext =
filteredPlannerRestrictionContext->relationRestrictionContext;
List *filteredRelationRestrictionList =
relationRestrictionContext->relationRestrictionList;
RelationRestriction *relationRestriction =
(RelationRestriction *) linitial(filteredRelationRestrictionList);
PlannerInfo *plannerInfo = relationRestriction->plannerInfo;
Query *queryToProcess = plannerInfo->parse;
int rteIndex = relationRestriction->index;
List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0);
ListCell *varCell = NULL;
List *requiredAttrNumbers = NIL;
foreach(varCell, allVarsInQuery)
{
Var *var = (Var *) lfirst(varCell);
if (var->varno == rteIndex)
{
requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers,
var->varattno);
}
}
return requiredAttrNumbers;
}
/*
* MostFilteredRte returns a range table entry which has the most filters
* on it along with the restrictions (e.g., fills **restrictionList).
*
* The function also gets a boolean localTable parameter, so the caller
* can choose to run the function for only local tables or distributed tables.
*/
static RangeTblEntry *
MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, List **restrictionList,
bool localTable)
{
RangeTblEntry *mostFilteredLocalRte = NULL;
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
if (localTable && IsCitusTable(rangeTableEntry->relid))
{
continue;
}
if (!localTable && !IsCitusTable(rangeTableEntry->relid))
{
continue;
}
List *currentRestrictionList =
GetRestrictInfoListForRelation(rangeTableEntry,
plannerRestrictionContext, 1);
if (mostFilteredLocalRte == NULL ||
list_length(*restrictionList) < list_length(currentRestrictionList) ||
ContainsFalseClause(currentRestrictionList))
{
mostFilteredLocalRte = rangeTableEntry;
*restrictionList = currentRestrictionList;
}
}
return mostFilteredLocalRte;
}
/*
* ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
* with a subquery. The function also pushes down the filters to the subquery.
*/
-static void
void
ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList,
List *requiredAttrNumbers)
{
@@ -1648,56 +1384,6 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
}
/*
* AllDataLocallyAccessible return true if all data for the relations in the
* rangeTableList is locally accessible.
*/
static bool
AllDataLocallyAccessible(List *rangeTableList)
{
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
Oid relationId = rangeTableEntry->relid;
if (!IsCitusTable(relationId))
{
/* local tables are locally accessible */
continue;
}
List *shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) > 1)
{
/* we currently only consider single placement tables */
return false;
}
ShardInterval *shardInterval = linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
ShardPlacement *localShardPlacement =
ShardPlacementOnGroup(shardId, GetLocalGroupId());
if (localShardPlacement == NULL)
{
/* the table doesn't have a placement on this node */
return false;
}
}
return true;
}
/*
* ContainsLocalTableDistributedTableJoin returns true if the input range table list
* contains a direct join between local and distributed tables.
@@ -1714,8 +1400,8 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
-if (!(rangeTableEntry->rtekind == RTE_RELATION &&
-rangeTableEntry->relkind == RELKIND_RELATION))
if (rangeTableEntry->rtekind != RTE_RELATION ||
rangeTableEntry->relkind != RELKIND_RELATION)
{
continue;
}
@@ -1729,14 +1415,9 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
{
containsLocalTable = true;
}
-if (containsLocalTable && containsDistributedTable)
-{
-return true;
-}
}
-return false;
return containsLocalTable && containsDistributedTable;
}

View File

@@ -200,8 +200,8 @@ static const struct config_enum_entry log_level_options[] = {
static const struct config_enum_entry local_table_join_policies[] = {
{ "never", LOCAL_JOIN_POLICY_NEVER, false},
-{ "pull-local", LOCAL_JOIN_POLICY_PULL_LOCAL, false},
-{ "pull-distributed", LOCAL_JOIN_POLICY_PULL_DISTRIBUTED, false},
{ "prefer-local", LOCAL_JOIN_POLICY_PREFER_LOCAL, false},
{ "prefer-distributed", LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED, false},
{ "auto", LOCAL_JOIN_POLICY_AUTO, false},
{ NULL, 0, false}
};

View File

@@ -70,6 +70,7 @@ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
/* index.c */
typedef void (*PGIndexProcessor)(Form_pg_index, List **);
/* call.c */
extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest);

View File

@@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* local_distributed_join_planner.h
*
* Declarations for planning joins between local and distributed tables.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef LOCAL_DISTRIBUTED_JOIN_PLANNER_H
#define LOCAL_DISTRIBUTED_JOIN_PLANNER_H
#include "postgres.h"
#include "distributed/recursive_planning.h"
extern void ConvertLocalTableJoinsToSubqueries(Query *query,
RecursivePlanningContext *context);
#endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */

View File

@@ -113,12 +113,9 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
char *sizeQuery);
-<<<<<<< HEAD
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
-=======
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
->>>>>>> Handle modifications as well
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,

View File

@@ -26,13 +26,27 @@
typedef enum
{
LOCAL_JOIN_POLICY_NEVER = 0,
-LOCAL_JOIN_POLICY_PULL_LOCAL = 1,
-LOCAL_JOIN_POLICY_PULL_DISTRIBUTED = 2,
LOCAL_JOIN_POLICY_PREFER_LOCAL = 1,
LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED = 2,
LOCAL_JOIN_POLICY_AUTO = 3,
} LocalJoinPolicy;
extern int LocalTableJoinPolicy;
/*
* RecursivePlanningContext is used to recursively plan subqueries
* and CTEs, pull results to the coordinator, and push it back into
* the workers.
*/
typedef struct RecursivePlanningContext
{
int level;
uint64 planId;
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
} RecursivePlanningContext;
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
@@ -46,6 +60,9 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
bool useBinaryCopyFormat);
extern bool GeneratingSubplans(void);
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *restrictionList,
List *requiredAttrNumbers);
#endif /* RECURSIVE_PLANNING_H */

View File

@@ -15,15 +15,32 @@ SELECT create_distributed_table('distributed_table', 'key');
(1 row)
CREATE TABLE distributed_table_pkey (key int primary key, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_pkey', 'key');
create_distributed_table
---------------------------------------------------------------------

(1 row)
CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_windex', 'key');
create_distributed_table
---------------------------------------------------------------------

(1 row)
CREATE UNIQUE INDEX key_index ON distributed_table_windex (key);
SET client_min_messages TO DEBUG1;
-- the user doesn't allow local / distributed table join
SET citus.local_table_join_policy TO 'never';
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
-ERROR: relation postgres_table is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
-ERROR: relation postgres_table is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- the user prefers local table recursively planned
-SET citus.local_table_join_policy TO 'pull-local';
SET citus.local_table_join_policy TO 'prefer-local';
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
@@ -43,7 +60,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
(1 row)
-- the user prefers distributed table recursively planned
-SET citus.local_table_join_policy TO 'pull-distributed';
SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
@@ -65,8 +82,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
-- update/delete
-- auto tests
-- switch back to the default policy, which is auto
-RESET citus.local_table_join_policy;
SET citus.local_table_join_policy to 'auto';
--- on the default mode, the local tables should be recursively planned
-- on the auto mode, the local tables should be recursively planned
-- unless a unique index exists on a column of the distributed table
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
@@ -94,20 +112,90 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
--- this is a contreversial part that we should discuss further
--- if the distributed table has at least one filter, we prefer
--- recursively planning of the distributed table
-SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test';
-DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0
-DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table.value OPERATOR(pg_catalog.=) 'test'::text)
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey USING (key))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (value))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key;
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_pkey.key)))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10;
DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)))
count
---------------------------------------------------------------------
0
(1 row)
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key);
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (key))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(value);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_windex USING (value))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON postgres_table.key = distributed_table_windex.key;
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_windex.key)))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON distributed_table_windex.key = 10;
DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((distributed_table_windex.key OPERATOR(pg_catalog.=) 10)))
count
---------------------------------------------------------------------
0
(1 row)
-- no unique index on value so local table should be recursively planned.
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test';
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_table.value OPERATOR(pg_catalog.=) 'test'::text)
count
---------------------------------------------------------------------
0
(1 row)
-- but if the filters can be pushed down to the local table via the join
-- we are smart about recursively planning the local table
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0
@@ -117,7 +205,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
--- if both local and distributed tables have a filter, we prefer local
-- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test';
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0
@@ -149,29 +237,24 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
-- if one of the distributed tables have a filter, we'll prefer recursive planning of it as well
-- it actually leads to a poor plan as we need to recursively plan local tables anyway as it is
-- joined with another distributed table
SELECT
count(*)
FROM
distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key)
WHERE
d1.value = '1';
-DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0
-DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0
-DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0
-DEBUG: generating subplan XXX_3 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text)
count
---------------------------------------------------------------------
0
(1 row)
-- if the filter is on the JOIN key, we can recursively plan the local
--- tables as filters are pushded down to the local tables
-- tables as filters are pushed down to the local tables
SELECT
count(*)
FROM
@@ -188,6 +271,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
SET citus.local_table_join_policy to 'auto';
-- we can support modification queries as well
UPDATE
postgres_table
@@ -211,6 +295,120 @@ WHERE
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table_pkey
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_pkey.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_pkey SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table_windex
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_windex.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key)
-- in case of update/delete we always recursively plan
-- the tables other than the target table, no matter what the policy is
SET citus.local_table_join_policy TO 'prefer-local';
UPDATE
postgres_table
SET
value = 'test'
FROM
distributed_table
WHERE
distributed_table.key = postgres_table.key;
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table_pkey
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_pkey.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_pkey SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table_windex
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_windex.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key)
SET citus.local_table_join_policy TO 'prefer-distributed';
UPDATE
postgres_table
SET
value = 'test'
FROM
distributed_table
WHERE
distributed_table.key = postgres_table.key;
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table_pkey
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_pkey.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_pkey SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) postgres_table.key)
UPDATE
distributed_table_windex
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_windex.key = postgres_table.key;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key)
-- modifications with multiple tables
UPDATE
distributed_table
@ -254,4 +452,4 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_j
\set VERBOSITY terse
RESET client_min_messages;
DROP SCHEMA local_table_join CASCADE;
NOTICE: drop cascades to 5 other objects
View File
@ -25,7 +25,7 @@ SELECT create_distributed_table('distributed_table', 'key');
SET client_min_messages TO DEBUG1;
-- for the purposes of these tests, we always want to recursively
-- plan local tables.
SET citus.local_table_join_policy TO "prefer-local";
-- there are no filters, hence cannot push down any filters
SELECT count(*)
FROM distributed_table u1
View File
@ -8,6 +8,13 @@ SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table (key int, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table', 'key');
CREATE TABLE distributed_table_pkey (key int primary key, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_pkey', 'key');
CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_windex', 'key');
CREATE UNIQUE INDEX key_index ON distributed_table_windex (key);
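-- aside (illustrative, not part of the test): a primary key is backed by a
-- unique index, so distributed_table_pkey and distributed_table_windex both
-- show a unique index on key in pg_index:
SELECT indexrelid::regclass, indisunique
FROM pg_index
WHERE indrelid = 'distributed_table_windex'::regclass;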
SET client_min_messages TO DEBUG1;
@ -17,13 +24,13 @@ SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
-- the user prefers local table recursively planned
SET citus.local_table_join_policy TO 'prefer-local';
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
-- the user prefers distributed table recursively planned
SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
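-- aside (illustrative, not part of the test): under 'prefer-distributed' the
-- distributed relation is the one wrapped into a subquery; as the DEBUG lines
-- in the expected output show, the wrapping is roughly:
SELECT key, NULL::text AS value, NULL::jsonb AS value_2
FROM local_table_join.distributed_table WHERE true OFFSET 0;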
@ -32,26 +39,34 @@ SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
-- auto tests
-- switch back to the default policy, which is auto
SET citus.local_table_join_policy to 'auto';
-- on the auto mode, the local tables should be recursively planned
-- unless a unique index exists on a column of the distributed table
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key);
SELECT count(*) FROM reference_table JOIN postgres_table USING(key);
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key;
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10;
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key);
SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(value);
SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON postgres_table.key = distributed_table_windex.key;
SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON distributed_table_windex.key = 10;
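-- aside (illustrative, not part of the test): with the unique index on key, an
-- equality filter matches at most one row, so materializing the distributed
-- side as an intermediate result stays cheap:
SELECT count(*) FROM distributed_table_windex WHERE key = 10;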
-- no unique index on value so local table should be recursively planned.
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test';
-- but if the filters can be pushed down to the local table via the join
-- we are smart about recursively planning the local table
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1;
-- if both local and distributed tables have a filter, we prefer local unless
-- the distributed table has a unique index on an equality-filtered column
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test';
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' OR postgres_table.value = 'test';
@ -61,9 +76,6 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE dist
SELECT count(*) FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key);
-- if one of the distributed tables has a filter, we'll prefer recursive planning of it as well
-- it actually leads to a poor plan as we need to recursively plan local tables anyway as it is
-- joined with another distributed table
SELECT
count(*)
FROM
@ -72,7 +84,7 @@ WHERE
d1.value = '1';
-- if the filter is on the JOIN key, we can recursively plan the local
-- tables as filters are pushed down to the local tables
SELECT
count(*)
FROM
@ -81,6 +93,8 @@ WHERE
d1.key = 1;
SET citus.local_table_join_policy to 'auto';
-- we can support modification queries as well
UPDATE
postgres_table
@ -101,6 +115,106 @@ FROM
WHERE
distributed_table.key = postgres_table.key;
UPDATE
distributed_table_pkey
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_pkey.key = postgres_table.key;
UPDATE
distributed_table_windex
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_windex.key = postgres_table.key;
-- in case of update/delete we always recursively plan
-- the tables other than the target table, no matter what the policy is
-- (a DELETE sketch follows the UPDATE permutations below)
SET citus.local_table_join_policy TO 'prefer-local';
UPDATE
postgres_table
SET
value = 'test'
FROM
distributed_table
WHERE
distributed_table.key = postgres_table.key;
UPDATE
distributed_table
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table.key = postgres_table.key;
UPDATE
distributed_table_pkey
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_pkey.key = postgres_table.key;
UPDATE
distributed_table_windex
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_windex.key = postgres_table.key;
SET citus.local_table_join_policy TO 'prefer-distributed';
UPDATE
postgres_table
SET
value = 'test'
FROM
distributed_table
WHERE
distributed_table.key = postgres_table.key;
UPDATE
distributed_table
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table.key = postgres_table.key;
UPDATE
distributed_table_pkey
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_pkey.key = postgres_table.key;
UPDATE
distributed_table_windex
SET
value = 'test'
FROM
postgres_table
WHERE
distributed_table_windex.key = postgres_table.key;
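-- illustrative sketch (not part of the test file): the same rule applies to
-- DELETE; the non-target table is the one recursively planned:
DELETE FROM distributed_table_windex
USING postgres_table
WHERE distributed_table_windex.key = postgres_table.key;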
-- modifications with multiple tables
UPDATE
distributed_table
View File
@ -26,7 +26,7 @@ SET client_min_messages TO DEBUG1;
-- for the purposes of these tests, we always want to recursively
-- plan local tables.
SET citus.local_table_join_policy TO "prefer-local";
-- there are no filters, hence cannot push down any filters