Refactor query pushdown related logic

pull/2104/head
velioglu 2018-04-12 18:02:22 +03:00
parent f8fb7a27fb
commit d9fa69c031
9 changed files with 1802 additions and 1731 deletions

View File

@ -47,6 +47,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/planner/multi_master_planner.o \
src/backend/distributed/planner/multi_physical_planner.o \
src/backend/distributed/planner/query_colocation_checker.o \
src/backend/distributed/planner/query_pushdown_planning.o \
src/backend/distributed/planner/multi_router_planner.o \
src/backend/distributed/planner/postgres_planning_functions.o \
src/backend/distributed/planner/recursive_planning.o \

View File

@ -23,6 +23,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/resource_lock.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"

File diff suppressed because it is too large Load Diff

View File

@ -42,6 +42,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "distributed/task_tracker.h"
@ -126,15 +127,18 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan);
static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext);
static List * SubquerySqlTaskList(Job *job,
PlannerRestrictionContext *plannerRestrictionContext);
static List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
RelationRestrictionContext *
relationRestrictionContext,
List *prunedRelationShardList, TaskType taskType);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext,
uint32 taskId,
TaskType taskType);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval,
ShardInterval *secondInterval);
static Task * SubqueryTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext,
uint32 taskId);
static List * SqlTaskList(Job *job);
static bool DependsOnHashPartitionJob(Job *job);
static uint32 AnchorRangeTableId(List *rangeTableList);
@ -1994,7 +1998,17 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
/* create sql tasks for the job, and prune redundant data fetch tasks */
if (job->subqueryPushdown)
{
sqlTaskList = SubquerySqlTaskList(job, plannerRestrictionContext);
bool isMultiShardQuery = false;
List *prunedRelationShardList = TargetShardIntervalsForQuery(job->jobQuery,
plannerRestrictionContext
->
relationRestrictionContext,
&
isMultiShardQuery);
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
plannerRestrictionContext->
relationRestrictionContext,
prunedRelationShardList, SQL_TASK);
}
else
{
@ -2045,17 +2059,17 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
/*
* SubquerySqlTaskList creates a list of SQL tasks to execute the given subquery
* QueryPushdownSqlTaskList creates a list of SQL tasks to execute the given subquery
* pushdown job. For this, it is checked whether the query is router
* plannable per target shard interval. For those router plannable worker
* queries, we create a SQL task and append the task to the task list that is going
* to be executed.
*/
static List *
SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionContext)
List *
QueryPushdownSqlTaskList(Query *query, uint64 jobId,
RelationRestrictionContext *relationRestrictionContext,
List *prunedRelationShardList, TaskType taskType)
{
Query *subquery = job->jobQuery;
uint64 jobId = job->jobId;
List *sqlTaskList = NIL;
ListCell *restrictionCell = NULL;
uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
@ -2063,15 +2077,11 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
int shardOffset = 0;
int minShardOffset = 0;
int maxShardOffset = 0;
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool *taskRequiredForShardIndex = NULL;
List *prunedRelationShardList = NIL;
ListCell *prunedRelationShardCell = NULL;
bool isMultiShardQuery = false;
/* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(subquery);
ErrorIfUnsupportedShardDistribution(query);
if (list_length(relationRestrictionContext->relationRestrictionList) == 0)
{
@ -2083,10 +2093,6 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
minShardOffset = 0;
maxShardOffset = 0;
prunedRelationShardList = TargetShardIntervalsForQuery(subquery,
relationRestrictionContext,
&isMultiShardQuery);
forboth(prunedRelationShardCell, prunedRelationShardList,
restrictionCell, relationRestrictionContext->relationRestrictionList)
{
@ -2160,8 +2166,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
continue;
}
subqueryTask = SubqueryTaskCreate(subquery, shardOffset,
relationRestrictionContext, taskIdIndex);
subqueryTask = QueryPushdownTaskCreate(query, shardOffset,
relationRestrictionContext, taskIdIndex,
taskType);
subqueryTask->jobId = jobId;
sqlTaskList = lappend(sqlTaskList, subqueryTask);
@ -2259,6 +2266,104 @@ ErrorIfUnsupportedShardDistribution(Query *query)
}
/*
 * QueryPushdownTaskCreate builds one task of the given taskType that runs
 * originalQuery against a single set of shards.
 *
 * For every relation restriction in restrictionContext one shard is picked:
 * the shard at shardIndex in the relation's sorted shard interval array, or
 * the first (only) shard when the relation is a reference table. The relation
 * names in a copy of the query are then replaced by the matching shard names
 * and the copy is deparsed into the task's query string.
 *
 * The function errors out when no worker is returned by
 * WorkersContainingAllShards() for the picked shards. The returned task is
 * created with INVALID_JOB_ID as its job id.
 */
static Task *
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
						RelationRestrictionContext *restrictionContext, uint32 taskId,
						TaskType taskType)
{
	Query *shardQuery = copyObject(originalQuery);
	StringInfo shardQueryString = makeStringInfo();
	List *shardIntervalLists = NIL;
	List *relationShards = NIL;
	List *placementList = NIL;
	uint64 anchorShardId = INVALID_SHARD_ID;
	Task *pushdownTask = NULL;
	ListCell *relationRestrictionCell = NULL;

	/* pick the shard this task reads from for each relation in the query */
	foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
	{
		RelationRestriction *restriction =
			(RelationRestriction *) lfirst(relationRestrictionCell);
		DistTableCacheEntry *tableEntry =
			DistributedTableCacheEntry(restriction->relationId);

		/* reference tables only have one shard, so always take the first one */
		bool isReferenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE);
		int intervalIndex = isReferenceTable ? 0 : shardIndex;
		ShardInterval *targetInterval =
			tableEntry->sortedShardIntervalArray[intervalIndex];
		RelationShard *relationShard = NULL;

		/*
		 * A shard of a distributed table always becomes the anchor shard; a
		 * reference table shard is only used while no anchor has been set.
		 */
		if (!isReferenceTable || anchorShardId == INVALID_SHARD_ID)
		{
			anchorShardId = targetInterval->shardId;
		}

		shardIntervalLists = lappend(shardIntervalLists, list_make1(targetInterval));

		relationShard = CitusMakeNode(RelationShard);
		relationShard->relationId = targetInterval->relationId;
		relationShard->shardId = targetInterval->shardId;

		relationShards = lappend(relationShards, relationShard);
	}

	/* the task can only run where all of the picked shards are available */
	placementList = WorkersContainingAllShards(shardIntervalLists);
	if (list_length(placementList) == 0)
	{
		ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
							   "shards in the query")));
	}

	/* rewrite the relations in the copied query to their shard names */
	UpdateRelationToShardNames((Node *) shardQuery, relationShards);

	/*
	 * Shard pruning turns the quals into an implicit-AND list because
	 * predicate comparison and refutation rely on that form. Convert them
	 * back to an explicit AND so that deparsing yields (...) AND (...)
	 * rather than (...), (...).
	 */
	shardQuery->jointree->quals =
		(Node *) make_ands_explicit((List *) shardQuery->jointree->quals);

	/* deparse the rewritten query into the string the task will carry */
	pg_get_query_def(shardQuery, shardQueryString);
	ereport(DEBUG4, (errmsg("distributed statement: %s", shardQueryString->data)));

	pushdownTask = CreateBasicTask(INVALID_JOB_ID, taskId, taskType,
								   shardQueryString->data);
	pushdownTask->dependedTaskList = NIL;
	pushdownTask->anchorShardId = anchorShardId;
	pushdownTask->taskPlacementList = placementList;
	pushdownTask->upsertQuery = false;
	pushdownTask->relationShardList = relationShards;

	return pushdownTask;
}
/*
* CoPartitionedTables checks if given two distributed tables have 1-to-1 shard
* placement matching. It first checks for the shard count, if tables don't have
@ -2420,103 +2525,6 @@ ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
}
/*
 * SubqueryTaskCreate builds one SQL task that runs originalQuery against a
 * single set of shards.
 *
 * One shard is selected per relation restriction: the shard at shardIndex in
 * the relation's sorted shard interval array for distributed tables, and the
 * first shard for reference tables. A copy of the query has its relations
 * replaced by the selected shard names and is deparsed into the task's query
 * string.
 *
 * An error is raised when WorkersContainingAllShards() returns no placement
 * for the selected shards. The task is created with INVALID_JOB_ID.
 */
