mirror of https://github.com/citusdata/citus.git
Handle modifications as well
parent
8f8390ed6e
commit
7a4d6b2984
|
@ -78,7 +78,6 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
uint64 *tableSize);
|
uint64 *tableSize);
|
||||||
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
||||||
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
static void ErrorIfNotSuitableToGetSize(Oid relationId);
|
||||||
static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
|
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -1295,7 +1294,7 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
|
||||||
* of the shard on the given group. If no such placement exists, the function
|
* of the shard on the given group. If no such placement exists, the function
|
||||||
* return NULL.
|
* return NULL.
|
||||||
*/
|
*/
|
||||||
static ShardPlacement *
|
ShardPlacement *
|
||||||
ShardPlacementOnGroup(uint64 shardId, int groupId)
|
ShardPlacementOnGroup(uint64 shardId, int groupId)
|
||||||
{
|
{
|
||||||
List *placementList = ShardPlacementList(shardId);
|
List *placementList = ShardPlacementList(shardId);
|
||||||
|
|
|
@ -512,6 +512,8 @@ IsTidColumn(Node *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#include "distributed/recursive_planning.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks,
|
* ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks,
|
||||||
* that subset being what's necessary to check modifying CTEs for.
|
* that subset being what's necessary to check modifying CTEs for.
|
||||||
|
@ -521,7 +523,15 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
||||||
Oid *distributedTableIdOutput)
|
Oid *distributedTableIdOutput)
|
||||||
{
|
{
|
||||||
DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree);
|
DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree);
|
||||||
if (deferredError != NULL)
|
if (deferredError != NULL) {
|
||||||
|
return deferredError;
|
||||||
|
}
|
||||||
|
uint32 rangeTableId = 1;
|
||||||
|
CmdType commandType = queryTree->commandType;
|
||||||
|
|
||||||
|
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
|
||||||
|
*distributedTableIdOutput = distributedTableId;
|
||||||
|
if (ContainsLocalTableDistributedTableJoin(queryTree->rtable))
|
||||||
{
|
{
|
||||||
return deferredError;
|
return deferredError;
|
||||||
}
|
}
|
||||||
|
@ -531,6 +541,18 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
||||||
{
|
{
|
||||||
return deferredError;
|
return deferredError;
|
||||||
}
|
}
|
||||||
|
Var *partitionColumn = NULL;
|
||||||
|
|
||||||
|
if (IsCitusTable(distributedTableId))
|
||||||
|
{
|
||||||
|
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||||
|
}
|
||||||
|
|
||||||
|
deferredError = DeferErrorIfModifyView(queryTree);
|
||||||
|
if (deferredError != NULL)
|
||||||
|
{
|
||||||
|
return deferredError;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Reject subqueries which are in SELECT or WHERE clause.
|
* Reject subqueries which are in SELECT or WHERE clause.
|
||||||
|
@ -939,9 +961,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
/* for other kinds of relations, check if its distributed */
|
/* for other kinds of relations, check if its distributed */
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Oid relationId = rangeTableEntry->relid;
|
if (ContainsLocalTableDistributedTableJoin(queryTree->rtable))
|
||||||
|
|
||||||
if (!IsCitusTable(relationId))
|
|
||||||
{
|
{
|
||||||
StringInfo errorMessage = makeStringInfo();
|
StringInfo errorMessage = makeStringInfo();
|
||||||
char *relationName = get_rel_name(rangeTableEntry->relid);
|
char *relationName = get_rel_name(rangeTableEntry->relid);
|
||||||
|
|
|
@ -194,7 +194,7 @@ static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext *
|
||||||
bool localTable);
|
bool localTable);
|
||||||
static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
|
static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
|
||||||
List *restrictionList);
|
List *restrictionList);
|
||||||
static bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
|
static bool AllDataLocallyAccessible(List *rangeTableList);
|
||||||
static void WrapFunctionsInSubqueries(Query *query);
|
static void WrapFunctionsInSubqueries(Query *query);
|
||||||
static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
|
static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
|
||||||
static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
|
static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
|
||||||
|
@ -1392,10 +1392,14 @@ ConvertLocalTableJoinsToSubqueries(Query *query,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (AllDataLocallyAccessible(rangeTableList))
|
||||||
{
|
{
|
||||||
/* TODO: if all tables are local, skip */
|
/* recursively planning is overkill, router planner can already handle this */
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RangeTblEntry *resultRelation = ExtractResultRelationRTE(query);
|
||||||
|
|
||||||
while (ContainsLocalTableDistributedTableJoin(rangeTableList))
|
while (ContainsLocalTableDistributedTableJoin(rangeTableList))
|
||||||
{
|
{
|
||||||
List *localTableRestrictList = NIL;
|
List *localTableRestrictList = NIL;
|
||||||
|
@ -1410,6 +1414,11 @@ ConvertLocalTableJoinsToSubqueries(Query *query,
|
||||||
MostFilteredRte(plannerRestrictionContext, rangeTableList,
|
MostFilteredRte(plannerRestrictionContext, rangeTableList,
|
||||||
&distributedTableRestrictList, !localTable);
|
&distributedTableRestrictList, !localTable);
|
||||||
|
|
||||||
|
elog(DEBUG4, "Local relation with the most number of filters "
|
||||||
|
"on it: \"%s\"", get_rel_name(mostFilteredLocalRte->relid));
|
||||||
|
elog(DEBUG4, "Distributed relation with the most number of filters "
|
||||||
|
"on it: \"%s\"", get_rel_name(mostFilteredDistributedRte->relid));
|
||||||
|
|
||||||
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL)
|
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL)
|
||||||
{
|
{
|
||||||
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
|
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
|
||||||
|
@ -1426,8 +1435,21 @@ ConvertLocalTableJoinsToSubqueries(Query *query,
|
||||||
bool distributedTableHasFilter =
|
bool distributedTableHasFilter =
|
||||||
list_length(distributedTableRestrictList) > 0;
|
list_length(distributedTableRestrictList) > 0;
|
||||||
|
|
||||||
/* TODO: for modifications, either skip or do not plan target table */
|
if (resultRelation && resultRelation->relid == mostFilteredLocalRte->relid &&
|
||||||
|
!mostFilteredLocalRte->inFromCl)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We cannot recursively plan result relation, we have to
|
||||||
|
* recursively plan the distributed table.
|
||||||
|
*
|
||||||
|
* TODO: A future improvement could be to pick the next most filtered
|
||||||
|
* local relation, if exists.
|
||||||
|
*/
|
||||||
|
ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
|
||||||
|
distributedTableRestrictList);
|
||||||
|
}
|
||||||
|
else if (localTableHasFilter || !distributedTableHasFilter)
|
||||||
|
{
|
||||||
/*
|
/*
|
||||||
* First, favor recursively planning local table when it has a filter.
|
* First, favor recursively planning local table when it has a filter.
|
||||||
* The rationale is that local tables are small, and at least one filter
|
* The rationale is that local tables are small, and at least one filter
|
||||||
|
@ -1444,8 +1466,6 @@ ConvertLocalTableJoinsToSubqueries(Query *query,
|
||||||
* considering the filters on them, we should pick the table with least
|
* considering the filters on them, we should pick the table with least
|
||||||
* tuples. Today, we do not have such an infrastructure.
|
* tuples. Today, we do not have such an infrastructure.
|
||||||
*/
|
*/
|
||||||
if (localTableHasFilter || !distributedTableHasFilter)
|
|
||||||
{
|
|
||||||
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
|
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
|
||||||
localTableRestrictList);
|
localTableRestrictList);
|
||||||
}
|
}
|
||||||
|
@ -1490,7 +1510,12 @@ MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsCitusTable(rangeTableEntry->relid) && localTable)
|
if (localTable && IsCitusTable(rangeTableEntry->relid))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!localTable && !IsCitusTable(rangeTableEntry->relid))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1549,11 +1574,61 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AllDataLocallyAccessible return true if all data for the relations in the
|
||||||
|
* rangeTableList is locally accessible.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
AllDataLocallyAccessible(List *rangeTableList)
|
||||||
|
{
|
||||||
|
ListCell *rangeTableCell = NULL;
|
||||||
|
foreach(rangeTableCell, rangeTableList)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||||
|
|
||||||
|
/* we're only interested in tables */
|
||||||
|
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||||
|
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Oid relationId = rangeTableEntry->relid;
|
||||||
|
|
||||||
|
if (!IsCitusTable(relationId))
|
||||||
|
{
|
||||||
|
/* local tables are locally accessible */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
|
if (list_length(shardIntervalList) > 1)
|
||||||
|
{
|
||||||
|
/* we currently only consider single placement tables */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ShardInterval *shardInterval = linitial(shardIntervalList);
|
||||||
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
ShardPlacement *localShardPlacement =
|
||||||
|
ShardPlacementOnGroup(shardId, GetLocalGroupId());
|
||||||
|
if (localShardPlacement == NULL)
|
||||||
|
{
|
||||||
|
/* the table doesn't have a placement on this node */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ContainsLocalTableDistributedTableJoin returns true if the input range table list
|
* ContainsLocalTableDistributedTableJoin returns true if the input range table list
|
||||||
* contains a direct join between local and distributed tables.
|
* contains a direct join between local and distributed tables.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
ContainsLocalTableDistributedTableJoin(List *rangeTableList)
|
ContainsLocalTableDistributedTableJoin(List *rangeTableList)
|
||||||
{
|
{
|
||||||
bool containsLocalTable = false;
|
bool containsLocalTable = false;
|
||||||
|
|
|
@ -113,7 +113,12 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
|
||||||
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
|
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
|
||||||
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
||||||
char *sizeQuery);
|
char *sizeQuery);
|
||||||
|
<<<<<<< HEAD
|
||||||
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
|
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
|
||||||
|
=======
|
||||||
|
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
|
||||||
|
|
||||||
|
>>>>>>> Handle modifications as well
|
||||||
|
|
||||||
/* Function declarations to modify shard and shard placement data */
|
/* Function declarations to modify shard and shard placement data */
|
||||||
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||||
|
|
|
@ -45,5 +45,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
|
||||||
List *resultIdList,
|
List *resultIdList,
|
||||||
bool useBinaryCopyFormat);
|
bool useBinaryCopyFormat);
|
||||||
extern bool GeneratingSubplans(void);
|
extern bool GeneratingSubplans(void);
|
||||||
|
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
|
||||||
|
|
||||||
|
|
||||||
#endif /* RECURSIVE_PLANNING_H */
|
#endif /* RECURSIVE_PLANNING_H */
|
||||||
|
|
Loading…
Reference in New Issue