Rename insert-select redistribute code base to generic purpose

pull/6960/head
Teja Mupparti 2023-05-30 16:06:10 -07:00 committed by Teja Mupparti
parent 9961d39d97
commit ff2062e8c3
11 changed files with 311 additions and 249 deletions

View File

@ -182,7 +182,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node); node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node);
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
if (distributedPlan->insertSelectQuery != NULL) if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL)
{ {
/* /*
* INSERT..SELECT via coordinator or re-partitioning are special because * INSERT..SELECT via coordinator or re-partitioning are special because

View File

@ -30,6 +30,7 @@
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/recursive_planning.h" #include "distributed/recursive_planning.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/repartition_executor.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
@ -55,8 +56,6 @@
bool EnableRepartitionedInsertSelect = true; bool EnableRepartitionedInsertSelect = true;
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
char *resultIdPrefix);
static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
PlannedStmt *selectPlan, EState *executorState); PlannedStmt *selectPlan, EState *executorState);
static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
@ -67,11 +66,6 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
static List * BuildColumnNameListFromTargetList(Oid targetRelationId, static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
List *insertTargetList); List *insertTargetList);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
CitusTableCacheEntry *targetRelation,
List **redistributedResults,
bool useBinaryFormat);
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
@ -89,7 +83,8 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
{ {
EState *executorState = ScanStateGetExecutorState(scanState); EState *executorState = ScanStateGetExecutorState(scanState);
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); Query *insertSelectQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
List *insertTargetList = insertSelectQuery->targetList; List *insertTargetList = insertSelectQuery->targetList;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
@ -99,7 +94,8 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
HTAB *shardStateHash = NULL; HTAB *shardStateHash = NULL;
Query *selectQuery = selectRte->subquery; Query *selectQuery = selectRte->subquery;
PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect); PlannedStmt *selectPlan =
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
/* /*
* If we are dealing with partitioned table, we also need to lock its * If we are dealing with partitioned table, we also need to lock its
@ -111,7 +107,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
LockPartitionRelations(targetRelationId, RowExclusiveLock); LockPartitionRelations(targetRelationId, RowExclusiveLock);
} }
if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION) if (distributedPlan->modifyWithSelectMethod == MODIFY_WITH_SELECT_REPARTITION)
{ {
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));
@ -142,9 +138,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
CitusTableCacheEntry *targetRelation = CitusTableCacheEntry *targetRelation =
GetCitusTableCacheEntry(targetRelationId); GetCitusTableCacheEntry(targetRelationId);
int partitionColumnIndex = int distributionColumnIndex =
PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn); DistributionColumnIndex(insertTargetList,
if (partitionColumnIndex == -1) targetRelation->partitionColumn);
if (distributionColumnIndex == -1)
{ {
char *relationName = get_rel_name(targetRelationId); char *relationName = get_rel_name(targetRelationId);
Oid schemaOid = get_rel_namespace(targetRelationId); Oid schemaOid = get_rel_namespace(targetRelationId);
@ -158,13 +155,13 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
} }
TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList, TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList,
partitionColumnIndex); distributionColumnIndex);
const char *partitionColumnName = selectPartitionTE->resname ? const char *partitionColumnName = selectPartitionTE->resname ?
selectPartitionTE->resname : "(none)"; selectPartitionTE->resname : "(none)";
ereport(DEBUG2, (errmsg( ereport(DEBUG2, (errmsg(
"partitioning SELECT query by column index %d with name %s", "partitioning SELECT query by column index %d with name %s",
partitionColumnIndex, quote_literal_cstr( distributionColumnIndex, quote_literal_cstr(
partitionColumnName)))); partitionColumnName))));
/* /*
@ -182,7 +179,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
List **redistributedResults = RedistributeTaskListResults(distResultPrefix, List **redistributedResults = RedistributeTaskListResults(distResultPrefix,
distSelectTaskList, distSelectTaskList,
partitionColumnIndex, distributionColumnIndex,
targetRelation, targetRelation,
binaryFormat); binaryFormat);
@ -192,7 +189,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
* target shard. Create and execute a list of tasks of form * target shard. Create and execute a list of tasks of form
* INSERT INTO ... SELECT * FROM read_intermediate_results(...); * INSERT INTO ... SELECT * FROM read_intermediate_results(...);
*/ */
List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery, List *taskList = GenerateTaskListWithRedistributedResults(insertSelectQuery,
targetRelation, targetRelation,
redistributedResults, redistributedResults,
binaryFormat); binaryFormat);
@ -235,8 +232,9 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
intermediateResultIdPrefix); intermediateResultIdPrefix);
/* generate tasks for the INSERT..SELECT phase */ /* generate tasks for the INSERT..SELECT phase */
List *taskList = TwoPhaseInsertSelectTaskList(targetRelationId, List *taskList =
insertSelectQuery, GenerateTaskListWithColocatedIntermediateResults(
targetRelationId, insertSelectQuery,
intermediateResultIdPrefix); intermediateResultIdPrefix);
/* /*
@ -298,94 +296,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
} }
/*
 * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that
 * inserts into a target relation and selects from a set of co-located
 * intermediate results.
 *
 * One MODIFY_TASK is created for every entry in the target relation's
 * sortedShardIntervalArray. The task for a shard reads from the intermediate
 * result named <resultIdPrefix>_<shardId>. The returned list holds the tasks
 * in shard-array order, with task ids counting up from 1.
 */