static Task *
SubqueryTaskCreate(Query *originalQuery, int shardIndex,
				   RelationRestrictionContext *restrictionContext, uint32 taskId)
{
	Query *queryCopy = copyObject(originalQuery);
	StringInfo deparsedQuery = makeStringInfo();
	ListCell *cell = NULL;
	Task *task = NULL;
	List *perRelationShardLists = NIL;
	List *relationShardMapping = NIL;
	List *candidatePlacements = NIL;
	uint64 anchorShardId = INVALID_SHARD_ID;

	/* select the shard of every relation that this task will touch */
	foreach(cell, restrictionContext->relationRestrictionList)
	{
		RelationRestriction *currentRestriction = (RelationRestriction *) lfirst(cell);
		DistTableCacheEntry *entry =
			DistributedTableCacheEntry(currentRestriction->relationId);
		ShardInterval *chosenShard = NULL;
		RelationShard *mapping = NULL;

		if (entry->partitionMethod != DISTRIBUTE_BY_NONE)
		{
			/* distributed table: take the shard at the requested index ... */
			chosenShard = entry->sortedShardIntervalArray[shardIndex];

			/* ... and always let it act as the anchor shard */
			anchorShardId = chosenShard->shardId;
		}
		else
		{
			/* reference table: there is only one shard to choose from */
			chosenShard = entry->sortedShardIntervalArray[0];

			/* it only becomes the anchor while no anchor has been set */
			if (anchorShardId == INVALID_SHARD_ID)
			{
				anchorShardId = chosenShard->shardId;
			}
		}

		perRelationShardLists = lappend(perRelationShardLists,
										list_make1(chosenShard));

		mapping = CitusMakeNode(RelationShard);
		mapping->relationId = chosenShard->relationId;
		mapping->shardId = chosenShard->shardId;

		relationShardMapping = lappend(relationShardMapping, mapping);
	}

	/* refuse to build a task that no single worker could execute */
	candidatePlacements = WorkersContainingAllShards(perRelationShardLists);
	if (list_length(candidatePlacements) == 0)
	{
		ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
							   "shards in the query")));
	}

	/* point the relations of the copied query at the selected shards */
	UpdateRelationToShardNames((Node *) queryCopy, relationShardMapping);

	/*
	 * The quals were flattened into an implicit-AND list during shard
	 * pruning, since predicate comparison and refutation need that form.
	 * Restore the explicit AND so the deparsed text reads (...) AND (...)
	 * instead of (...), (...).
	 */
	queryCopy->jointree->quals =
		(Node *) make_ands_explicit((List *) queryCopy->jointree->quals);

	/* produce the final query text for the task */
	pg_get_query_def(queryCopy, deparsedQuery);
	ereport(DEBUG4, (errmsg("distributed statement: %s", deparsedQuery->data)));

	task = CreateBasicTask(INVALID_JOB_ID, taskId, SQL_TASK, deparsedQuery->data);
	task->dependedTaskList = NIL;
	task->anchorShardId = anchorShardId;
	task->taskPlacementList = candidatePlacements;
	task->upsertQuery = false;
	task->relationShardList = relationShardMapping;

	return task;
}
/*
* SqlTaskList creates a list of SQL tasks to execute the given job. For this,
* the function walks over each range table in the job's range table list, gets

File diff suppressed because it is too large Load Diff

View File

@ -78,6 +78,7 @@
#include "nodes/relation.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "../../../include/distributed/query_pushdown_planning.h"
/*

View File

@ -38,6 +38,7 @@
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/statistics_collection.h"

View File

@ -181,23 +181,12 @@ typedef struct MultiExtendedOp
} MultiExtendedOp;
/* Config variables managed via guc.c */
extern bool SubqueryPushdown;
/* Function declarations for building logical plans */
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool JoinTreeContainsSubquery(Query *query);
extern bool WhereClauseContainsSubquery(Query *query);
extern bool FindNodeCheck(Node *node, bool (*check)(Node *));
extern bool SingleRelationRepartitionSubquery(Query *queryTree);
extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree,
bool
outerMostQueryHasLimit);
extern DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree);
extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail);
extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
extern bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *));
extern bool IsDistributedTableRTE(Node *node);
@ -227,6 +216,12 @@ extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern List * pull_var_clause_default(Node *node);
extern bool OperatorImplementsEquality(Oid opno);
extern bool FindNodeCheck(Node *node, bool (*check)(Node *));
extern DeferredErrorMessage * DeferErrorIfUnsupportedClause(List *clauseList);
extern MultiProject * MultiProjectNode(List *targetEntryList);
extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree);
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query *
subqueryTree);
extern MultiNode * MultiNodeTree(Query *queryTree);
#endif /* MULTI_LOGICAL_PLANNER_H */

