Use BaseRestrictInfo for finding equality columns

BaseRestrictInfo also contains the pushed-down filters, so it makes more
sense to use BaseRestrictInfo to determine which columns have constant
equality filters.

Also, rteIdentity is now used instead of rteIndex when removing
conversion candidates.
pull/4358/head
Sait Talha Nisanci 2020-12-07 16:55:15 +03:00
parent 28c5b6a425
commit 5618f3a3fc
24 changed files with 589 additions and 333 deletions
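The gist of the change, in one sketch: instead of walking join-tree quals by rteIndex, the planner can take the clauses PostgreSQL has already collected (including pushed-down filters) in the relation's RelOptInfo->baserestrictinfo and scan them for column-equals-constant filters. The following minimal sketch illustrates that idea against standard PostgreSQL planner APIs; the helper name ConstEqualityAttrNums is made up for this sketch, PG 12+ headers are assumed, and a real implementation would also check that the operator implements equality (cf. OperatorImplementsEquality in the hunks below).

#include "postgres.h"

#include "nodes/pathnodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "optimizer/restrictinfo.h"

/*
 * ConstEqualityAttrNums collects the attribute numbers that are compared
 * for equality against a constant in the relation's baserestrictinfo.
 * Illustrative sketch only; see HasConstantFilterOnUniqueColumn below for
 * the actual patch.
 */
static List *
ConstEqualityAttrNums(RelOptInfo *relOptInfo)
{
	/* strip the RestrictInfo wrappers to get plain clause expressions */
	List *clauseList = get_all_actual_clauses(relOptInfo->baserestrictinfo);
	List *attributeNums = NIL;
	ListCell *clauseCell = NULL;

	foreach(clauseCell, clauseList)
	{
		Node *clause = (Node *) lfirst(clauseCell);
		if (!IsA(clause, OpExpr))
		{
			continue;
		}

		OpExpr *opExpr = (OpExpr *) clause;
		if (list_length(opExpr->args) != 2)
		{
			continue;
		}

		/* a real implementation would also verify opExpr->opno is equality */
		Node *leftArg = (Node *) linitial(opExpr->args);
		Node *rightArg = (Node *) lsecond(opExpr->args);
		if (IsA(leftArg, Var) && IsA(rightArg, Const))
		{
			attributeNums = lappend_int(attributeNums,
										((Var *) leftArg)->varattno);
		}
		else if (IsA(leftArg, Const) && IsA(rightArg, Var))
		{
			attributeNums = lappend_int(attributeNums,
										((Var *) rightArg)->varattno);
		}
	}

	return attributeNums;
}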


@ -249,6 +249,7 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement)
return namespaceId;
}
/*
* ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each
* index of the given relation.


@ -368,22 +368,22 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
RebuildQueryStrings(workerJob);
}
if (workerJob->onDummyPlacement) {
/* if this job is on a dummy placement, then it doesn't operate on
an actual shard placement */
return;
/* We skip shard-related work if the job contains only local tables */
if (!OnlyLocalTableJob(workerJob))
{
/*
* Now that we know the shard ID(s) we can acquire the necessary shard metadata
* locks. Once we have the locks it's safe to load the placement metadata.
*/
/* prevent concurrent placement changes */
AcquireMetadataLocks(workerJob->taskList);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
}
/*
* Now that we know the shard ID(s) we can acquire the necessary shard metadata
* locks. Once we have the locks it's safe to load the placement metadata.
*/
/* prevent concurrent placement changes */
AcquireMetadataLocks(workerJob->taskList);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
/*
* Now that we have populated the task placements we can determine whether
@ -544,10 +544,12 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
shardId = GetAnchorShardId(shardIntervalList);
}
bool containsOnlyLocalTable = false;
GenerateSingleShardRouterTaskList(workerJob,
relationShardList,
placementList,
shardId);
shardId,
containsOnlyLocalTable);
}


@ -410,7 +410,7 @@ AdjustPartitioningForDistributedPlanning(List *rangeTableList,
/*
* We want Postgres to treat partitioned tables as regular relations
* (i.e. we do not want to expand them to their partitions). To do this
* we set each distributed partitioned table's inh flag to appropriate
* we set each partitioned table's inh flag to appropriate
* value before and after dropping to the standard_planner.
*/
if (rangeTableEntry->rtekind == RTE_RELATION &&


@ -789,7 +789,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
&relationShardList,
&prunedShardIntervalListList,
replacePrunedQueryWithDummy,
&multiShardModifyQuery, NULL);
&multiShardModifyQuery, NULL,
false);
Assert(!multiShardModifyQuery);


@ -63,8 +63,7 @@ int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO;
typedef struct RangeTableEntryDetails
{
RangeTblEntry *rangeTableEntry;
Index rteIndex;
List *restrictionList;
int rteIdentity;
List *requiredAttributeNumbers;
bool hasConstantFilterOnUniqueColumn;
} RangeTableEntryDetails;
@ -75,9 +74,8 @@ typedef struct ConversionCandidates
List *localTableList; /* local or citus local table */
} ConversionCandidates;
static bool HasConstantFilterOnUniqueColumn(FromExpr *joinTree,
RangeTblEntry *rangeTableEntry, Index
rteIndex);
static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction);
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -92,8 +90,8 @@ static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree
conversionCandidates,
PlannerRestrictionContext *
plannerRestrictionContext);
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid
relationId);
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates,
int rteIdentity);
static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
/*
@ -103,17 +101,16 @@ static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList
*/
void
RecursivelyPlanLocalTableJoins(Query *query,
RecursivePlanningContext *context, List* rangeTableList)
RecursivePlanningContext *context, List *rangeTableList)
{
PlannerRestrictionContext *plannerRestrictionContext =
context->plannerRestrictionContext;
GetPlannerRestrictionContext(context);
Oid resultRelationId = InvalidOid;
if (IsModifyCommand(query))
{
resultRelationId = ModifyQueryResultRelationId(query);
}
}
ConversionCandidates *conversionCandidates =
CreateConversionCandidates(query->jointree, plannerRestrictionContext,
rangeTableList, resultRelationId);
@ -131,15 +128,15 @@ RecursivelyPlanLocalTableJoins(Query *query,
}
RangeTblEntry *rangeTableEntry = rangeTableEntryDetails->rangeTableEntry;
Oid relId = rangeTableEntryDetails->rangeTableEntry->relid;
List *restrictionList = rangeTableEntryDetails->restrictionList;
List *requiredAttributeNumbers = rangeTableEntryDetails->requiredAttributeNumbers;
ReplaceRTERelationWithRteSubquery(rangeTableEntry, restrictionList,
ReplaceRTERelationWithRteSubquery(rangeTableEntry,
requiredAttributeNumbers, context);
RemoveFromConversionCandidates(conversionCandidates, relId);
int rteIdentity = rangeTableEntryDetails->rteIdentity;
RemoveFromConversionCandidates(conversionCandidates, rteIdentity);
}
}
/*
* GetNextRTEToConvertToSubquery returns the range table entry
* which should be converted to a subquery. It considers the local join policy
@ -211,18 +208,33 @@ AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList)
* the relevant list based on the rte identity.
*/
static void
RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid relationId)
RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, int
rteIdentity)
{
if (IsRelationLocalTableOrMatView(relationId))
RangeTableEntryDetails *rangeTableEntryDetails = NULL;
foreach_ptr(rangeTableEntryDetails, conversionCandidates->localTableList)
{
conversionCandidates->localTableList =
list_delete_first(conversionCandidates->localTableList);
if (rangeTableEntryDetails->rteIdentity == rteIdentity)
{
conversionCandidates->localTableList =
list_delete_ptr(conversionCandidates->localTableList,
rangeTableEntryDetails);
return;
}
}
else
foreach_ptr(rangeTableEntryDetails, conversionCandidates->distributedTableList)
{
conversionCandidates->distributedTableList =
list_delete_first(conversionCandidates->distributedTableList);
if (rangeTableEntryDetails->rteIdentity == rteIdentity)
{
conversionCandidates->distributedTableList =
list_delete_ptr(conversionCandidates->distributedTableList,
rangeTableEntryDetails);
return;
}
}
ereport(ERROR, (errmsg("invalid rte identity is given: %d", rteIdentity)));
}
@ -241,7 +253,7 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
return false;
}
if (!ContainsTableToBeConvertedToSubquery(rangeTableList))
if (!ContainsTableToBeConvertedToSubquery(rangeTableList))
{
return false;
}
@ -251,7 +263,7 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
if (IsRouterPlannable(query, plannerRestrictionContext))
{
ereport(DEBUG1, (errmsg("local-distributed table joins will not be converted, "
"as the query is router plannable")));
"as the query is router plannable")));
return false;
}
return true;
@ -263,32 +275,22 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
* filter on a unique column.
*/
static bool
HasConstantFilterOnUniqueColumn(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index
rteIndex)
HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction)
{
if (rangeTableEntry == NULL)
{
return false;
}
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
List *rteEqualityQuals =
FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, rteIndex);
FetchEqualityAttrNumsForRTE((Node *) restrictClauseList);
Node *join = NULL;
foreach_ptr(join, joinTree->fromlist)
{
if (IsA(join, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) join;
List *joinExprEqualityQuals =
FetchEqualityAttrNumsForRTEFromQuals(joinExpr->quals, rteIndex);
rteEqualityQuals = list_concat(rteEqualityQuals, joinExprEqualityQuals);
}
}
List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
GetAllUniqueIndexes);
List *uniqueIndexAttrNumbers = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
GetAllUniqueIndexes);
int columnNumber = 0;
foreach_int(columnNumber, uniqueIndexes)
foreach_int(columnNumber, uniqueIndexAttrNumbers)
{
if (list_member_int(rteEqualityQuals, columnNumber))
{
@ -327,38 +329,28 @@ GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes)
* WHERE clause as a filter (e.g., not a join clause).
*/
static List *
RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
PlannerRestrictionContext *plannerRestrictionContext)
{
int rteIdentity = GetRTEIdentity(relationRte);
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
Relids queryRteIdentities = bms_make_singleton(rteIdentity);
RelationRestrictionContext *filteredRelationRestrictionContext =
FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities);
List *filteredRelationRestrictionList =
filteredRelationRestrictionContext->relationRestrictionList;
RelationRestriction *relationRestriction =
RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext);
if (list_length(filteredRelationRestrictionList) != 1)
if (relationRestriction == NULL)
{
return NIL;
}
RelationRestriction *relationRestriction =
(RelationRestriction *) linitial(filteredRelationRestrictionList);
PlannerInfo *plannerInfo = relationRestriction->plannerInfo;
Query *queryToProcess = plannerInfo->parse;
int rteIndex = relationRestriction->index;
List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0);
ListCell *varCell = NULL;
List *requiredAttrNumbers = NIL;
foreach(varCell, allVarsInQuery)
Var *var = NULL;
foreach_ptr(var, allVarsInQuery)
{
Var *var = (Var *) lfirst(varCell);
if (var->varno == rteIndex)
{
requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers,
@ -379,15 +371,12 @@ CreateConversionCandidates(FromExpr *joinTree,
PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, Oid resultRelationId)
{
ConversionCandidates *conversionCandidates = palloc0(
sizeof(ConversionCandidates));
ConversionCandidates *conversionCandidates =
palloc0(sizeof(ConversionCandidates));
int rteIndex = 0;
RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
rteIndex++;
/* we're only interested in tables */
if (!IsRecursivelyPlannableRelation(rangeTableEntry))
{
@ -400,23 +389,27 @@ CreateConversionCandidates(FromExpr *joinTree,
continue;
}
RelationRestriction *relationRestriction =
RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext);
if (relationRestriction == NULL)
{
continue;
}
int rteIdentity = GetRTEIdentity(rangeTableEntry);
RangeTableEntryDetails *rangeTableEntryDetails =
palloc0(sizeof(RangeTableEntryDetails));
rangeTableEntryDetails->rangeTableEntry = rangeTableEntry;
rangeTableEntryDetails->rteIndex = rteIndex;
rangeTableEntryDetails->restrictionList = GetRestrictInfoListForRelation(
rangeTableEntry, plannerRestrictionContext);
rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation(
rangeTableEntry, plannerRestrictionContext);
rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
HasConstantFilterOnUniqueColumn(joinTree,
rangeTableEntry,
rteIndex);
bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid,
REFERENCE_TABLE) ||
IsCitusTableType(rangeTableEntry->relid,
DISTRIBUTED_TABLE);
rangeTableEntryDetails->rangeTableEntry = rangeTableEntry;
rangeTableEntryDetails->rteIdentity = rteIdentity;
rangeTableEntryDetails->requiredAttributeNumbers =
RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext);
rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction);
bool referenceOrDistributedTable =
IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) ||
IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE);
if (referenceOrDistributedTable)
{
conversionCandidates->distributedTableList =


@ -340,7 +340,13 @@ IsCitusTableRTE(Node *node)
bool
IsDistributedOrReferenceTableRTE(Node *node)
{
return IsDistributedTableRTE(node) || IsReferenceTableRTE(node);
Oid relationId = NodeTryGetRteRelid(node);
if (!OidIsValid(relationId))
{
return false;
}
return IsCitusTableType(relationId, DISTRIBUTED_TABLE) ||
IsCitusTableType(relationId, REFERENCE_TABLE);
}


@ -234,9 +234,9 @@ static StringInfo ColumnTypeArrayString(List *targetEntryList);
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
ShardInterval *secondInterval);
static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex);
static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex);
static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr);
static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr);
static List * FetchEqualityAttrNumsForList(List *nodeList);
#if PG_VERSION_NUM >= PG_VERSION_13
static List * GetColumnOriginalIndexes(Oid relationId);
#endif
@ -271,6 +271,27 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
}
/*
* OnlyLocalTableJob returns true if the given job contains
* only postgres tables.
*/
bool
OnlyLocalTableJob(Job *job)
{
if (job == NULL)
{
return false;
}
List *taskList = job->taskList;
if (list_length(taskList) != 1)
{
return false;
}
Task *singleTask = (Task *) linitial(taskList);
return singleTask->containsOnlyLocalTable;
}
/*
* BuildJobTree builds the physical job tree from the given logical plan tree.
* The function walks over the logical plan from the bottom up, finds boundaries
@ -2113,9 +2134,10 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
prunedRelationShardList, READ_TASK,
false,
&deferredErrorMessage);
if (deferredErrorMessage != NULL) {
if (deferredErrorMessage != NULL)
{
RaiseDeferredErrorInternal(deferredErrorMessage, ERROR);
}
}
}
else
{
@ -2209,10 +2231,10 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
if (list_length(relationRestrictionContext->relationRestrictionList) == 0)
{
*planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot handle complex subqueries when the "
"router executor is disabled",
NULL, NULL);
return NIL;
"cannot handle complex subqueries when the "
"router executor is disabled",
NULL, NULL);
return NIL;
}
/* defaults to be used if this is a reference table-only query */
@ -2238,9 +2260,9 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength)
{
*planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"shard counts of co-located tables do not "
"match",
NULL, NULL);
"shard counts of co-located tables do not "
"match",
NULL, NULL);
return NIL;
}
@ -2306,9 +2328,10 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
taskType,
modifyRequiresCoordinatorEvaluation,
planningError);
if (*planningError != NULL) {
if (*planningError != NULL)
{
return NIL;
}
}
subqueryTask->jobId = jobId;
sqlTaskList = lappend(sqlTaskList, subqueryTask);
@ -2565,11 +2588,11 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
if (list_length(taskPlacementList) == 0)
{
*planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot find a worker that has active placements for all "
"shards in the query",
NULL, NULL);
"cannot find a worker that has active placements for all "
"shards in the query",
NULL, NULL);
return NULL;
return NULL;
}
/*
@ -3615,26 +3638,58 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList)
/*
* FetchEqualityAttrNumsForRTEFromQuals fetches the attribute numbers from quals
* FetchEqualityAttrNumsForRTE fetches the attribute numbers from quals
* which:
* - have an equality operator
* - compare a column against a constant
*/
List *
FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex)
FetchEqualityAttrNumsForRTE(Node *node)
{
if (quals == NULL)
if (node == NULL)
{
return NIL;
}
if (IsA(quals, OpExpr))
if (IsA(node, List))
{
return FetchEqualityAttrNumsForRTEOpExpr((OpExpr *) quals, rteIndex);
return FetchEqualityAttrNumsForList((List *) node);
}
else if (IsA(quals, BoolExpr))
else if (IsA(node, OpExpr))
{
return FetchEqualityAttrNumsForRTEBoolExpr((BoolExpr *) quals, rteIndex);
return FetchEqualityAttrNumsForRTEOpExpr((OpExpr *) node);
}
else if (IsA(node, BoolExpr))
{
return FetchEqualityAttrNumsForRTEBoolExpr((BoolExpr *) node);
}
return NIL;
}
/*
* FetchEqualityAttrNumsForList fetches the attribute numbers that have a
* constant equality filter from the given node list.
*/
static List *
FetchEqualityAttrNumsForList(List *nodeList)
{
List *attributeNums = NIL;
Node *node = NULL;
bool hasAtLeastOneEquality = false;
foreach_ptr(node, nodeList)
{
List *fetchedEqualityAttrNums =
FetchEqualityAttrNumsForRTE(node);
hasAtLeastOneEquality |= list_length(fetchedEqualityAttrNums) > 0;
attributeNums = list_concat(attributeNums, fetchedEqualityAttrNums);
}
/*
* the given list is in the form of AND'ed expressions, hence one
* equality is enough.
* E.g.: dist.a = 5 AND dist.a > 10
*/
if (hasAtLeastOneEquality)
{
return attributeNums;
}
return NIL;
}
@ -3647,7 +3702,7 @@ FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex)
* - compare a column against a constant
*/
static List *
FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex)
FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr)
{
if (!OperatorImplementsEquality(opExpr->opno))
{
@ -3656,7 +3711,7 @@ FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex)
List *attributeNums = NIL;
Var *var = NULL;
if (VarConstOpExprClause(opExpr, &var, NULL) && var->varno == rteIndex)
if (VarConstOpExprClause(opExpr, &var, NULL))
{
attributeNums = lappend_int(attributeNums, var->varattno);
}
@ -3671,7 +3726,7 @@ FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex)
* - compare a column against a constant
*/
static List *
FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex)
FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr)
{
if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR)
{
@ -3683,8 +3738,7 @@ FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex)
Node *arg = NULL;
foreach_ptr(arg, boolExpr->args)
{
List *attributeNumsInSubExpression = FetchEqualityAttrNumsForRTEFromQuals(arg,
rteIndex);
List *attributeNumsInSubExpression = FetchEqualityAttrNumsForRTE(arg);
if (boolExpr->boolop == AND_EXPR)
{
hasEquality |= list_length(attributeNumsInSubExpression) > 0;
@ -5466,7 +5520,6 @@ ActiveShardPlacementLists(List *taskList)
{
Task *task = (Task *) lfirst(taskCell);
uint64 anchorShardId = task->anchorShardId;
List *shardPlacementList = ActiveShardPlacementList(anchorShardId);
/* filter out shard placements that reside in inactive nodes */


@ -170,7 +170,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement);
static List * SingleShardTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId, bool parametersInQueryResolved);
uint64 shardId, bool parametersInQueryResolved,
bool containsOnlyLocalTable);
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
@ -1717,6 +1718,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
/* router planner should create task even if it doesn't hit a shard at all */
bool replacePrunedQueryWithDummy = true;
bool containsOnlyLocalTable = false;
/* check if this query requires coordinator evaluation */
bool requiresCoordinatorEvaluation = RequiresCoordinatorEvaluation(originalQuery);
FastPathRestrictionContext *fastPathRestrictionContext =
@ -1744,7 +1747,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
&prunedShardIntervalListList,
replacePrunedQueryWithDummy,
&isMultiShardModifyQuery,
&partitionKeyValue);
&partitionKeyValue,
&containsOnlyLocalTable);
}
if (*planningError)
@ -1754,7 +1758,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
Job *job = CreateJob(originalQuery);
job->partitionKeyValue = partitionKeyValue;
job->onDummyPlacement = replacePrunedQueryWithDummy;
if (originalQuery->resultRelation > 0)
{
@ -1783,14 +1786,15 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
MODIFY_TASK,
requiresCoordinatorEvaluation,
planningError);
if (*planningError) {
if (*planningError)
{
return NULL;
}
}
}
else
{
GenerateSingleShardRouterTaskList(job, relationShardList,
placementList, shardId);
placementList, shardId, containsOnlyLocalTable);
}
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
@ -1806,7 +1810,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
*/
void
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
List *placementList, uint64 shardId)
List *placementList, uint64 shardId, bool
containsOnlyLocalTable)
{
Query *originalQuery = job->jobQuery;
@ -1815,7 +1820,9 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved);
job->parametersInJobQueryResolved,
containsOnlyLocalTable);
/*
* Queries to reference tables, or distributed tables with multiple replicas, have
* their task placements reordered according to the configured
@ -1831,7 +1838,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
placementList);
}
}
else if (shardId == INVALID_SHARD_ID && !job->onDummyPlacement)
else if (shardId == INVALID_SHARD_ID && !containsOnlyLocalTable)
{
/* modification that prunes to 0 shards */
job->taskList = NIL;
@ -1841,7 +1848,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved);
job->parametersInJobQueryResolved,
containsOnlyLocalTable);
}
}
@ -1934,7 +1942,8 @@ RemoveCoordinatorPlacementIfNotSingleNode(List *placementList)
static List *
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId,
bool parametersInQueryResolved)
bool parametersInQueryResolved,
bool containsOnlyLocalTable)
{
TaskType taskType = READ_TASK;
char replicationModel = 0;
@ -1993,6 +2002,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
}
Task *task = CreateTask(taskType);
task->containsOnlyLocalTable = containsOnlyLocalTable;
List *relationRowLockList = NIL;
RowLocksOnRelations((Node *) query, &relationRowLockList);
@ -2104,6 +2114,8 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
}
static bool ContainsOnlyLocalTables(RTEListProperties *rteProperties);
/*
* RouterQuery runs router pruning logic for SELECT, UPDATE and DELETE queries.
* If there are shards present and query is routable, all RTEs have been updated
@ -2131,7 +2143,8 @@ PlanRouterQuery(Query *originalQuery,
List **placementList, uint64 *anchorShardId, List **relationShardList,
List **prunedShardIntervalListList,
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
Const **partitionValueConst)
Const **partitionValueConst,
bool *containsOnlyLocalTable)
{
bool isMultiShardQuery = false;
DeferredErrorMessage *planningError = NULL;
@ -2247,6 +2260,10 @@ PlanRouterQuery(Query *originalQuery,
/* both Postgres tables and materialized views are locally available */
RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery);
if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties))
{
*containsOnlyLocalTable = true;
}
bool hasPostgresLocalRelation =
rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView;
List *taskPlacementList =
@ -2280,6 +2297,13 @@ PlanRouterQuery(Query *originalQuery,
}
static bool
ContainsOnlyLocalTables(RTEListProperties *rteProperties)
{
return !rteProperties->hasDistributedTable && !rteProperties->hasReferenceTable;
}
/*
* CreateTaskPlacementListForShardIntervals returns a list of shard placements
* on which it can access all shards in shardIntervalListList, which contains
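Taken together (a summary sketch, not text from the patch), the containsOnlyLocalTable flag introduced in this file travels from router planning into execution roughly as follows:

/*
 * Simplified flow of containsOnlyLocalTable, pieced together from the
 * hunks in this commit:
 *
 *   PlanRouterQuery()
 *     sets *containsOnlyLocalTable when no shard is hit and the range
 *     table has neither a distributed nor a reference table
 *   -> GenerateSingleShardRouterTaskList()
 *     forwards the flag to SingleShardTaskList()
 *   -> SingleShardTaskList()
 *     stamps it onto the task: task->containsOnlyLocalTable
 *   -> CitusBeginModifyScan() via OnlyLocalTableJob()
 *     skips shard metadata locks and task assignment for jobs that
 *     touch only postgres tables
 */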


@ -100,6 +100,20 @@
#include "utils/guc.h"
#include "utils/lsyscache.h"
/*
* RecursivePlanningContext is used to recursively plan subqueries
* and CTEs, pull results to the coordinator, and push it back into
* the workers.
*/
struct RecursivePlanningContextInternal
{
int level;
uint64 planId;
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
};
/* track depth of current recursive planner query */
static int recursivePlanningDepth = 0;
@ -159,7 +173,7 @@ static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery,
static bool IsTableLocallyAccessible(Oid relationId);
static bool ShouldRecursivelyPlanSetOperation(Query *query,
RecursivePlanningContext *context);
static void RecursivelyPlanSubquery(Query *subquery,
static bool RecursivelyPlanSubquery(Query *subquery,
RecursivePlanningContext *planningContext);
static void RecursivelyPlanSetOperations(Query *query, Node *node,
RecursivePlanningContext *context);
@ -178,11 +192,10 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
List *columnAliasList,
Const *resultIdConst, Oid functionOid,
bool useBinaryCopyFormat);
static void
UpdateVarNosInNode(Query *query, Index newVarNo);
static void UpdateVarNosInNode(Query *query, Index newVarNo);
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
List **joinRangeTableEntries);
List **joinRangeTableEntries);
/*
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
@ -347,22 +360,34 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
&rangeTableList);
if (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
plannerRestrictionContext)) {
plannerRestrictionContext))
{
/*
* Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
* a query with local table/citus local table and subquery. We convert local/citus local
* tables to a subquery until they can be planned.
* This is the last call in this function since we want the other calls to be finished
* so that we can check if the current plan is router plannable at any step within this function.
*/
RecursivelyPlanLocalTableJoins(query, context, rangeTableList);
}
return NULL;
}
/*
* GetPlannerRestrictionContext returns the planner restriction context
* from the given context.
*/
PlannerRestrictionContext *
GetPlannerRestrictionContext(RecursivePlanningContext *recursivePlanningContext)
{
return recursivePlanningContext->plannerRestrictionContext;
}
/*
* GetRangeTableEntriesFromJoinTree gets the range table entries that are
* on the given join tree.
@ -407,7 +432,6 @@ GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
}
/*
* ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins
* that are not on the distribution key.
@ -1180,7 +1204,7 @@ IsRelationLocalTableOrMatView(Oid relationId)
* and immediately returns. Later, the planner decides on what to do
* with the query.
*/
static void
static bool
RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext)
{
uint64 planId = planningContext->planId;
@ -1191,7 +1215,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
elog(DEBUG2, "skipping recursive planning for the subquery since it "
"contains references to outer queries");
return;
return false;
}
/*
@ -1234,6 +1258,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
/* finally update the input subquery to point the result query */
*subquery = *resultQuery;
return true;
}
@ -1419,16 +1444,20 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node)
/*
* ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
* with a subquery. The function also pushes down the filters to the subquery.
*
*
* It then recursively plans the subquery.
*/
void
ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList,
ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *requiredAttrNumbers,
RecursivePlanningContext *context)
{
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
Expr *andedBoundExpressions = make_ands_explicit(restrictionList);
List *restrictionList =
GetRestrictInfoListForRelation(rangeTableEntry,
context->plannerRestrictionContext);
List *copyRestrictionList = copyObject(restrictionList);
Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList);
subquery->jointree->quals = (Node *) andedBoundExpressions;
UpdateVarNosInNode(subquery, SINGLE_RTE_INDEX);
@ -1456,7 +1485,13 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
}
/* as we created the subquery, now forcefully recursively plan it */
RecursivelyPlanSubquery(rangeTableEntry->subquery, context);
bool recursivelyPlanned = RecursivelyPlanSubquery(rangeTableEntry->subquery,
context);
if (!recursivelyPlanned)
{
ereport(ERROR, (errmsg(
"unexpected state: query should have been recursively planned")));
}
}
@ -1610,6 +1645,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
return containsLocalTable && containsDistributedTable;
}
/*
* WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
* of a query and wraps the functions inside (SELECT * FROM fnc() f)
@ -1733,7 +1769,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
}
/*
* If tupleDesc is NULL we have 2 different cases:
*
@ -1783,7 +1818,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
columnType = list_nth_oid(rangeTblFunction->funccoltypes,
targetColumnIndex);
}
/* use the types in the function definition otherwise */
else
{


@ -1848,34 +1848,25 @@ List *
GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext *plannerRestrictionContext)
{
int rteIdentity = GetRTEIdentity(rangeTblEntry);
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
Relids queryRteIdentities = bms_make_singleton(rteIdentity);
RelationRestrictionContext *filteredRelationRestrictionContext =
FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities);
List *filteredRelationRestrictionList =
filteredRelationRestrictionContext->relationRestrictionList;
if (list_length(filteredRelationRestrictionList) != 1)
RelationRestriction *relationRestriction =
RelationRestrictionForRelation(rangeTblEntry, plannerRestrictionContext);
if (relationRestriction == NULL)
{
return NIL;
}
RelationRestriction *relationRestriction =
(RelationRestriction *) linitial(filteredRelationRestrictionList);
RelOptInfo *relOptInfo = relationRestriction->relOptInfo;
List *baseRestrictInfo = relOptInfo->baserestrictinfo;
List *joinRestrictInfo = relOptInfo->joininfo;
List *baseRestrictInfo = relOptInfo->baserestrictinfo;
List *joinRrestrictClauseList = get_all_actual_clauses(joinRestrictInfo);
if (ContainsFalseClause(joinRrestrictClauseList))
List *joinRestrictClauseList = get_all_actual_clauses(joinRestrictInfo);
if (ContainsFalseClause(joinRestrictClauseList))
{
/* found WHERE false, no need to continue */
return copyObject((List *) joinRrestrictClauseList);
return copyObject((List *) joinRestrictClauseList);
}
List *restrictExprList = NIL;
RestrictInfo *restrictInfo = NULL;
foreach_ptr(restrictInfo, baseRestrictInfo)
@ -1919,6 +1910,34 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
}
/*
* RelationRestrictionForRelation returns the relation restriction for the given
* range table entry, or NULL if there is not exactly one match.
*/
RelationRestriction *
RelationRestrictionForRelation(RangeTblEntry *rangeTableEntry,
PlannerRestrictionContext *plannerRestrictionContext)
{
int rteIdentity = GetRTEIdentity(rangeTableEntry);
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
Relids queryRteIdentities = bms_make_singleton(rteIdentity);
RelationRestrictionContext *filteredRelationRestrictionContext =
FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities);
List *filteredRelationRestrictionList =
filteredRelationRestrictionContext->relationRestrictionList;
if (list_length(filteredRelationRestrictionList) != 1)
{
return NULL;
}
RelationRestriction *relationRestriction =
(RelationRestriction *) linitial(filteredRelationRestrictionList);
return relationRestriction;
}
/*
* IsParam determines whether the given node is a param.
*/


@ -720,7 +720,7 @@ RegisterCitusConfigVariables(void)
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, LocalPoolSizeGucShowHook);
DefineCustomEnumVariable(
"citus.local_table_join_policy",
gettext_noop("defines the behaviour when a distributed table "


@ -101,7 +101,6 @@ copyJobInfo(Job *newnode, Job *from)
COPY_NODE_FIELD(partitionKeyValue);
COPY_NODE_FIELD(localPlannedStatements);
COPY_SCALAR_FIELD(parametersInJobQueryResolved);
COPY_SCALAR_FIELD(onDummyPlacement);
}
@ -328,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex);
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
COPY_SCALAR_FIELD(containsOnlyLocalTable);
}


@ -540,6 +540,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex);
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
WRITE_BOOL_FIELD(containsOnlyLocalTable);
}


@ -27,11 +27,12 @@ typedef enum
extern int LocalTableJoinPolicy;
extern bool
ShouldConvertLocalTableJoinsToSubqueries(Query *query,
List *rangeTableList,
PlannerRestrictionContext *plannerRestrictionContext);
extern bool ShouldConvertLocalTableJoinsToSubqueries(Query *query,
List *rangeTableList,
PlannerRestrictionContext *
plannerRestrictionContext);
extern void RecursivelyPlanLocalTableJoins(Query *query,
RecursivePlanningContext *context, List *rangeTableList);
RecursivePlanningContext *context,
List *rangeTableList);
#endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */


@ -163,7 +163,6 @@ typedef struct Job
* query.
*/
bool parametersInJobQueryResolved;
bool onDummyPlacement;
} Job;
@ -329,6 +328,11 @@ typedef struct Task
* ExplainTaskList().
*/
double fetchedExplainAnalyzeExecutionDuration;
/*
* containsOnlyLocalTable is true if the task contains only postgres tables/MVs.
*/
bool containsOnlyLocalTable;
} Task;
@ -579,6 +583,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
bool modifyRequiresCoordinatorEvaluation,
DeferredErrorMessage **planningError);
extern bool OnlyLocalTableJob(Job *job);
/* function declarations for managing jobs */
extern uint64 UniqueJobId(void);
@ -591,6 +597,6 @@ extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column
List *funcColumnTypeMods,
List *funcCollations);
extern List * FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex);
extern List * FetchEqualityAttrNumsForRTE(Node *quals);
#endif /* MULTI_PHYSICAL_PLANNER_H */


@ -42,7 +42,8 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
List **prunedShardIntervalListList,
bool replacePrunedQueryWithDummy,
bool *multiShardModifyQuery,
Const **partitionValueConst);
Const **partitionValueConst,
bool *containsOnlyLocalTable);
extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
bool *shardsPresent);
extern List * CreateTaskPlacementListForShardIntervals(List *shardIntervalList,
@ -83,9 +84,10 @@ extern List * TargetShardIntervalForFastPathQuery(Query *query,
Const *inputDistributionKeyValue,
Const **outGoingPartitionValueConst);
extern void GenerateSingleShardRouterTaskList(Job *job,
List *relationShardList,
List *placementList,
uint64 shardId);
List *relationShardList,
List *placementList,
uint64 shardId,
bool containsOnlyLocalTable);
extern bool IsRouterPlannable(Query *query,
PlannerRestrictionContext *plannerRestrictionContext);


@ -22,21 +22,16 @@
#include "nodes/relation.h"
#endif
/*
* RecursivePlanningContext is used to recursively plan subqueries
* and CTEs, pull results to the coordinator, and push it back into
* the workers.
*/
typedef struct RecursivePlanningContext
typedef struct RecursivePlanningContextInternal RecursivePlanningContext;
typedef struct RangeTblEntryIndex
{
int level;
uint64 planId;
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
} RecursivePlanningContext;
RangeTblEntry *rangeTableEntry;
Index rteIndex;
} RangeTblEntryIndex;
extern PlannerRestrictionContext * GetPlannerRestrictionContext(
RecursivePlanningContext *recursivePlanningContext);
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -50,7 +45,6 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
extern bool GeneratingSubplans(void);
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *restrictionList,
List *requiredAttrNumbers,
RecursivePlanningContext *context);
extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList);


@ -41,13 +41,18 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext *
plannerRestrictionContext);
extern RelationRestriction * RelationRestrictionForRelation(
RangeTblEntry *rangeTableEntry,
PlannerRestrictionContext *
plannerRestrictionContext);
extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext *
joinRestrictionContext);
extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
RelationRestrictionContext *
restrictionContext);
extern RelationRestrictionContext *
FilterRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext,
Relids queryRteIdentities);
extern RelationRestrictionContext * FilterRelationRestrictionContext(
RelationRestrictionContext *relationRestrictionContext,
Relids
queryRteIdentities);
#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */


@ -446,9 +446,11 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum((d1.id OP
(1 row)
SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) WHERE d2.id = 1 ORDER BY 1 DESC LIMIT 4;
DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE (id OPERATOR(pg_catalog.=) 1)
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE (id OPERATOR(pg_catalog.=) 1)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_dist_join_mixed.distributed d1 JOIN (SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local USING (id)) LEFT JOIN local_dist_join_mixed.distributed d2 USING (id)) WHERE (d2.id OPERATOR(pg_catalog.=) 1) ORDER BY (count(*)) DESC LIMIT 4
DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d1 WHERE (id OPERATOR(pg_catalog.=) 1)
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d1 WHERE (id OPERATOR(pg_catalog.=) 1)
DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d2 WHERE (id OPERATOR(pg_catalog.=) 1)
DEBUG: generating subplan XXX_2 for subquery SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d2 WHERE (id OPERATOR(pg_catalog.=) 1)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) d1 JOIN local_dist_join_mixed.local USING (id)) LEFT JOIN (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) d2 USING (id)) WHERE (d2.id OPERATOR(pg_catalog.=) 1) ORDER BY (count(*)) DESC LIMIT 4
count
---------------------------------------------------------------------
1
@ -1171,9 +1173,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
-- w count(*) it works fine as PG ignores the inner tables
SELECT count(*) FROM distributed LEFT JOIN local USING (id);
DEBUG: Wrapping relation "local" to a subquery: SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_dist_join_mixed.distributed LEFT JOIN (SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local USING (id))
DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) distributed LEFT JOIN local_dist_join_mixed.local USING (id))
count
---------------------------------------------------------------------
101
@ -1188,20 +1190,19 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
101
(1 row)
SELECT id, name FROM distributed LEFT JOIN local USING (id) LIMIT 1;
DEBUG: Wrapping relation "local" to a subquery: SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT distributed.id, distributed.name FROM (local_dist_join_mixed.distributed LEFT JOIN (SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local USING (id)) LIMIT 1
DEBUG: push down of limit count: 1
SELECT id, name FROM distributed LEFT JOIN local USING (id) ORDER BY 1 LIMIT 1;
DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT distributed.id, distributed.name FROM ((SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) distributed LEFT JOIN local_dist_join_mixed.local USING (id)) ORDER BY distributed.id LIMIT 1
id | name
---------------------------------------------------------------------
1 | 1
0 | 0
(1 row)
SELECT id, name FROM local LEFT JOIN distributed USING (id) LIMIT 1;
SELECT id, name FROM local LEFT JOIN distributed USING (id) ORDER BY 1 LIMIT 1;
DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT local.id, distributed.name FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local LEFT JOIN local_dist_join_mixed.distributed USING (id)) LIMIT 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT local.id, distributed.name FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local LEFT JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY local.id LIMIT 1
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
SELECT
@ -1521,6 +1522,78 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo1.id FROM
---------------------------------------------------------------------
(0 rows)
SELECT
count(*)
FROM
distributed
JOIN LATERAL
(SELECT
*
FROM
local
JOIN
distributed d2
ON(true)
WHERE local.id = distributed.id AND d2.id = local.id) as foo
ON (true);
DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_dist_join_mixed.distributed JOIN LATERAL (SELECT local.id, local.title, d2.id, d2.name, d2.created_at FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed d2 ON (true)) WHERE ((local.id OPERATOR(pg_catalog.=) distributed.id) AND (d2.id OPERATOR(pg_catalog.=) local.id))) foo(id, title, id_1, name, created_at) ON (true))
count
---------------------------------------------------------------------
101
(1 row)
SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1;
DEBUG: Wrapping relation "local" to a subquery: SELECT id, title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT local.title, local.title FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY local.title, local.title LIMIT 1
DEBUG: push down of limit count: 1
title | title
---------------------------------------------------------------------
0 | 0
(1 row)
SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1;
DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT NULL::text FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY NULL::text LIMIT 1
DEBUG: push down of limit count: 1
?column?
---------------------------------------------------------------------
(1 row)
SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1;
DEBUG: Wrapping relation "local" to a subquery: SELECT id, title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT distributed.name, distributed.name, local.title, local.title FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY distributed.name, distributed.name, local.title, local.title LIMIT 1
DEBUG: push down of limit count: 1
name | name | title | title
---------------------------------------------------------------------
0 | 0 | 0 | 0
(1 row)
SELECT
COUNT(*)
FROM
local
JOIN
distributed
USING
(id)
JOIN
(SELECT id, NULL, NULL FROM distributed) foo
USING
(id);
DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) JOIN (SELECT distributed_1.id, NULL::text, NULL::text FROM local_dist_join_mixed.distributed distributed_1) foo(id, "?column?", "?column?_1") USING (id))
count
---------------------------------------------------------------------
101
(1 row)
DROP SCHEMA local_dist_join_mixed CASCADE;
NOTICE: drop cascades to 7 other objects
DETAIL: drop cascades to table distributed


@ -1471,17 +1471,29 @@ BEGIN;
UPDATE partitioning_locks_2009 SET time = '2009-03-01';
-- see the locks on parent table
SELECT * FROM lockinfo;
logicalrelid | locktype | mode
logicalrelid | locktype | mode
---------------------------------------------------------------------
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
(8 rows)
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
(20 rows)
COMMIT;
-- test shard resource locks with TRUNCATE


@ -222,10 +222,10 @@ test: multi_modifying_xacts
test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions
test: multi_transaction_recovery
test: local_dist_join_modifications
test: local_dist_join_modifications
test: local_table_join
test: local_dist_join_mixed
test: citus_local_dist_joins
test: citus_local_dist_joins
# ---------
# multi_copy creates hash and range-partitioned tables and performs COPY


@ -128,7 +128,7 @@ SET
FROM
citus_local
WHERE
mv1.key = citus_local.key;
mv1.key = citus_local.key;
ROLLBACK;
BEGIN;
@ -139,7 +139,7 @@ SET
FROM
mv1
WHERE
mv1.key = citus_local.key;
mv1.key = citus_local.key;
ROLLBACK;
BEGIN;
@ -150,9 +150,9 @@ SET
FROM
mv2
WHERE
mv2.key = citus_local.key;
mv2.key = citus_local.key;
ROLLBACK;
-- DELETE operations
BEGIN;
@ -204,21 +204,21 @@ DELETE FROM
USING
citus_local
WHERE
mv1.key = citus_local.key;
mv1.key = citus_local.key;
DELETE FROM
citus_local
USING
mv1
WHERE
mv1.key = citus_local.key;
mv1.key = citus_local.key;
DELETE FROM
citus_local
USING
mv2
WHERE
mv2.key = citus_local.key;
mv2.key = citus_local.key;
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
@ -229,4 +229,4 @@ SET client_min_messages to ERROR;
DROP TABLE citus_local;
SELECT master_remove_node('localhost', :master_port);
\set VERBOSITY terse
DROP SCHEMA citus_local_dist_joins CASCADE;
DROP SCHEMA citus_local_dist_joins CASCADE;


@ -3,13 +3,13 @@ SET search_path TO local_dist_join_mixed;
CREATE TABLE distributed (id bigserial PRIMARY KEY,
name text,
CREATE TABLE distributed (id bigserial PRIMARY KEY,
name text,
created_at timestamptz DEFAULT now());
CREATE TABLE reference (id bigserial PRIMARY KEY,
CREATE TABLE reference (id bigserial PRIMARY KEY,
title text);
CREATE TABLE local (id bigserial PRIMARY KEY,
CREATE TABLE local (id bigserial PRIMARY KEY,
title text);
-- these above restrictions brought us to the following schema
@ -100,118 +100,118 @@ SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id + local.id) FROM distribut
-- nested subqueries
SELECT
count(*)
FROM
SELECT
count(*)
FROM
(SELECT * FROM (SELECT * FROM distributed) as foo) as bar
JOIN
JOIN
local
USING(id);
SELECT
count(*)
FROM
SELECT
count(*)
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar
JOIN
JOIN
local
USING(id);
SELECT
count(*)
FROM
SELECT
count(*)
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar
JOIN
JOIN
local
USING(id);
SELECT
count(*)
FROM
SELECT
count(*)
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar
JOIN
JOIN
(SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2
USING(id);
-- TODO: Unnecessary recursive planning for local
SELECT
count(*)
FROM
-- TODO: Unnecessary recursive planning for local
SELECT
count(*)
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed LIMIT 1) as foo) as bar
JOIN
JOIN
(SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2
USING(id);
-- subqueries in WHERE clause
-- is not colocated, and the JOIN inside as well.
-- so should be recursively planned twice
SELECT
count(*)
FROM
distributed
WHERE
id > (SELECT
count(*)
FROM
SELECT
count(*)
FROM
distributed
WHERE
id > (SELECT
count(*)
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar
JOIN
JOIN
(SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2
USING(id)
);
-- two distributed tables are co-located and JOINed on distribution
-- key, so should be fine to pushdown
SELECT
count(*)
FROM
-- key, so should be fine to pushdown
SELECT
count(*)
FROM
distributed d_upper
WHERE
(SELECT
WHERE
(SELECT
bar.id
FROM
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar
JOIN
JOIN
(SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2
USING(id)
) IS NOT NULL;
SELECT
count(*)
FROM
SELECT
count(*)
FROM
distributed d_upper
WHERE
(SELECT
WHERE
(SELECT
bar.id
FROM
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar
JOIN
JOIN
local as foo
USING(id)
) IS NOT NULL;
SELECT
count(*)
FROM
SELECT
count(*)
FROM
distributed d_upper
WHERE d_upper.id >
(SELECT
WHERE d_upper.id >
(SELECT
bar.id
FROM
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar
JOIN
JOIN
local as foo
USING(id)
);
SELECT
count(*)
FROM
SELECT
count(*)
FROM
distributed d_upper
WHERE
(SELECT
WHERE
(SELECT
bar.id
FROM
FROM
(SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar
JOIN
JOIN
(SELECT *, random() FROM (SELECT *,random() FROM local WHERE d_upper.id = id) as foo2) as bar2
USING(id)
) IS NOT NULL;
@ -222,15 +222,15 @@ WHERE
-- subqueries in the target list
-- router, should work
select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1;
select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1;
-- should fail
select (SELECT local.id) FROM local, distributed WHERE distributed.id != 1 LIMIT 1;
select (SELECT local.id) FROM local, distributed WHERE distributed.id != 1 LIMIT 1;
-- currently not supported, but should work with https://github.com/citusdata/citus/pull/4360/files
SELECT
SELECT
name, (SELECT id FROM local WHERE id = e.id)
FROM
FROM
distributed e
ORDER BY 1,2 LIMIT 1;
@ -260,7 +260,7 @@ SELECT count(*) FROM
(SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba)
) bar;
select count(DISTINCT id)
select count(DISTINCT id)
FROM
(
(SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo)
@ -303,14 +303,14 @@ SELECT COUNT(*) FROM distributed JOIN LATERAL (SELECT * FROM local WHERE local.i
SELECT count(*) FROM distributed CROSS JOIN local;
SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1;
SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1;
-- w count(*) it works fine as PG ignores the inner tables
SELECT count(*) FROM distributed LEFT JOIN local USING (id);
SELECT count(*) FROM local LEFT JOIN distributed USING (id);
SELECT id, name FROM distributed LEFT JOIN local USING (id) LIMIT 1;
SELECT id, name FROM local LEFT JOIN distributed USING (id) LIMIT 1;
SELECT id, name FROM distributed LEFT JOIN local USING (id) ORDER BY 1 LIMIT 1;
SELECT id, name FROM local LEFT JOIN distributed USING (id) ORDER BY 1 LIMIT 1;
SELECT
foo1.id
@ -326,18 +326,18 @@ SELECT id, name FROM local LEFT JOIN distributed USING (id) LIMIT 1;
(SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo10,
(SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo1
WHERE
foo1.id = foo9.id AND
foo1.id = foo8.id AND
foo1.id = foo7.id AND
foo1.id = foo6.id AND
foo1.id = foo5.id AND
foo1.id = foo4.id AND
foo1.id = foo3.id AND
foo1.id = foo2.id AND
foo1.id = foo10.id AND
foo1.id = foo9.id AND
foo1.id = foo8.id AND
foo1.id = foo7.id AND
foo1.id = foo6.id AND
foo1.id = foo5.id AND
foo1.id = foo4.id AND
foo1.id = foo3.id AND
foo1.id = foo2.id AND
foo1.id = foo10.id AND
foo1.id = foo1.id
ORDER BY 1;
ORDER BY 1;
SELECT
foo1.id
FROM
@ -352,7 +352,7 @@ WHERE
foo1.id = foo3.id AND
foo1.id = foo4.id AND
foo1.id = foo5.id
ORDER BY 1;
ORDER BY 1;
SELECT
foo1.id
@ -368,8 +368,37 @@ WHERE
foo1.id = foo3.id AND
foo1.id = foo4.id AND
foo1.id = foo5.id
ORDER BY 1;
ORDER BY 1;
SELECT
count(*)
FROM
distributed
JOIN LATERAL
(SELECT
*
FROM
local
JOIN
distributed d2
ON(true)
WHERE local.id = distributed.id AND d2.id = local.id) as foo
ON (true);
SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1;
SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1;
SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1;
SELECT
COUNT(*)
FROM
local
JOIN
distributed
USING
(id)
JOIN
(SELECT id, NULL, NULL FROM distributed) foo
USING
(id);
DROP SCHEMA local_dist_join_mixed CASCADE;
DROP SCHEMA local_dist_join_mixed CASCADE;


@ -104,7 +104,7 @@ SET
FROM
postgres_table
WHERE
mv1.key = postgres_table.key;
mv1.key = postgres_table.key;
ROLLBACK;
BEGIN;
@ -115,7 +115,7 @@ SET
FROM
mv1
WHERE
mv1.key = postgres_table.key;
mv1.key = postgres_table.key;
ROLLBACK;
BEGIN;
@ -126,7 +126,7 @@ SET
FROM
mv2
WHERE
mv2.key = postgres_table.key;
mv2.key = postgres_table.key;
ROLLBACK;
-- in case of update/delete we always recursively plan
@ -351,22 +351,22 @@ DELETE FROM
USING
postgres_table
WHERE
mv1.key = postgres_table.key;
mv1.key = postgres_table.key;
DELETE FROM
postgres_table
USING
mv1
WHERE
mv1.key = postgres_table.key;
mv1.key = postgres_table.key;
DELETE FROM
postgres_table
USING
mv2
WHERE
mv2.key = postgres_table.key;
mv2.key = postgres_table.key;
SET client_min_messages to ERROR;
DROP SCHEMA local_dist_join_modifications CASCADE;
DROP SCHEMA local_dist_join_modifications CASCADE;