static List *
TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
char *resultIdPrefix)
{
List *taskList = NIL;
/*
 * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the
 * subquery of insertResultQuery for different intermediate results and
 * then deparse it. Working on a copy leaves the caller's query untouched.
 */
Query *insertResultQuery = copyObject(insertSelectQuery);
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
int shardCount = targetCacheEntry->shardIntervalArrayLength;
uint32 taskIdIndex = 1;
/* every task is created with INVALID_JOB_ID */
uint64 jobId = INVALID_JOB_ID;
for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
{
ShardInterval *targetShardInterval =
targetCacheEntry->sortedShardIntervalArray[shardOffset];
uint64 shardId = targetShardInterval->shardId;
/* no column aliases are passed to the intermediate result query */
List *columnAliasList = NIL;
StringInfo queryString = makeStringInfo();
StringInfo resultId = makeStringInfo();
/* during COPY, the shard ID is appended to the result name */
appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);
/*
 * Generate the query on the intermediate result. The target list is taken
 * from the caller's query, not from the copy that is being rewritten.
 */
Query *resultSelectQuery = BuildSubPlanResultQuery(insertSelectQuery->targetList,
columnAliasList,
resultId->data);
/* put the intermediate result query in the INSERT..SELECT */
selectRte->subquery = resultSelectQuery;
/*
 * Setting an alias simplifies deparsing of RETURNING. The alias lives on
 * the shared copy, so it is only created in the first iteration that
 * finds it missing.
 */
if (insertRte->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
insertRte->alias = alias;
}
/*
 * Generate a query string for the query that inserts into a shard and reads
 * from an intermediate result.
 *
 * Since CTEs have already been converted to intermediate results, they need
 * to be removed from the query. Otherwise, worker queries include both
 * intermediate results and CTEs in the query.
 */
insertResultQuery->cteList = NIL;
deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString);
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));
/* take a ShareLock on the shard's distribution metadata, then read its placements */
LockShardDistributionMetadata(shardId, ShareLock);
List *insertShardPlacementList = ActiveShardPlacementList(shardId);
/* the task touches exactly one shard of the target relation */
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = targetShardInterval->relationId;
relationShard->shardId = targetShardInterval->shardId;
Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
queryString->data);
modifyTask->dependentTaskList = NIL;
modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->relationShardList = list_make1(relationShard);
modifyTask->replicationModel = targetCacheEntry->replicationModel;
taskList = lappend(taskList, modifyTask);
taskIdIndex++;
}
return taskList;
}
/* /*
* ExecutePlanIntoColocatedIntermediateResults executes the given PlannedStmt * ExecutePlanIntoColocatedIntermediateResults executes the given PlannedStmt
* and inserts tuples into a set of intermediate results that are colocated with * and inserts tuples into a set of intermediate results that are colocated with
@ -529,111 +439,11 @@ IsSupportedRedistributionTarget(Oid targetRelationId)
/* /*
* RedistributedInsertSelectTaskList returns a task list to insert given * DistributionColumnIndex finds the index of given distribution column in the
* redistributedResults into the given target relation.
* redistributedResults[shardIndex] is list of cstrings each of which is
* a result name which should be inserted into
* targetRelation->sortedShardIntervalArray[shardIndex].
*/
static List *
RedistributedInsertSelectTaskList(Query *insertSelectQuery,
CitusTableCacheEntry *targetRelation,
List **redistributedResults,
bool useBinaryFormat)
{
List *taskList = NIL;
/*
 * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the
 * subquery of insertResultQuery for different intermediate results and
 * then deparse it. Working on a copy leaves the caller's query untouched.
 */
Query *insertResultQuery = copyObject(insertSelectQuery);
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
/*
 * Remember the SELECT's target list before the loop starts replacing
 * selectRte->subquery; every per-shard read query is built from it.
 */
List *selectTargetList = selectRte->subquery->targetList;
Oid targetRelationId = targetRelation->relationId;
int shardCount = targetRelation->shardIntervalArrayLength;
int shardOffset = 0;
uint32 taskIdIndex = 1;
/* every task is created with INVALID_JOB_ID */
uint64 jobId = INVALID_JOB_ID;
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
{
ShardInterval *targetShardInterval =
targetRelation->sortedShardIntervalArray[shardOffset];
/* results are looked up by the interval's shardIndex, not by the loop offset */
List *resultIdList = redistributedResults[targetShardInterval->shardIndex];
uint64 shardId = targetShardInterval->shardId;
StringInfo queryString = makeStringInfo();
/*
 * Skip empty tasks. taskIdIndex is not advanced for a skipped shard, so
 * the ids of the generated tasks stay consecutive.
 */
if (resultIdList == NIL)
{
continue;
}
/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
/* generate the query on the intermediate result */
Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
/* put the intermediate result query in the INSERT..SELECT */
selectRte->subquery = fragmentSetQuery;
/*
 * Setting an alias simplifies deparsing of RETURNING. The alias lives on
 * the shared copy, so it is only created in the first iteration that
 * finds it missing.
 */
if (insertRte->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
insertRte->alias = alias;
}
/*
 * Generate a query string for the query that inserts into a shard and reads
 * from an intermediate result.
 *
 * Since CTEs have already been converted to intermediate results, they need
 * to be removed from the query. Otherwise, worker queries include both
 * intermediate results and CTEs in the query.
 */
insertResultQuery->cteList = NIL;
deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString);
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));
/* take a ShareLock on the shard's distribution metadata, then read its placements */
LockShardDistributionMetadata(shardId, ShareLock);
List *insertShardPlacementList = ActiveShardPlacementList(shardId);
/* the task touches exactly one shard of the target relation */
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = targetShardInterval->relationId;
relationShard->shardId = targetShardInterval->shardId;
Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
queryString->data);
modifyTask->dependentTaskList = NIL;
modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->relationShardList = list_make1(relationShard);
modifyTask->replicationModel = targetRelation->replicationModel;
taskList = lappend(taskList, modifyTask);
taskIdIndex++;
}
return taskList;
}
/*
* PartitionColumnIndex finds the index of given partition column in the
* given target list. * given target list.
*/ */
static int int
PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) DistributionColumnIndex(List *insertTargetList, Var *partitionColumn)
{ {
TargetEntry *insertTargetEntry = NULL; TargetEntry *insertTargetEntry = NULL;
int targetEntryIndex = 0; int targetEntryIndex = 0;

View File

@ -47,7 +47,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
{ {
Job *job = distributedPlan->workerJob; Job *job = distributedPlan->workerJob;
if (distributedPlan->insertSelectQuery != NULL) if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL)
{ {
/* /*
* We go through * We go through

View File

@ -0,0 +1,216 @@
/*-------------------------------------------------------------------
*
* repartition_executor.c
*
* Definitions for public functions and types related to repartition
* of select query results.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/repartition_executor.h"
#include "distributed/resource_lock.h"
/*
 * GenerateTaskListWithColocatedIntermediateResults builds one MODIFY_TASK per
 * shard of the target relation. Each task runs the given modification query
 * against its shard while reading the source rows from the co-located
 * intermediate result named <resultIdPrefix>_<shardId>.
 *
 * The passed-in query is never modified; all rewriting happens on a private
 * copy. Tasks are returned in the order of the target relation's sorted shard
 * interval array and carry task ids starting at 1.
 */
List *
GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
												 Query *
												 modifyQueryViaCoordinatorOrRepartition,
												 char *resultIdPrefix)
{
	List *modifyTaskList = NIL;

	/*
	 * Private copy of the <MODIFY-SQL> ... SELECT. For every shard we swap
	 * the subquery of shardQuery for that shard's intermediate result and
	 * deparse the outcome.
	 */
	Query *shardQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
	RangeTblEntry *resultRelationRte = ExtractResultRelationRTE(shardQuery);
	RangeTblEntry *sourceRte = ExtractSelectRangeTableEntry(shardQuery);

	CitusTableCacheEntry *targetEntry = GetCitusTableCacheEntry(targetRelationId);
	int shardTotal = targetEntry->shardIntervalArrayLength;
	uint32 nextTaskId = 1;

	for (int shardPosition = 0; shardPosition < shardTotal; shardPosition++)
	{
		ShardInterval *shardInterval =
			targetEntry->sortedShardIntervalArray[shardPosition];
		uint64 shardId = shardInterval->shardId;
		StringInfo shardQueryString = makeStringInfo();

		/* during COPY, the shard ID is appended to the result name */
		StringInfo resultName = makeStringInfo();
		appendStringInfo(resultName, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);

		/*
		 * Read this shard's intermediate result (no column aliases) and
		 * plug that query in as the source of the modification.
		 */
		sourceRte->subquery =
			BuildSubPlanResultQuery(modifyQueryViaCoordinatorOrRepartition->targetList,
									NIL,
									resultName->data);

		/* setting an alias simplifies deparsing of RETURNING */
		if (resultRelationRte->alias == NULL)
		{
			resultRelationRte->alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
		}

		/*
		 * CTEs have already been converted to intermediate results, so they
		 * need to be removed before deparsing. Otherwise the worker query
		 * would contain both the intermediate results and the CTEs.
		 */
		shardQuery->cteList = NIL;
		deparse_shard_query(shardQuery, targetRelationId, shardId, shardQueryString);
		ereport(DEBUG2, (errmsg("distributed statement: %s", shardQueryString->data)));

		LockShardDistributionMetadata(shardId, ShareLock);
		List *shardPlacementList = ActiveShardPlacementList(shardId);

		RelationShard *targetRelationShard = CitusMakeNode(RelationShard);
		targetRelationShard->relationId = shardInterval->relationId;
		targetRelationShard->shardId = shardInterval->shardId;

		Task *shardTask = CreateBasicTask(INVALID_JOB_ID, nextTaskId, MODIFY_TASK,
										  shardQueryString->data);
		shardTask->dependentTaskList = NIL;
		shardTask->anchorShardId = shardId;
		shardTask->taskPlacementList = shardPlacementList;
		shardTask->relationShardList = list_make1(targetRelationShard);
		shardTask->replicationModel = targetEntry->replicationModel;

		modifyTaskList = lappend(modifyTaskList, shardTask);
		nextTaskId++;
	}

	return modifyTaskList;
}
/*
 * GenerateTaskListWithRedistributedResults returns a task list to insert given
 * redistributedResults into the given target relation.
 * redistributedResults[shardIndex] is list of cstrings each of which is
 * a result name which should be inserted into
 * targetRelation->sortedShardIntervalArray[shardIndex].
 *
 * Shards whose result list is NIL produce no task. Task ids start at 1 and
 * are only advanced when a task is actually created, so the ids in the
 * returned list are consecutive even when shards are skipped.
 *
 * The passed-in query is never modified; all rewriting happens on a private
 * copy.
 */
List *
GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepartition,
										 CitusTableCacheEntry *targetRelation,
										 List **redistributedResults,
										 bool useBinaryFormat)
{
	List *taskList = NIL;

	/*
	 * Make a copy of the <MODIFY-SQL> ... SELECT. We'll repeatedly replace
	 * the subquery of modifyResultQuery for different intermediate results and
	 * then deparse it.
	 */
	Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
	RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery);
	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(modifyResultQuery);

	/*
	 * Remember the SELECT's target list before the loop starts replacing
	 * selectRte->subquery; every per-shard read query is built from it.
	 */
	List *selectTargetList = selectRte->subquery->targetList;
	Oid targetRelationId = targetRelation->relationId;
	int shardCount = targetRelation->shardIntervalArrayLength;
	uint32 taskIdIndex = 1;
	uint64 jobId = INVALID_JOB_ID;

	for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
	{
		ShardInterval *targetShardInterval =
			targetRelation->sortedShardIntervalArray[shardOffset];

		/* results are looked up by the interval's shardIndex, not the loop offset */
		List *resultIdList = redistributedResults[targetShardInterval->shardIndex];
		uint64 shardId = targetShardInterval->shardId;

		/* skip empty tasks, before allocating anything for this shard */
		if (resultIdList == NIL)
		{
			continue;
		}

		/* only shards that get a task need a buffer for the deparsed query */
		StringInfo queryString = makeStringInfo();

		/* sort result ids for consistent test output */
		List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);

		/* generate the query on the intermediate result */
		Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
																		 NIL,
																		 sortedResultIds,
																		 useBinaryFormat);

		/* put the intermediate result query in the <MODIFY-SQL> ... SELECT */
		selectRte->subquery = fragmentSetQuery;

		/* setting an alias simplifies deparsing of RETURNING */
		if (insertRte->alias == NULL)
		{
			Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
			insertRte->alias = alias;
		}

		/*
		 * Generate a query string for the query that modifies a shard and reads
		 * from an intermediate result.
		 *
		 * Since CTEs have already been converted to intermediate results, they need
		 * to be removed from the query. Otherwise, worker queries include both
		 * intermediate results and CTEs in the query.
		 */
		modifyResultQuery->cteList = NIL;
		deparse_shard_query(modifyResultQuery, targetRelationId, shardId, queryString);
		ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));

		LockShardDistributionMetadata(shardId, ShareLock);
		List *insertShardPlacementList = ActiveShardPlacementList(shardId);

		RelationShard *relationShard = CitusMakeNode(RelationShard);
		relationShard->relationId = targetShardInterval->relationId;
		relationShard->shardId = targetShardInterval->shardId;

		Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
										   queryString->data);
		modifyTask->dependentTaskList = NIL;
		modifyTask->anchorShardId = shardId;
		modifyTask->taskPlacementList = insertShardPlacementList;
		modifyTask->relationShardList = list_make1(relationShard);
		modifyTask->replicationModel = targetRelation->replicationModel;

		taskList = lappend(taskList, modifyTask);
		taskIdIndex++;
	}

	return taskList;
}