View File

@ -0,0 +1,44 @@
/*-------------------------------------------------------------------------
 * query_pushdown_planning.h
 *
 * Copyright (c) 2018, Citus Data, Inc.
 * Function declarations used in query pushdown logic.
 *
 *-------------------------------------------------------------------------
 */

#ifndef QUERY_PUSHDOWN_PLANNING_H
#define QUERY_PUSHDOWN_PLANNING_H

#include "postgres.h"

#include "distributed/distributed_planner.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/errormessage.h"


/* Config variables managed via guc.c */
extern bool SubqueryPushdown;


/*
 * Predicates over a query tree.
 * NOTE(review): presumably used to decide whether a query goes through
 * pushdown planning -- confirm against the definitions.
 */
extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery);
extern bool JoinTreeContainsSubquery(Query *query);
extern bool WhereClauseContainsSubquery(Query *query);
extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail);

/* Builds the MultiNode tree for a query planned through pushdown */
extern MultiNode * SubqueryMultiNodeTree(Query *originalQuery,
										 Query *queryTree,
										 PlannerRestrictionContext *
										 plannerRestrictionContext);

/*
 * Checks that return a DeferredErrorMessage pointer instead of raising the
 * error themselves.
 * NOTE(review): the return value when the query is supported is assumed to be
 * NULL -- confirm against the definitions.
 */
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query *
																	   originalQuery,
																	   PlannerRestrictionContext
																	   *
																	   plannerRestrictionContext);
extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree,
																 bool
																 outerMostQueryHasLimit);
extern DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree);


#endif /* QUERY_PUSHDOWN_PLANNING_H */