Refactor some of the planning code to accomodate a new planning path for MERGE SQL

pull/6791/head
Teja Mupparti 2023-03-21 14:45:03 -07:00 committed by Teja Mupparti
parent e1f1d63050
commit da7db53c87
3 changed files with 135 additions and 42 deletions

View File

@ -34,6 +34,7 @@
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/distributed_planner.h"
@ -68,6 +69,17 @@
#include "utils/syscache.h"
/* RouterPlanType is used to determine the router plan to invoke */
typedef enum RouterPlanType
{
INSERT_SELECT_INTO_CITUS_TABLE,
INSERT_SELECT_INTO_LOCAL_TABLE,
DML_QUERY,
SELECT_QUERY,
MERGE_QUERY,
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1;
@ -129,6 +141,9 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static RouterPlanType GetRouterPlanType(Query *query,
Query *originalQuery,
bool hasUnresolvedParams);
/* Distributed planner hook */
@ -881,6 +896,51 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
}
/*
* GetRouterPlanType checks the parse tree to return appropriate plan type.
*/
static RouterPlanType
GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams)
{
if (!IsModifyCommand(originalQuery))
{
return SELECT_QUERY;
}
Oid targetRelationId = ModifyQueryResultRelationId(query);
EnsureModificationsCanRunOnRelation(targetRelationId);
EnsurePartitionTableNotReplicated(targetRelationId);
/* Check the type of modification being done */
if (InsertSelectIntoCitusTable(originalQuery))
{
if (hasUnresolvedParams)
{
return REPLAN_WITH_BOUND_PARAMETERS;
}
return INSERT_SELECT_INTO_CITUS_TABLE;
}
else if (InsertSelectIntoLocalTable(originalQuery))
{
if (hasUnresolvedParams)
{
return REPLAN_WITH_BOUND_PARAMETERS;
}
return INSERT_SELECT_INTO_LOCAL_TABLE;
}
else if (IsMergeQuery(originalQuery))
{
return MERGE_QUERY;
}
else
{
return DML_QUERY;
}
}
/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps:
@ -898,64 +958,71 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
DistributedPlan *distributedPlan = NULL;
bool hasCtes = originalQuery->cteList != NIL;
if (IsModifyCommand(originalQuery))
/* Step 1: Try router planner */
RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
hasUnresolvedParams);
switch (routerPlan)
{
Oid targetRelationId = ModifyQueryResultRelationId(query);
EnsureModificationsCanRunOnRelation(targetRelationId);
EnsurePartitionTableNotReplicated(targetRelationId);
if (InsertSelectIntoCitusTable(originalQuery))
case INSERT_SELECT_INTO_CITUS_TABLE:
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
distributedPlan =
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
CreateInsertSelectPlan(planId,
originalQuery,
plannerRestrictionContext,
boundParams);
break;
}
else if (InsertSelectIntoLocalTable(originalQuery))
case INSERT_SELECT_INTO_LOCAL_TABLE:
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
distributedPlan =
CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams,
CreateInsertSelectIntoLocalTablePlan(planId,
originalQuery,
boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
break;
}
else
case DML_QUERY:
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}
else
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/
distributedPlan = CreateRouterPlan(originalQuery, query,
plannerRestrictionContext);
case MERGE_QUERY:
{
distributedPlan =
CreateMergePlan(originalQuery, query, plannerRestrictionContext);
break;
}
case REPLAN_WITH_BOUND_PARAMETERS:
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
case SELECT_QUERY:
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planning process needed to
* produce distributed query plans.
*/
distributedPlan =
CreateRouterPlan(originalQuery, query, plannerRestrictionContext);
break;
}
}
/* the functions above always return a plan, possibly with an error */
@ -996,6 +1063,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
boundParams);
Assert(originalQuery != NULL);
/* Step 2: Generate subplans for CTEs and complex subqueries */
/*
* Plan subqueries and CTEs that cannot be pushed down by recursively
* calling the planner and return the resulting plans to subPlanList.
@ -1096,6 +1165,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
query->cteList = NIL;
Assert(originalQuery->cteList == NIL);
/* Step 3: Try Logical planner */
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan);

View File

@ -54,6 +54,23 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
#endif
/*
* CreateMergePlan attempts to create a plan for the given MERGE SQL
* statement. If planning fails ->planningError is set to a description
* of the failure.
*/
DistributedPlan *
CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
/*
* For now, this is a place holder until we isolate the merge
* planning into it's own code-path.
*/
return CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
}
/*
* MergeQuerySupported does check for a MERGE command in the query, if it finds
* one, it will verify the below criteria

View File

@ -17,10 +17,15 @@
#include "nodes/parsenodes.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/multi_physical_planner.h"
extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery,
bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
#endif /* MERGE_PLANNER_H */