View File

@ -1417,11 +1417,11 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
Assert(!repartitioned || Assert(!repartitioned ||
!GetRTEListPropertiesForQuery(selectQueryCopy)->hasSingleShardDistTable); !GetRTEListPropertiesForQuery(selectQueryCopy)->hasSingleShardDistTable);
distributedPlan->insertSelectQuery = insertSelectQuery; distributedPlan->modifyQueryViaCoordinatorOrRepartition = insertSelectQuery;
distributedPlan->selectPlanForInsertSelect = selectPlan; distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition = selectPlan;
distributedPlan->insertSelectMethod = repartitioned ? distributedPlan->modifyWithSelectMethod = repartitioned ?
INSERT_SELECT_REPARTITION : MODIFY_WITH_SELECT_REPARTITION :
INSERT_SELECT_VIA_COORDINATOR; MODIFY_WITH_SELECT_VIA_COORDINATOR;
distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
distributedPlan->targetRelationId = targetRelationId; distributedPlan->targetRelationId = targetRelationId;

View File

@ -69,7 +69,7 @@ FindSubPlanUsages(DistributedPlan *plan)
SUBPLAN_ACCESS_REMOTE); SUBPLAN_ACCESS_REMOTE);
} }
if (plan->insertSelectQuery != NULL) if (plan->modifyQueryViaCoordinatorOrRepartition != NULL)
{ {
/* INSERT..SELECT plans currently do not have a workerJob */ /* INSERT..SELECT plans currently do not have a workerJob */
Assert(plan->workerJob == NULL); Assert(plan->workerJob == NULL);
@ -79,7 +79,8 @@ FindSubPlanUsages(DistributedPlan *plan)
* perform pruning. We therefore require all subplans used in the * perform pruning. We therefore require all subplans used in the
* INSERT..SELECT to be available all nodes. * INSERT..SELECT to be available all nodes.
*/ */
remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->insertSelectQuery, remoteSubPlans =
FindSubPlansUsedInNode((Node *) plan->modifyQueryViaCoordinatorOrRepartition,
SUBPLAN_ACCESS_ANYWHERE); SUBPLAN_ACCESS_ANYWHERE);
} }

