diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 71e66567a..58312ba19 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -419,25 +419,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList) } -/* - * IsSupportedRedistributionTarget determines whether re-partitioning into the - * given target relation is supported. - */ -bool -IsSupportedRedistributionTarget(Oid targetRelationId) -{ - CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId); - - if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) && - !IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED)) - { - return false; - } - - return true; -} - - /* * DistributionColumnIndex finds the index of given distribution column in the * given target list. @@ -461,59 +442,6 @@ DistributionColumnIndex(List *insertTargetList, Var *partitionColumn) } -/* - * IsRedistributablePlan returns true if the given plan is a redistrituable plan. - */ -bool -IsRedistributablePlan(Plan *selectPlan) -{ - if (!EnableRepartitionedInsertSelect) - { - return false; - } - - /* don't redistribute if query is not distributed or requires merge on coordinator */ - if (!IsCitusCustomScan(selectPlan)) - { - return false; - } - - DistributedPlan *distSelectPlan = - GetDistributedPlan((CustomScan *) selectPlan); - Job *distSelectJob = distSelectPlan->workerJob; - List *distSelectTaskList = distSelectJob->taskList; - - /* - * Don't use redistribution if only one task. This is to keep the existing - * behaviour for CTEs that the last step is a read_intermediate_result() - * call. It doesn't hurt much in other cases too. - */ - if (list_length(distSelectTaskList) <= 1) - { - return false; - } - - /* don't use redistribution for repartition joins for now */ - if (distSelectJob->dependentJobList != NIL) - { - return false; - } - - if (distSelectPlan->combineQuery != NULL) - { - Query *combineQuery = (Query *) distSelectPlan->combineQuery; - - if (contain_nextval_expression_walker((Node *) combineQuery->targetList, NULL)) - { - /* nextval needs to be evaluated on the coordinator */ - return false; - } - } - - return true; -} - - /* * WrapTaskListForProjection wraps task query string to only select given * projected columns. It modifies the taskList. diff --git a/src/backend/distributed/executor/repartition_executor.c b/src/backend/distributed/executor/repartition_executor.c index 2d70a1356..b35527b99 100644 --- a/src/backend/distributed/executor/repartition_executor.c +++ b/src/backend/distributed/executor/repartition_executor.c @@ -15,6 +15,7 @@ #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" +#include "distributed/citus_custom_scan.h" #include "distributed/intermediate_results.h" #include "distributed/listutils.h" #include "distributed/multi_physical_planner.h" @@ -24,6 +25,81 @@ #include "distributed/resource_lock.h" +/* + * IsSupportedRedistributionTarget determines whether re-partitioning into the + * given target relation is supported. + */ +bool +IsSupportedRedistributionTarget(Oid targetRelationId) +{ + CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId); + + if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED)) + { + return false; + } + + return true; +} + + +/* + * IsRedistributablePlan returns true if the given plan is a distributable plan. + */ +bool +IsRedistributablePlan(Plan *selectPlan) +{ + if (!EnableRepartitionedInsertSelect) + { + return false; + } + + /* + * Don't redistribute if query is not distributed or requires + * merge on coordinator + */ + if (!IsCitusCustomScan(selectPlan)) + { + return false; + } + + DistributedPlan *distSelectPlan = + GetDistributedPlan((CustomScan *) selectPlan); + Job *distSelectJob = distSelectPlan->workerJob; + List *distSelectTaskList = distSelectJob->taskList; + + /* + * Don't use redistribution if only one task. This is to keep the existing + * behaviour for CTEs that the last step is a read_intermediate_result() + * call. It doesn't hurt much in other cases too. + */ + if (list_length(distSelectTaskList) <= 1) + { + return false; + } + + /* don't use redistribution for repartition joins for now */ + if (distSelectJob->dependentJobList != NIL) + { + return false; + } + + if (distSelectPlan->combineQuery != NULL) + { + Query *combineQuery = (Query *) distSelectPlan->combineQuery; + + if (contain_nextval_expression_walker((Node *) combineQuery->targetList, NULL)) + { + /* nextval needs to be evaluated on the coordinator */ + return false; + } + } + + return true; +} + + /* * GenerateTaskListWithColocatedIntermediateResults generates a list of tasks * for a query that inserts into a target relation and selects from a set of diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index c25fb393d..4f24d396c 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -31,6 +31,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/query_pushdown_planning.h" #include "distributed/recursive_planning.h" +#include "distributed/repartition_executor.h" #include "distributed/resource_lock.h" #include "distributed/version_compat.h" #include "nodes/makefuncs.h" diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 24cd7acd6..8f6485c25 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -44,7 +44,7 @@ #include "distributed/cte_inline.h" #include "distributed/distributed_deadlock_detection.h" #include "distributed/errormessage.h" -#include "distributed/insert_select_executor.h" +#include "distributed/repartition_executor.h" #include "distributed/intermediate_result_pruning.h" #include "distributed/local_multi_copy.h" #include "distributed/local_executor.h" diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index 6e84b80f2..1b08f5a94 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -16,11 +16,8 @@ #include "executor/execdesc.h" -extern bool EnableRepartitionedInsertSelect; extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); -extern bool IsSupportedRedistributionTarget(Oid targetRelationId); -extern bool IsRedistributablePlan(Plan *selectPlan); #endif /* INSERT_SELECT_EXECUTOR_H */ diff --git a/src/include/distributed/repartition_executor.h b/src/include/distributed/repartition_executor.h index fea6d6525..98173b828 100644 --- a/src/include/distributed/repartition_executor.h +++ b/src/include/distributed/repartition_executor.h @@ -13,6 +13,8 @@ #ifndef REPARTITION_EXECUTOR_H #define REPARTITION_EXECUTOR_H +extern bool EnableRepartitionedInsertSelect; + extern int DistributionColumnIndex(List *insertTargetList, Var *partitionColumn); extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId, Query * @@ -24,5 +26,7 @@ extern List * GenerateTaskListWithRedistributedResults( targetRelation, List **redistributedResults, bool useBinaryFormat); +extern bool IsSupportedRedistributionTarget(Oid targetRelationId); +extern bool IsRedistributablePlan(Plan *selectPlan); #endif /* REPARTITION_EXECUTOR_H */