Refactor repartitioning code into generic format

pull/6975/head
Teja Mupparti 2023-06-02 09:53:40 -07:00 committed by Teja Mupparti
parent 1c9e3fabc2
commit f6a516dab5
6 changed files with 82 additions and 76 deletions

View File

@ -419,25 +419,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
}
/*
* IsSupportedRedistributionTarget determines whether re-partitioning into the
* given target relation is supported.
*/
bool
IsSupportedRedistributionTarget(Oid targetRelationId)
{
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId);
if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) &&
!IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED))
{
return false;
}
return true;
}
/*
* DistributionColumnIndex finds the index of given distribution column in the
* given target list.
@ -461,59 +442,6 @@ DistributionColumnIndex(List *insertTargetList, Var *partitionColumn)
}
/*
* IsRedistributablePlan returns true if the given plan is a redistrituable plan.
*/
bool
IsRedistributablePlan(Plan *selectPlan)
{
if (!EnableRepartitionedInsertSelect)
{
return false;
}
/* don't redistribute if query is not distributed or requires merge on coordinator */
if (!IsCitusCustomScan(selectPlan))
{
return false;
}
DistributedPlan *distSelectPlan =
GetDistributedPlan((CustomScan *) selectPlan);
Job *distSelectJob = distSelectPlan->workerJob;
List *distSelectTaskList = distSelectJob->taskList;
/*
* Don't use redistribution if only one task. This is to keep the existing
* behaviour for CTEs that the last step is a read_intermediate_result()
* call. It doesn't hurt much in other cases too.
*/
if (list_length(distSelectTaskList) <= 1)
{
return false;
}
/* don't use redistribution for repartition joins for now */
if (distSelectJob->dependentJobList != NIL)
{
return false;
}
if (distSelectPlan->combineQuery != NULL)
{
Query *combineQuery = (Query *) distSelectPlan->combineQuery;
if (contain_nextval_expression_walker((Node *) combineQuery->targetList, NULL))
{
/* nextval needs to be evaluated on the coordinator */
return false;
}
}
return true;
}
/*
* WrapTaskListForProjection wraps task query string to only select given
* projected columns. It modifies the taskList.

View File

@ -15,6 +15,7 @@
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/multi_physical_planner.h"
@ -24,6 +25,81 @@
#include "distributed/resource_lock.h"
/*
* IsSupportedRedistributionTarget determines whether re-partitioning into the
* given target relation is supported.
*/
bool
IsSupportedRedistributionTarget(Oid targetRelationId)
{
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId);
if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) &&
!IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED))
{
return false;
}
return true;
}
/*
* IsRedistributablePlan returns true if the given plan is a distributable plan.
*/
bool
IsRedistributablePlan(Plan *selectPlan)
{
if (!EnableRepartitionedInsertSelect)
{
return false;
}
/*
* Don't redistribute if query is not distributed or requires
* merge on coordinator
*/
if (!IsCitusCustomScan(selectPlan))
{
return false;
}
DistributedPlan *distSelectPlan =
GetDistributedPlan((CustomScan *) selectPlan);
Job *distSelectJob = distSelectPlan->workerJob;
List *distSelectTaskList = distSelectJob->taskList;
/*
* Don't use redistribution if only one task. This is to keep the existing
* behaviour for CTEs that the last step is a read_intermediate_result()
* call. It doesn't hurt much in other cases too.
*/
if (list_length(distSelectTaskList) <= 1)
{
return false;
}
/* don't use redistribution for repartition joins for now */
if (distSelectJob->dependentJobList != NIL)
{
return false;
}
if (distSelectPlan->combineQuery != NULL)
{
Query *combineQuery = (Query *) distSelectPlan->combineQuery;
if (contain_nextval_expression_walker((Node *) combineQuery->targetList, NULL))
{
/* nextval needs to be evaluated on the coordinator */
return false;
}
}
return true;
}
/*
* GenerateTaskListWithColocatedIntermediateResults generates a list of tasks
* for a query that inserts into a target relation and selects from a set of

View File

@ -31,6 +31,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/recursive_planning.h"
#include "distributed/repartition_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/version_compat.h"
#include "nodes/makefuncs.h"

View File

@ -44,7 +44,7 @@
#include "distributed/cte_inline.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/errormessage.h"
#include "distributed/insert_select_executor.h"
#include "distributed/repartition_executor.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/local_multi_copy.h"
#include "distributed/local_executor.h"

View File

@ -16,11 +16,8 @@
#include "executor/execdesc.h"
extern bool EnableRepartitionedInsertSelect;
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan);
#endif /* INSERT_SELECT_EXECUTOR_H */

View File

@ -13,6 +13,8 @@
#ifndef REPARTITION_EXECUTOR_H
#define REPARTITION_EXECUTOR_H
extern bool EnableRepartitionedInsertSelect;
extern int DistributionColumnIndex(List *insertTargetList, Var *partitionColumn);
extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
Query *
@ -24,5 +26,7 @@ extern List * GenerateTaskListWithRedistributedResults(
targetRelation,
List **redistributedResults,
bool useBinaryFormat);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan);
#endif /* REPARTITION_EXECUTOR_H */