View File

@ -234,7 +234,7 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *insertSelectQuery = distributedPlan->insertSelectQuery; Query *insertSelectQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
/* /*
@ -244,7 +244,8 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
*/ */
Query *queryCopy = copyObject(selectRte->subquery); Query *queryCopy = copyObject(selectRte->subquery);
bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; bool repartition = distributedPlan->modifyWithSelectMethod ==
MODIFY_WITH_SELECT_REPARTITION;
if (es->analyze) if (es->analyze)

View File

@ -127,9 +127,9 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(queryId); COPY_SCALAR_FIELD(queryId);
COPY_NODE_FIELD(relationIdList); COPY_NODE_FIELD(relationIdList);
COPY_SCALAR_FIELD(targetRelationId); COPY_SCALAR_FIELD(targetRelationId);
COPY_NODE_FIELD(insertSelectQuery); COPY_NODE_FIELD(modifyQueryViaCoordinatorOrRepartition);
COPY_NODE_FIELD(selectPlanForInsertSelect); COPY_NODE_FIELD(selectPlanForModifyViaCoordinatorOrRepartition);
COPY_SCALAR_FIELD(insertSelectMethod); COPY_SCALAR_FIELD(modifyWithSelectMethod);
COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_STRING_FIELD(intermediateResultIdPrefix);
COPY_NODE_FIELD(subPlanList); COPY_NODE_FIELD(subPlanList);

