diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 24b75b79c..e2742c7d9 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -131,7 +131,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) distributedPlan = scanState->distributedPlan; if (distributedPlan->modLevel == ROW_MODIFY_READONLY || - distributedPlan->insertSelectSubquery != NULL) + distributedPlan->insertSelectQuery != NULL) { /* no more action required */ return; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6943b664a..adbac4db1 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -11,6 +11,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "distributed/citus_ruleutils.h" #include "distributed/commands/multi_copy.h" #include "distributed/distributed_execution_locks.h" #include "distributed/insert_select_executor.h" @@ -19,7 +20,9 @@ #include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" #include "distributed/distributed_planner.h" +#include "distributed/recursive_planning.h" #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" @@ -30,11 +33,13 @@ #include "nodes/parsenodes.h" #include "nodes/plannodes.h" #include "parser/parse_coerce.h" +#include "parser/parse_relation.h" #include "parser/parsetree.h" #include "tcop/pquery.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" #include "utils/portal.h" +#include "utils/rel.h" #include "utils/snapmgr.h" @@ -43,6 +48,9 @@ static int insertSelectExecutorLevel = 0; static TupleTableSlot * 
CoordinatorInsertSelectExecScanInternal(CustomScanState *node); +static Query * WrapSubquery(Query *subquery); +static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, + char *resultIdPrefix); static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, Query *selectQuery, EState *executorState); static HTAB * ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId, @@ -98,10 +106,13 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) { EState *executorState = ScanStateGetExecutorState(scanState); DistributedPlan *distributedPlan = scanState->distributedPlan; - Query *selectQuery = distributedPlan->insertSelectSubquery; - List *insertTargetList = distributedPlan->insertTargetList; - Oid targetRelationId = distributedPlan->targetRelationId; + Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); + List *insertTargetList = insertSelectQuery->targetList; + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); + Oid targetRelationId = insertRte->relid; char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix; + bool hasReturning = distributedPlan->hasReturning; HTAB *shardStateHash = NULL; ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); @@ -116,6 +127,12 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) */ DisableLocalExecution(); + /* select query to execute */ + Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + + selectRte->subquery = selectQuery; + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + /* * If we are dealing with partitioned table, we also need to lock its * partitions. 
Here we only lock targetRelation, we acquire necessary @@ -126,7 +143,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (distributedPlan->workerJob != NULL) + if (insertSelectQuery->onConflict || hasReturning) { /* * If we also have a workerJob that means there is a second step @@ -135,11 +152,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) * distributed INSERT...SELECT from a set of intermediate results * to the target relation. */ - Job *workerJob = distributedPlan->workerJob; ListCell *taskCell = NULL; - List *taskList = workerJob->taskList; List *prunedTaskList = NIL; - bool hasReturning = distributedPlan->hasReturning; shardStateHash = ExecuteSelectIntoColocatedIntermediateResults( targetRelationId, @@ -148,6 +162,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) executorState, intermediateResultIdPrefix); + /* generate tasks for the INSERT..SELECT phase */ + List *taskList = TwoPhaseInsertSelectTaskList(targetRelationId, + insertSelectQuery, + intermediateResultIdPrefix); + /* * We cannot actually execute INSERT...SELECT tasks that read from * intermediate results that weren't created because no rows were @@ -202,6 +221,221 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) } +/* + * BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query. + * If the INSERT...SELECT has CTEs then these are added to the resulting SELECT instead. + */ +Query * +BuildSelectForInsertSelect(Query *insertSelectQuery) +{ + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + Query *selectQuery = selectRte->subquery; + + /* + * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT + * has top-level set operations. + * + * We could simply wrap all queries, but that might create a subquery that is + * not supported by the logical planner. 
Since the logical planner also does + * not support CTEs and top-level set operations, we can wrap queries containing + * those without breaking anything. + */ + if (list_length(insertSelectQuery->cteList) > 0) + { + selectQuery = WrapSubquery(selectRte->subquery); + + /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ + selectQuery->cteList = copyObject(insertSelectQuery->cteList); + selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; + } + else if (selectQuery->setOperations != NULL) + { + /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */ + selectQuery = WrapSubquery(selectRte->subquery); + } + + return selectQuery; +} + + +/* + * WrapSubquery wraps the given query as a subquery in a newly constructed + * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. + */ +static Query * +WrapSubquery(Query *subquery) +{ + ParseState *pstate = make_parsestate(NULL); + ListCell *selectTargetCell = NULL; + List *newTargetList = NIL; + + Query *outerQuery = makeNode(Query); + outerQuery->commandType = CMD_SELECT; + + /* create range table entries */ + Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); + RangeTblEntry *newRangeTableEntry = addRangeTableEntryForSubquery(pstate, subquery, + selectAlias, false, + true); + outerQuery->rtable = list_make1(newRangeTableEntry); + + /* set the FROM expression to the subquery */ + RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); + newRangeTableRef->rtindex = 1; + outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); + + /* create a target list that matches the SELECT */ + foreach(selectTargetCell, subquery->targetList) + { + TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); + + /* exactly 1 entry in FROM */ + int indexInRangeTable = 1; + + if (selectTargetEntry->resjunk) + { + continue; + } + + Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, + exprType((Node *) 
selectTargetEntry->expr), + exprTypmod((Node *) selectTargetEntry->expr), + exprCollation((Node *) selectTargetEntry->expr), 0); + + TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, + selectTargetEntry->resno, + selectTargetEntry->resname, + selectTargetEntry->resjunk); + + newTargetList = lappend(newTargetList, newSelectTargetEntry); + } + + outerQuery->targetList = newTargetList; + + return outerQuery; +} + + +/* + * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that + * inserts into a target relation and selects from a set of co-located + * intermediate results. + */ +static List * +TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, + char *resultIdPrefix) +{ + List *taskList = NIL; + + /* + * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the + * subquery of insertResultQuery for different intermediate results and + * then deparse it. + */ + Query *insertResultQuery = copyObject(insertSelectQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); + + DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); + int shardCount = targetCacheEntry->shardIntervalArrayLength; + uint32 taskIdIndex = 1; + uint64 jobId = INVALID_JOB_ID; + + ListCell *targetEntryCell = NULL; + + Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); + TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); + + /* + * If the type of insert column and target table's column type is + * different from each other. 
Cast insert column's type to target + * table's column + */ + foreach(targetEntryCell, insertSelectQuery->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Var *insertColumn = (Var *) targetEntry->expr; + Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, targetEntry->resno - + 1); + + if (insertColumn->vartype != attr->atttypid) + { + CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); + coerceExpr->arg = (Expr *) copyObject(insertColumn); + coerceExpr->resulttype = attr->atttypid; + coerceExpr->resultcollid = attr->attcollation; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + targetEntry->expr = (Expr *) coerceExpr; + } + } + + for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) + { + ShardInterval *targetShardInterval = + targetCacheEntry->sortedShardIntervalArray[shardOffset]; + uint64 shardId = targetShardInterval->shardId; + List *columnAliasList = NIL; + StringInfo queryString = makeStringInfo(); + StringInfo resultId = makeStringInfo(); + + /* during COPY, the shard ID is appended to the result name */ + appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId); + + /* generate the query on the intermediate result */ + Query *resultSelectQuery = BuildSubPlanResultQuery(insertSelectQuery->targetList, + columnAliasList, + resultId->data); + + /* put the intermediate result query in the INSERT..SELECT */ + selectRte->subquery = resultSelectQuery; + + /* setting an alias simplifies deparsing of RETURNING */ + if (insertRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + insertRte->alias = alias; + } + + /* + * Generate a query string for the query that inserts into a shard and reads + * from an intermediate result. + * + * Since CTEs have already been converted to intermediate results, they need + * to be removed from the query. Otherwise, worker queries include both + * intermediate results and CTEs in the query.
+ */ + insertResultQuery->cteList = NIL; + deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString); + ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); + + LockShardDistributionMetadata(shardId, ShareLock); + List *insertShardPlacementList = FinalizedShardPlacementList(shardId); + + RelationShard *relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = targetShardInterval->relationId; + relationShard->shardId = targetShardInterval->shardId; + + Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, + queryString->data); + modifyTask->dependentTaskList = NIL; + modifyTask->anchorShardId = shardId; + modifyTask->taskPlacementList = insertShardPlacementList; + modifyTask->relationShardList = list_make1(relationShard); + modifyTask->replicationModel = targetCacheEntry->replicationModel; + + taskList = lappend(taskList, modifyTask); + + taskIdIndex++; + } + + heap_close(distributedRelation, NoLock); + + return taskList; +} + + /* * ExecuteSelectIntoColocatedIntermediateResults executes the given select query * and inserts tuples into a set of intermediate results that are colocated with diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 5602ff87a..92f70f7ed 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -76,7 +76,7 @@ JobExecutorType(DistributedPlan *distributedPlan) return MULTI_EXECUTOR_ADAPTIVE; } - if (distributedPlan->insertSelectSubquery != NULL) + if (distributedPlan->insertSelectQuery != NULL) { /* * Even if adaptiveExecutorEnabled, we go through diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 29291b275..1b872b874 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ 
b/src/backend/distributed/planner/insert_select_planner.c @@ -70,10 +70,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, selectPartitionColumnTableId); static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); -static Query * WrapSubquery(Query *subquery); static bool CheckInsertSelectQuery(Query *query); -static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, - char *resultIdPrefix); /* @@ -206,7 +203,7 @@ CreateInsertSelectPlan(uint64 planId, Query *originalQuery, /* * CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed - * INSERT ... SELECT queries which could consists of multiple tasks. + * INSERT ... SELECT queries which could consist of multiple tasks. * * The function never returns NULL, it errors out if cannot create the DistributedPlan. */ @@ -265,9 +262,6 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, taskIdIndex, allDistributionKeysInQueryAreEqual); - /* Planning error gelmisse return et, ustteki fonksiyona */ - /* distributed plan gecir */ - /* add the task if it could be created */ if (modifyTask != NULL) { @@ -276,7 +270,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, sqlTaskList = lappend(sqlTaskList, modifyTask); } - ++taskIdIndex; + taskIdIndex++; } /* Create the worker job */ @@ -295,7 +289,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, distributedPlan->hasReturning = false; distributedPlan->targetRelationId = targetRelationId; - if (list_length(originalQuery->returningList) > 0) + if (originalQuery->returningList != NIL) { distributedPlan->hasReturning = true; } @@ -1112,7 +1106,6 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) { Query *insertSelectQuery = copyObject(parse); - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = 
ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; @@ -1127,69 +1120,9 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) return distributedPlan; } - Query *selectQuery = selectRte->subquery; - - /* - * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT - * has top-level set operations. - * - * We could simply wrap all queries, but that might create a subquery that is - * not supported by the logical planner. Since the logical planner also does - * not support CTEs and top-level set operations, we can wrap queries containing - * those without breaking anything. - */ - if (list_length(insertSelectQuery->cteList) > 0) - { - selectQuery = WrapSubquery(selectRte->subquery); - - /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ - selectQuery->cteList = copyObject(insertSelectQuery->cteList); - selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; - } - else if (selectQuery->setOperations != NULL) - { - /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */ - selectQuery = WrapSubquery(selectRte->subquery); - } - - selectRte->subquery = selectQuery; - - ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); - - if (insertSelectQuery->onConflict || insertSelectQuery->returningList != NIL) - { - /* - * We cannot perform a COPY operation with RETURNING or ON CONFLICT. - * We therefore perform the INSERT...SELECT in two phases. First we - * copy the result of the SELECT query in a set of intermediate - * results, one for each shard placement in the destination table. - * Second, we perform an INSERT..SELECT..ON CONFLICT/RETURNING from - * the intermediate results into the destination table. This is - * represented in the plan by simply having both an - * insertSelectSubuery and a workerJob to execute afterwards. 
- */ - uint64 jobId = INVALID_JOB_ID; - char *resultIdPrefix = InsertSelectResultIdPrefix(planId); - - /* generate tasks for the INSERT..SELECT phase */ - List *taskList = TwoPhaseInsertSelectTaskList(targetRelationId, insertSelectQuery, - resultIdPrefix); - - Job *workerJob = CitusMakeNode(Job); - workerJob->taskList = taskList; - workerJob->subqueryPushdown = false; - workerJob->dependentJobList = NIL; - workerJob->jobId = jobId; - workerJob->jobQuery = insertSelectQuery; - workerJob->requiresMasterEvaluation = false; - - distributedPlan->workerJob = workerJob; - distributedPlan->hasReturning = insertSelectQuery->returningList != NIL; - distributedPlan->intermediateResultIdPrefix = resultIdPrefix; - } - - distributedPlan->insertSelectSubquery = selectQuery; - distributedPlan->insertTargetList = insertSelectQuery->targetList; + distributedPlan->insertSelectQuery = insertSelectQuery; + distributedPlan->hasReturning = insertSelectQuery->returningList != NIL; + distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; return distributedPlan; @@ -1236,184 +1169,6 @@ CoordinatorInsertSelectSupported(Query *insertSelectQuery) } -/* - * WrapSubquery wraps the given query as a subquery in a newly constructed - * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. 
- */ -static Query * -WrapSubquery(Query *subquery) -{ - ParseState *pstate = make_parsestate(NULL); - ListCell *selectTargetCell = NULL; - List *newTargetList = NIL; - - Query *outerQuery = makeNode(Query); - outerQuery->commandType = CMD_SELECT; - - /* create range table entries */ - Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); - RangeTblEntry *newRangeTableEntry = addRangeTableEntryForSubquery(pstate, subquery, - selectAlias, false, - true); - outerQuery->rtable = list_make1(newRangeTableEntry); - - /* set the FROM expression to the subquery */ - RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); - newRangeTableRef->rtindex = 1; - outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); - - /* create a target list that matches the SELECT */ - foreach(selectTargetCell, subquery->targetList) - { - TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); - - /* exactly 1 entry in FROM */ - int indexInRangeTable = 1; - - if (selectTargetEntry->resjunk) - { - continue; - } - - Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, - exprType((Node *) selectTargetEntry->expr), - exprTypmod((Node *) selectTargetEntry->expr), - exprCollation((Node *) selectTargetEntry->expr), 0); - - TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, - selectTargetEntry->resno, - selectTargetEntry->resname, - selectTargetEntry->resjunk); - - newTargetList = lappend(newTargetList, newSelectTargetEntry); - } - - outerQuery->targetList = newTargetList; - - return outerQuery; -} - - -/* - * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that - * inserts into a target relation and selects from a set of co-located - * intermediate results. - */ -static List * -TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, - char *resultIdPrefix) -{ - List *taskList = NIL; - - /* - * Make a copy of the INSERT ... SELECT. 
We'll repeatedly replace the - * subquery of insertResultQuery for different intermediate results and - * then deparse it. - */ - Query *insertResultQuery = copyObject(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery); - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); - - DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); - int shardCount = targetCacheEntry->shardIntervalArrayLength; - uint32 taskIdIndex = 1; - uint64 jobId = INVALID_JOB_ID; - - ListCell *targetEntryCell = NULL; - - Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); - TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - - /* - * If the type of insert column and target table's column type is - * different from each other. Cast insert column't type to target - * table's column - */ - foreach(targetEntryCell, insertSelectQuery->targetList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - Var *insertColumn = (Var *) targetEntry->expr; - Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, targetEntry->resno - - 1); - - if (insertColumn->vartype != attr->atttypid) - { - CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); - coerceExpr->arg = (Expr *) copyObject(insertColumn); - coerceExpr->resulttype = attr->atttypid; - coerceExpr->resultcollid = attr->attcollation; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - targetEntry->expr = (Expr *) coerceExpr; - } - } - - for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) - { - ShardInterval *targetShardInterval = - targetCacheEntry->sortedShardIntervalArray[shardOffset]; - uint64 shardId = targetShardInterval->shardId; - List *columnAliasList = NIL; - StringInfo queryString = makeStringInfo(); - StringInfo resultId = makeStringInfo(); - - /* during COPY, the shard ID is appended to the result name */ - appendStringInfo(resultId, 
"%s_" UINT64_FORMAT, resultIdPrefix, shardId); - - /* generate the query on the intermediate result */ - Query *resultSelectQuery = BuildSubPlanResultQuery(insertSelectQuery->targetList, - columnAliasList, - resultId->data); - - /* put the intermediate result query in the INSERT..SELECT */ - selectRte->subquery = resultSelectQuery; - - /* setting an alias simplifies deparsing of RETURNING */ - if (insertRte->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - insertRte->alias = alias; - } - - /* - * Generate a query string for the query that inserts into a shard and reads - * from an intermediate result. - * - * Since CTEs have already been converted to intermediate results, they need - * to removed from the query. Otherwise, worker queries include both - * intermediate results and CTEs in the query. - */ - insertResultQuery->cteList = NIL; - deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString); - ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); - - LockShardDistributionMetadata(shardId, ShareLock); - List *insertShardPlacementList = FinalizedShardPlacementList(shardId); - - RelationShard *relationShard = CitusMakeNode(RelationShard); - relationShard->relationId = targetShardInterval->relationId; - relationShard->shardId = targetShardInterval->shardId; - - Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, - queryString->data); - modifyTask->dependentTaskList = NULL; - modifyTask->anchorShardId = shardId; - modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->relationShardList = list_make1(relationShard); - modifyTask->replicationModel = targetCacheEntry->replicationModel; - - taskList = lappend(taskList, modifyTask); - - taskIdIndex++; - } - - heap_close(distributedRelation, NoLock); - - return taskList; -} - - /* * InsertSelectResultPrefix returns the prefix to use for intermediate * results of an INSERT ... 
SELECT via the coordinator that runs in two diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 4c2d938b2..1a7433843 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -23,6 +23,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" #include "distributed/insert_select_planner.h" +#include "distributed/insert_select_executor.h" #include "distributed/listutils.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" @@ -136,7 +137,8 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; - Query *query = distributedPlan->insertSelectSubquery; + Query *insertSelectQuery = distributedPlan->insertSelectQuery; + Query *query = BuildSelectForInsertSelect(insertSelectQuery); IntoClause *into = NULL; ParamListInfo params = NULL; char *queryString = NULL; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 7594bf1af..1a2083989 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -111,10 +111,8 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(masterQuery); COPY_SCALAR_FIELD(queryId); COPY_NODE_FIELD(relationIdList); - - COPY_NODE_FIELD(insertSelectSubquery); - COPY_NODE_FIELD(insertTargetList); COPY_SCALAR_FIELD(targetRelationId); + COPY_NODE_FIELD(insertSelectQuery); COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_NODE_FIELD(subPlanList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index fe5ebb535..6cf21078e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -189,10 
+189,8 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(masterQuery); WRITE_UINT64_FIELD(queryId); WRITE_NODE_FIELD(relationIdList); - - WRITE_NODE_FIELD(insertSelectSubquery); - WRITE_NODE_FIELD(insertTargetList); WRITE_OID_FIELD(targetRelationId); + WRITE_NODE_FIELD(insertSelectQuery); WRITE_STRING_FIELD(intermediateResultIdPrefix); WRITE_NODE_FIELD(subPlanList); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 5c638a290..f3ceef7ac 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -217,10 +217,8 @@ ReadDistributedPlan(READFUNC_ARGS) READ_NODE_FIELD(masterQuery); READ_UINT64_FIELD(queryId); READ_NODE_FIELD(relationIdList); - - READ_NODE_FIELD(insertSelectSubquery); - READ_NODE_FIELD(insertTargetList); READ_OID_FIELD(targetRelationId); + READ_NODE_FIELD(insertSelectQuery); READ_STRING_FIELD(intermediateResultIdPrefix); READ_NODE_FIELD(subPlanList); diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index f7227324c..eda3d6679 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -19,6 +19,7 @@ extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); extern bool ExecutingInsertSelect(void); +extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); #endif /* INSERT_SELECT_EXECUTOR_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e51005f12..9862121bb 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -281,15 +281,12 @@ typedef struct DistributedPlan /* which relations are accessed by this distributed plan */ List *relationIdList; - /* SELECT query in an INSERT ... 
SELECT via the coordinator */ - Query *insertSelectSubquery; - - /* target list of an INSERT ... SELECT via the coordinator */ - List *insertTargetList; - /* target relation of a modification */ Oid targetRelationId; + /* INSERT .. SELECT via the coordinator */ + Query *insertSelectQuery; + /* * If intermediateResultIdPrefix is non-null, an INSERT ... SELECT * via the coordinator is written to a set of intermediate results diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 5aeb59fdc..9fc6a2c21 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -2218,21 +2218,21 @@ INSERT INTO raw_events_first (user_id, value_1) SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s ON CONFLICT DO NOTHING; DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300000'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300001'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300002'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING DEBUG: distributed statement: INSERT INTO 
public.raw_events_first_13300003 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300003'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING -DEBUG: Collecting INSERT ... SELECT results on coordinator -- RETURNING is supported INSERT INTO raw_events_first (user_id, value_1) SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s RETURNING *; DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300000'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300001'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300002'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300003'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 -DEBUG: Collecting INSERT ... SELECT results on coordinator user_id | time | value_1 | value_2 | value_3 | value_4 ---------+------+---------+---------+---------+--------- 1 | | 11 | | |