From ff2062e8c36ede4c6e85179cdf7e37f4a1ce92db Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Tue, 30 May 2023 16:06:10 -0700 Subject: [PATCH] Rename insert-select redistribute code base to generic purpose --- .../distributed/executor/citus_custom_scan.c | 2 +- .../executor/insert_select_executor.c | 238 ++---------------- .../executor/multi_server_executor.c | 2 +- .../executor/repartition_executor.c | 216 ++++++++++++++++ .../planner/insert_select_planner.c | 10 +- .../planner/intermediate_result_pruning.c | 7 +- .../distributed/planner/multi_explain.c | 5 +- .../distributed/utils/citus_copyfuncs.c | 6 +- .../distributed/utils/citus_outfuncs.c | 4 +- .../distributed/multi_physical_planner.h | 42 ++-- .../distributed/repartition_executor.h | 28 +++ 11 files changed, 311 insertions(+), 249 deletions(-) create mode 100644 src/backend/distributed/executor/repartition_executor.c create mode 100644 src/include/distributed/repartition_executor.h diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 28486f23d..be04f38f4 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -182,7 +182,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node); DistributedPlan *distributedPlan = scanState->distributedPlan; - if (distributedPlan->insertSelectQuery != NULL) + if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL) { /* * INSERT..SELECT via coordinator or re-partitioning are special because diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index a69ae0f22..71e66567a 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -30,6 +30,7 @@ #include "distributed/distributed_planner.h" #include "distributed/recursive_planning.h" #include "distributed/relation_access_tracking.h" +#include "distributed/repartition_executor.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/subplan_execution.h" @@ -55,8 +56,6 @@ bool EnableRepartitionedInsertSelect = true; -static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, - char *resultIdPrefix); static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, PlannedStmt *selectPlan, EState *executorState); static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, @@ -67,11 +66,6 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); -static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, - CitusTableCacheEntry *targetRelation, - List **redistributedResults, - bool useBinaryFormat); -static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); @@ -89,7 +83,8 @@ NonPushableInsertSelectExecScan(CustomScanState *node) { EState *executorState = ScanStateGetExecutorState(scanState); DistributedPlan *distributedPlan = scanState->distributedPlan; - Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); + Query *insertSelectQuery = + copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition); List *insertTargetList = insertSelectQuery->targetList; RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); @@ -99,7 +94,8 @@ NonPushableInsertSelectExecScan(CustomScanState *node) HTAB *shardStateHash = NULL; Query *selectQuery = selectRte->subquery; - PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect); + PlannedStmt *selectPlan = + copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition); /* * If we are dealing with partitioned table, we also need to lock its @@ -111,7 +107,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION) + if (distributedPlan->modifyWithSelectMethod == MODIFY_WITH_SELECT_REPARTITION) { ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); @@ -142,9 +138,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node) CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId); - int partitionColumnIndex = - PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn); - if (partitionColumnIndex == -1) + int distributionColumnIndex = + DistributionColumnIndex(insertTargetList, + targetRelation->partitionColumn); + if (distributionColumnIndex == -1) { char *relationName = get_rel_name(targetRelationId); Oid schemaOid = get_rel_namespace(targetRelationId); @@ -158,13 +155,13 @@ NonPushableInsertSelectExecScan(CustomScanState *node) } TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList, - partitionColumnIndex); + distributionColumnIndex); const char *partitionColumnName = selectPartitionTE->resname ? selectPartitionTE->resname : "(none)"; ereport(DEBUG2, (errmsg( "partitioning SELECT query by column index %d with name %s", - partitionColumnIndex, quote_literal_cstr( + distributionColumnIndex, quote_literal_cstr( partitionColumnName)))); /* @@ -182,7 +179,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) List **redistributedResults = RedistributeTaskListResults(distResultPrefix, distSelectTaskList, - partitionColumnIndex, + distributionColumnIndex, targetRelation, binaryFormat); @@ -192,10 +189,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node) * target shard. Create and execute a list of tasks of form * INSERT INTO ... SELECT * FROM read_intermediate_results(...); */ - List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery, - targetRelation, - redistributedResults, - binaryFormat); + List *taskList = GenerateTaskListWithRedistributedResults(insertSelectQuery, + targetRelation, + redistributedResults, + binaryFormat); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); @@ -235,9 +232,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node) intermediateResultIdPrefix); /* generate tasks for the INSERT..SELECT phase */ - List *taskList = TwoPhaseInsertSelectTaskList(targetRelationId, - insertSelectQuery, - intermediateResultIdPrefix); + List *taskList = + GenerateTaskListWithColocatedIntermediateResults( + targetRelationId, insertSelectQuery, + intermediateResultIdPrefix); /* * We cannot actually execute INSERT...SELECT tasks that read from @@ -298,94 +296,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) } -/* - * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that - * inserts into a target relation and selects from a set of co-located - * intermediate results. - */ -static List * -TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, - char *resultIdPrefix) -{ - List *taskList = NIL; - - /* - * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the - * subquery of insertResultQuery for different intermediate results and - * then deparse it. - */ - Query *insertResultQuery = copyObject(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery); - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); - - CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId); - int shardCount = targetCacheEntry->shardIntervalArrayLength; - uint32 taskIdIndex = 1; - uint64 jobId = INVALID_JOB_ID; - - for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) - { - ShardInterval *targetShardInterval = - targetCacheEntry->sortedShardIntervalArray[shardOffset]; - uint64 shardId = targetShardInterval->shardId; - List *columnAliasList = NIL; - StringInfo queryString = makeStringInfo(); - StringInfo resultId = makeStringInfo(); - - /* during COPY, the shard ID is appended to the result name */ - appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId); - - /* generate the query on the intermediate result */ - Query *resultSelectQuery = BuildSubPlanResultQuery(insertSelectQuery->targetList, - columnAliasList, - resultId->data); - - /* put the intermediate result query in the INSERT..SELECT */ - selectRte->subquery = resultSelectQuery; - - /* setting an alias simplifies deparsing of RETURNING */ - if (insertRte->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - insertRte->alias = alias; - } - - /* - * Generate a query string for the query that inserts into a shard and reads - * from an intermediate result. - * - * Since CTEs have already been converted to intermediate results, they need - * to removed from the query. Otherwise, worker queries include both - * intermediate results and CTEs in the query. - */ - insertResultQuery->cteList = NIL; - deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString); - ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); - - LockShardDistributionMetadata(shardId, ShareLock); - List *insertShardPlacementList = ActiveShardPlacementList(shardId); - - RelationShard *relationShard = CitusMakeNode(RelationShard); - relationShard->relationId = targetShardInterval->relationId; - relationShard->shardId = targetShardInterval->shardId; - - Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, - queryString->data); - modifyTask->dependentTaskList = NIL; - modifyTask->anchorShardId = shardId; - modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->relationShardList = list_make1(relationShard); - modifyTask->replicationModel = targetCacheEntry->replicationModel; - - taskList = lappend(taskList, modifyTask); - - taskIdIndex++; - } - - return taskList; -} - - /* * ExecutePlanIntoColocatedIntermediateResults executes the given PlannedStmt * and inserts tuples into a set of intermediate results that are colocated with @@ -529,111 +439,11 @@ IsSupportedRedistributionTarget(Oid targetRelationId) /* - * RedistributedInsertSelectTaskList returns a task list to insert given - * redistributedResults into the given target relation. - * redistributedResults[shardIndex] is list of cstrings each of which is - * a result name which should be inserted into - * targetRelation->sortedShardIntervalArray[shardIndex]. - */ -static List * -RedistributedInsertSelectTaskList(Query *insertSelectQuery, - CitusTableCacheEntry *targetRelation, - List **redistributedResults, - bool useBinaryFormat) -{ - List *taskList = NIL; - - /* - * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the - * subquery of insertResultQuery for different intermediate results and - * then deparse it. - */ - Query *insertResultQuery = copyObject(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery); - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); - List *selectTargetList = selectRte->subquery->targetList; - Oid targetRelationId = targetRelation->relationId; - - int shardCount = targetRelation->shardIntervalArrayLength; - int shardOffset = 0; - uint32 taskIdIndex = 1; - uint64 jobId = INVALID_JOB_ID; - - for (shardOffset = 0; shardOffset < shardCount; shardOffset++) - { - ShardInterval *targetShardInterval = - targetRelation->sortedShardIntervalArray[shardOffset]; - List *resultIdList = redistributedResults[targetShardInterval->shardIndex]; - uint64 shardId = targetShardInterval->shardId; - StringInfo queryString = makeStringInfo(); - - /* skip empty tasks */ - if (resultIdList == NIL) - { - continue; - } - - /* sort result ids for consistent test output */ - List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp); - - /* generate the query on the intermediate result */ - Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList, - NIL, - sortedResultIds, - useBinaryFormat); - - /* put the intermediate result query in the INSERT..SELECT */ - selectRte->subquery = fragmentSetQuery; - - /* setting an alias simplifies deparsing of RETURNING */ - if (insertRte->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - insertRte->alias = alias; - } - - /* - * Generate a query string for the query that inserts into a shard and reads - * from an intermediate result. - * - * Since CTEs have already been converted to intermediate results, they need - * to removed from the query. Otherwise, worker queries include both - * intermediate results and CTEs in the query. - */ - insertResultQuery->cteList = NIL; - deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString); - ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); - - LockShardDistributionMetadata(shardId, ShareLock); - List *insertShardPlacementList = ActiveShardPlacementList(shardId); - - RelationShard *relationShard = CitusMakeNode(RelationShard); - relationShard->relationId = targetShardInterval->relationId; - relationShard->shardId = targetShardInterval->shardId; - - Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, - queryString->data); - modifyTask->dependentTaskList = NIL; - modifyTask->anchorShardId = shardId; - modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->relationShardList = list_make1(relationShard); - modifyTask->replicationModel = targetRelation->replicationModel; - - taskList = lappend(taskList, modifyTask); - - taskIdIndex++; - } - - return taskList; -} - - -/* - * PartitionColumnIndex finds the index of given partition column in the + * DistributionColumnIndex finds the index of given distribution column in the * given target list. */ -static int -PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) +int +DistributionColumnIndex(List *insertTargetList, Var *partitionColumn) { TargetEntry *insertTargetEntry = NULL; int targetEntryIndex = 0; diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index caf6797da..d92b39bfb 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -47,7 +47,7 @@ JobExecutorType(DistributedPlan *distributedPlan) { Job *job = distributedPlan->workerJob; - if (distributedPlan->insertSelectQuery != NULL) + if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL) { /* * We go through diff --git a/src/backend/distributed/executor/repartition_executor.c b/src/backend/distributed/executor/repartition_executor.c new file mode 100644 index 000000000..2d70a1356 --- /dev/null +++ b/src/backend/distributed/executor/repartition_executor.c @@ -0,0 +1,216 @@ +/*------------------------------------------------------------------- + * + * repartition_executor.c + * + * Definitions for public functions and types related to repartition + * of select query results. + * + * Copyright (c) Citus Data, Inc. + *------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" + +#include "distributed/intermediate_results.h" +#include "distributed/listutils.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" +#include "distributed/recursive_planning.h" +#include "distributed/repartition_executor.h" +#include "distributed/resource_lock.h" + + +/* + * GenerateTaskListWithColocatedIntermediateResults generates a list of tasks + * for a query that inserts into a target relation and selects from a set of + * co-located intermediate results. + */ +List * +GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId, + Query * + modifyQueryViaCoordinatorOrRepartition, + char *resultIdPrefix) +{ + List *taskList = NIL; + + /* + * Make a copy of the ... SELECT. We'll repeatedly replace + * the subquery of modifyResultQuery for different intermediate results and + * then deparse it. + */ + Query *modifyWithResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition); + RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyWithResultQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(modifyWithResultQuery); + + CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId); + int shardCount = targetCacheEntry->shardIntervalArrayLength; + uint32 taskIdIndex = 1; + uint64 jobId = INVALID_JOB_ID; + + for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) + { + ShardInterval *targetShardInterval = + targetCacheEntry->sortedShardIntervalArray[shardOffset]; + uint64 shardId = targetShardInterval->shardId; + List *columnAliasList = NIL; + StringInfo queryString = makeStringInfo(); + StringInfo resultId = makeStringInfo(); + + /* during COPY, the shard ID is appended to the result name */ + appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId); + + /* generate the query on the intermediate result */ + Query *resultSelectQuery = BuildSubPlanResultQuery( + modifyQueryViaCoordinatorOrRepartition->targetList, + columnAliasList, + resultId->data); + + /* put the intermediate result query in the INSERT..SELECT */ + selectRte->subquery = resultSelectQuery; + + /* setting an alias simplifies deparsing of RETURNING */ + if (insertRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + insertRte->alias = alias; + } + + /* + * Generate a query string for the query that inserts into a shard and reads + * from an intermediate result. + * + * Since CTEs have already been converted to intermediate results, they need + * to removed from the query. Otherwise, worker queries include both + * intermediate results and CTEs in the query. + */ + modifyWithResultQuery->cteList = NIL; + deparse_shard_query(modifyWithResultQuery, targetRelationId, shardId, + queryString); + ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); + + LockShardDistributionMetadata(shardId, ShareLock); + List *insertShardPlacementList = ActiveShardPlacementList(shardId); + + RelationShard *relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = targetShardInterval->relationId; + relationShard->shardId = targetShardInterval->shardId; + + Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, + queryString->data); + modifyTask->dependentTaskList = NIL; + modifyTask->anchorShardId = shardId; + modifyTask->taskPlacementList = insertShardPlacementList; + modifyTask->relationShardList = list_make1(relationShard); + modifyTask->replicationModel = targetCacheEntry->replicationModel; + + taskList = lappend(taskList, modifyTask); + + taskIdIndex++; + } + + return taskList; +} + + +/* + * GenerateTaskListWithRedistributedResults returns a task list to insert given + * redistributedResults into the given target relation. + * redistributedResults[shardIndex] is list of cstrings each of which is + * a result name which should be inserted into + * targetRelation->sortedShardIntervalArray[shardIndex]. + */ +List * +GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepartition, + CitusTableCacheEntry *targetRelation, + List **redistributedResults, bool + useBinaryFormat) +{ + List *taskList = NIL; + + /* + * Make a copy of the ... SELECT. We'll repeatedly replace + * the subquery of modifyResultQuery for different intermediate results and + * then deparse it. + */ + Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition); + RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(modifyResultQuery); + List *selectTargetList = selectRte->subquery->targetList; + Oid targetRelationId = targetRelation->relationId; + + int shardCount = targetRelation->shardIntervalArrayLength; + int shardOffset = 0; + uint32 taskIdIndex = 1; + uint64 jobId = INVALID_JOB_ID; + + for (shardOffset = 0; shardOffset < shardCount; shardOffset++) + { + ShardInterval *targetShardInterval = + targetRelation->sortedShardIntervalArray[shardOffset]; + List *resultIdList = redistributedResults[targetShardInterval->shardIndex]; + uint64 shardId = targetShardInterval->shardId; + StringInfo queryString = makeStringInfo(); + + /* skip empty tasks */ + if (resultIdList == NIL) + { + continue; + } + + /* sort result ids for consistent test output */ + List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp); + + /* generate the query on the intermediate result */ + Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList, + NIL, + sortedResultIds, + useBinaryFormat); + + /* put the intermediate result query in the INSERT..SELECT */ + selectRte->subquery = fragmentSetQuery; + + /* setting an alias simplifies deparsing of RETURNING */ + if (insertRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + insertRte->alias = alias; + } + + /* + * Generate a query string for the query that inserts into a shard and reads + * from an intermediate result. + * + * Since CTEs have already been converted to intermediate results, they need + * to removed from the query. Otherwise, worker queries include both + * intermediate results and CTEs in the query. + */ + modifyResultQuery->cteList = NIL; + deparse_shard_query(modifyResultQuery, targetRelationId, shardId, queryString); + ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); + + LockShardDistributionMetadata(shardId, ShareLock); + List *insertShardPlacementList = ActiveShardPlacementList(shardId); + + RelationShard *relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = targetShardInterval->relationId; + relationShard->shardId = targetShardInterval->shardId; + + Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, + queryString->data); + modifyTask->dependentTaskList = NIL; + modifyTask->anchorShardId = shardId; + modifyTask->taskPlacementList = insertShardPlacementList; + modifyTask->relationShardList = list_make1(relationShard); + modifyTask->replicationModel = targetRelation->replicationModel; + + taskList = lappend(taskList, modifyTask); + + taskIdIndex++; + } + + return taskList; +} diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 06e446783..8a58b0f13 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -1417,11 +1417,11 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou Assert(!repartitioned || !GetRTEListPropertiesForQuery(selectQueryCopy)->hasSingleShardDistTable); - distributedPlan->insertSelectQuery = insertSelectQuery; - distributedPlan->selectPlanForInsertSelect = selectPlan; - distributedPlan->insertSelectMethod = repartitioned ? - INSERT_SELECT_REPARTITION : - INSERT_SELECT_VIA_COORDINATOR; + distributedPlan->modifyQueryViaCoordinatorOrRepartition = insertSelectQuery; + distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition = selectPlan; + distributedPlan->modifyWithSelectMethod = repartitioned ? + MODIFY_WITH_SELECT_REPARTITION : + MODIFY_WITH_SELECT_VIA_COORDINATOR; distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 76aba8321..cefbfb833 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -69,7 +69,7 @@ FindSubPlanUsages(DistributedPlan *plan) SUBPLAN_ACCESS_REMOTE); } - if (plan->insertSelectQuery != NULL) + if (plan->modifyQueryViaCoordinatorOrRepartition != NULL) { /* INSERT..SELECT plans currently do not have a workerJob */ Assert(plan->workerJob == NULL); @@ -79,8 +79,9 @@ FindSubPlanUsages(DistributedPlan *plan) * perform pruning. We therefore require all subplans used in the * INSERT..SELECT to be available all nodes. */ - remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->insertSelectQuery, - SUBPLAN_ACCESS_ANYWHERE); + remoteSubPlans = + FindSubPlansUsedInNode((Node *) plan->modifyQueryViaCoordinatorOrRepartition, + SUBPLAN_ACCESS_ANYWHERE); } /* merge the used subplans */ diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index c23509df1..248117904 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -234,7 +234,7 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; - Query *insertSelectQuery = distributedPlan->insertSelectQuery; + Query *insertSelectQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition; RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); /* @@ -244,7 +244,8 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, */ Query *queryCopy = copyObject(selectRte->subquery); - bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; + bool repartition = distributedPlan->modifyWithSelectMethod == + MODIFY_WITH_SELECT_REPARTITION; if (es->analyze) diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index d4e95e16c..7e1379ef3 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -127,9 +127,9 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_SCALAR_FIELD(queryId); COPY_NODE_FIELD(relationIdList); COPY_SCALAR_FIELD(targetRelationId); - COPY_NODE_FIELD(insertSelectQuery); - COPY_NODE_FIELD(selectPlanForInsertSelect); - COPY_SCALAR_FIELD(insertSelectMethod); + COPY_NODE_FIELD(modifyQueryViaCoordinatorOrRepartition); + COPY_NODE_FIELD(selectPlanForModifyViaCoordinatorOrRepartition); + COPY_SCALAR_FIELD(modifyWithSelectMethod); COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_NODE_FIELD(subPlanList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index b02626233..b4062751a 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -192,7 +192,9 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_UINT64_FIELD(queryId); WRITE_NODE_FIELD(relationIdList); WRITE_OID_FIELD(targetRelationId); - WRITE_NODE_FIELD(insertSelectQuery); + WRITE_NODE_FIELD(modifyQueryViaCoordinatorOrRepartition); + WRITE_NODE_FIELD(selectPlanForModifyViaCoordinatorOrRepartition); + WRITE_ENUM_FIELD(modifyWithSelectMethod, ModifyWithSelectMethod); WRITE_STRING_FIELD(intermediateResultIdPrefix); WRITE_NODE_FIELD(subPlanList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 26d074053..c457918db 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -361,19 +361,19 @@ typedef struct JoinSequenceNode /* - * InsertSelectMethod represents the method to use for INSERT INTO ... SELECT - * queries. + * ModifyWithSelectMethod represents the method to use for INSERT INTO ... SELECT + * or MERGE type of queries. * * Note that there is a third method which is not represented here, which is - * pushing down the INSERT INTO ... SELECT to workers. This method is executed - * similar to other distributed queries and doesn't need a special execution - * code, so we don't need to represent it here. + * pushing down the MERGE/INSERT INTO ... SELECT to workers. This method is + * executed similar to other distributed queries and doesn't need a special + * execution code, so we don't need to represent it here. */ -typedef enum InsertSelectMethod +typedef enum ModifyWithSelectMethod { - INSERT_SELECT_VIA_COORDINATOR, - INSERT_SELECT_REPARTITION -} InsertSelectMethod; + MODIFY_WITH_SELECT_VIA_COORDINATOR, + MODIFY_WITH_SELECT_REPARTITION +} ModifyWithSelectMethod; /* @@ -412,18 +412,22 @@ typedef struct DistributedPlan Oid targetRelationId; /* - * INSERT .. SELECT via the coordinator or repartition */ - Query *insertSelectQuery; - PlannedStmt *selectPlanForInsertSelect; - InsertSelectMethod insertSelectMethod; + * Modifications performed using the output of a source query via + * the coordinator or repartition. + */ + Query *modifyQueryViaCoordinatorOrRepartition; + PlannedStmt *selectPlanForModifyViaCoordinatorOrRepartition; + ModifyWithSelectMethod modifyWithSelectMethod; /* - * If intermediateResultIdPrefix is non-null, an INSERT ... SELECT - * via the coordinator is written to a set of intermediate results - * named according to _. - * That way we can run a distributed INSERT ... SELECT with - * RETURNING or ON CONFLICT from the intermediate results to the - * target relation. + * If intermediateResultIdPrefix is non-null, the source query + * results are written to a set of intermediate results named + * according to _. + * That way we can run a distributed modification query which + * requires evaluating source query results at the coordinator. + * Once results are captured in intermediate files, modification + * is done from the intermediate results into the target relation. + * */ char *intermediateResultIdPrefix; diff --git a/src/include/distributed/repartition_executor.h b/src/include/distributed/repartition_executor.h new file mode 100644 index 000000000..fea6d6525 --- /dev/null +++ b/src/include/distributed/repartition_executor.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * repartition_executor.h + * + * Declarations for public functions and types related to repartition of + * select query results. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REPARTITION_EXECUTOR_H +#define REPARTITION_EXECUTOR_H + +extern int DistributionColumnIndex(List *insertTargetList, Var *partitionColumn); +extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId, + Query * + modifyQueryViaCoordinatorOrRepartition, + char *resultIdPrefix); +extern List * GenerateTaskListWithRedistributedResults( + Query *modifyQueryViaCoordinatorOrRepartition, + CitusTableCacheEntry * + targetRelation, + List **redistributedResults, + bool useBinaryFormat); + +#endif /* REPARTITION_EXECUTOR_H */