View File

@ -192,7 +192,9 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(queryId); WRITE_UINT64_FIELD(queryId);
WRITE_NODE_FIELD(relationIdList); WRITE_NODE_FIELD(relationIdList);
WRITE_OID_FIELD(targetRelationId); WRITE_OID_FIELD(targetRelationId);
WRITE_NODE_FIELD(insertSelectQuery); WRITE_NODE_FIELD(modifyQueryViaCoordinatorOrRepartition);
WRITE_NODE_FIELD(selectPlanForModifyViaCoordinatorOrRepartition);
WRITE_ENUM_FIELD(modifyWithSelectMethod, ModifyWithSelectMethod);
WRITE_STRING_FIELD(intermediateResultIdPrefix); WRITE_STRING_FIELD(intermediateResultIdPrefix);
WRITE_NODE_FIELD(subPlanList); WRITE_NODE_FIELD(subPlanList);

View File

@ -361,19 +361,19 @@ typedef struct JoinSequenceNode
/* /*
* InsertSelectMethod represents the method to use for INSERT INTO ... SELECT * ModifyWithSelectMethod represents the method to use for INSERT INTO ... SELECT
* queries. * or MERGE type of queries.
* *
* Note that there is a third method which is not represented here, which is * Note that there is a third method which is not represented here, which is
* pushing down the INSERT INTO ... SELECT to workers. This method is executed * pushing down the MERGE/INSERT INTO ... SELECT to workers. This method is
* similar to other distributed queries and doesn't need a special execution * executed similar to other distributed queries and doesn't need a special
* code, so we don't need to represent it here. * execution code, so we don't need to represent it here.
*/ */
typedef enum InsertSelectMethod typedef enum ModifyWithSelectMethod
{ {
INSERT_SELECT_VIA_COORDINATOR, MODIFY_WITH_SELECT_VIA_COORDINATOR,
INSERT_SELECT_REPARTITION MODIFY_WITH_SELECT_REPARTITION
} InsertSelectMethod; } ModifyWithSelectMethod;
/* /*
@ -412,18 +412,22 @@ typedef struct DistributedPlan
Oid targetRelationId; Oid targetRelationId;
/* /*
* INSERT .. SELECT via the coordinator or repartition */ * Modifications performed using the output of a source query via
Query *insertSelectQuery; * the coordinator or repartition.
PlannedStmt *selectPlanForInsertSelect; */
InsertSelectMethod insertSelectMethod; Query *modifyQueryViaCoordinatorOrRepartition;
PlannedStmt *selectPlanForModifyViaCoordinatorOrRepartition;
ModifyWithSelectMethod modifyWithSelectMethod;
/* /*
* If intermediateResultIdPrefix is non-null, an INSERT ... SELECT * If intermediateResultIdPrefix is non-null, the source query
* via the coordinator is written to a set of intermediate results * results are written to a set of intermediate results named
* named according to <intermediateResultIdPrefix>_<anchorShardId>. * according to <intermediateResultIdPrefix>_<anchorShardId>.
* That way we can run a distributed INSERT ... SELECT with * That way we can run a distributed modification query which
* RETURNING or ON CONFLICT from the intermediate results to the * requires evaluating source query results at the coordinator.
* target relation. * Once results are captured in intermediate files, modification
* is done from the intermediate results into the target relation.
*
*/ */
char *intermediateResultIdPrefix; char *intermediateResultIdPrefix;

View File

@ -0,0 +1,28 @@
/*-------------------------------------------------------------------------
*
* repartition_executor.h
*
* Declarations for public functions and types related to repartition of
* select query results.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef REPARTITION_EXECUTOR_H
#define REPARTITION_EXECUTOR_H
/*
 * Finds the index of the given distribution column in the given target list.
 * NOTE(review): callers compare the result against -1, presumably meaning
 * "column not found" - confirm against the definition.
 */
extern int DistributionColumnIndex(List *insertTargetList, Var *partitionColumn);
/*
 * Generates a list of tasks for a query that modifies the target relation and
 * selects from a set of co-located intermediate results; the result for a
 * shard is named <resultIdPrefix>_<shardId>.
 */
extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
Query *
modifyQueryViaCoordinatorOrRepartition,
char *resultIdPrefix);
/*
 * Returns a task list that applies redistributedResults to the target
 * relation. redistributedResults[shardIndex] is a list of cstring result
 * names destined for targetRelation->sortedShardIntervalArray[shardIndex].
 */
extern List * GenerateTaskListWithRedistributedResults(
Query *modifyQueryViaCoordinatorOrRepartition,
CitusTableCacheEntry *
targetRelation,
List **redistributedResults,
bool useBinaryFormat);
#endif /* REPARTITION_EXECUTOR_H */