diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 537f1a6bd..81d6cef87 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -49,7 +49,7 @@ /* functions for creating custom scan nodes */ static Node * AdaptiveExecutorCreateScan(CustomScan *scan); static Node * TaskTrackerCreateScan(CustomScan *scan); -static Node * CoordinatorInsertSelectCreateScan(CustomScan *scan); +static Node * NonPushableInsertSelectCreateScan(CustomScan *scan); static Node * DelayedErrorCreateScan(CustomScan *scan); /* functions that are common to different scans */ @@ -77,9 +77,9 @@ CustomScanMethods TaskTrackerCustomScanMethods = { TaskTrackerCreateScan }; -CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { +CustomScanMethods NonPushableInsertSelectCustomScanMethods = { "Citus INSERT ... SELECT", - CoordinatorInsertSelectCreateScan + NonPushableInsertSelectCreateScan }; CustomScanMethods DelayedErrorCustomScanMethods = { @@ -109,13 +109,13 @@ static CustomExecMethods TaskTrackerCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; -static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { - .CustomName = "CoordinatorInsertSelectScan", +static CustomExecMethods NonPushableInsertSelectCustomExecMethods = { + .CustomName = "NonPushableInsertSelectScan", .BeginCustomScan = CitusBeginScan, - .ExecCustomScan = CoordinatorInsertSelectExecScan, + .ExecCustomScan = NonPushableInsertSelectExecScan, .EndCustomScan = CitusEndScan, .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CoordinatorInsertSelectExplainScan + .ExplainCustomScan = NonPushableInsertSelectExplainScan }; @@ -133,7 +133,7 @@ IsCitusCustomState(PlanState *planState) CustomScanState *css = castNode(CustomScanState, planState); if (css->methods == &AdaptiveExecutorCustomExecMethods || css->methods == &TaskTrackerCustomExecMethods || - css->methods == 
&CoordinatorInsertSelectCustomExecMethods) + css->methods == &NonPushableInsertSelectCustomExecMethods) { return true; } @@ -150,7 +150,7 @@ RegisterCitusCustomScanMethods(void) { RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods); RegisterCustomScanMethods(&TaskTrackerCustomScanMethods); - RegisterCustomScanMethods(&CoordinatorInsertSelectCustomScanMethods); + RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods); RegisterCustomScanMethods(&DelayedErrorCustomScanMethods); } @@ -598,20 +598,20 @@ TaskTrackerCreateScan(CustomScan *scan) /* - * CoordinatorInsertSelectCrateScan creates the scan state for executing + * NonPushableInsertSelectCreateScan creates the scan state for executing * INSERT..SELECT into a distributed table via the coordinator. */ static Node * -CoordinatorInsertSelectCreateScan(CustomScan *scan) +NonPushableInsertSelectCreateScan(CustomScan *scan) { CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + scanState->executorType = MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT; scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = - &CoordinatorInsertSelectCustomExecMethods; + &NonPushableInsertSelectCustomExecMethods; return (Node *) scanState; } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 1228f723c..ac47f0109 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -53,11 +53,7 @@ /* Config variables managed via guc.c */ bool EnableRepartitionedInsertSelect = true; -/* depth of current insert/select executor.
*/ -static int insertSelectExecutorLevel = 0; - -static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState *node); static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); @@ -71,61 +67,27 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); -static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, - Oid targetRelationId); static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, CitusTableCacheEntry *targetRelation, List **redistributedResults, bool useBinaryFormat); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); -static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, - int targetTypeMod); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); -static void RelableTargetEntryList(List *selectTargetList, List *insertTargetList); /* - * CoordinatorInsertSelectExecScan is a wrapper around - * CoordinatorInsertSelectExecScanInternal which also properly increments - * or decrements insertSelectExecutorLevel. + * NonPushableInsertSelectExecScan executes an INSERT INTO distributed_table + * SELECT .. query either by routing via coordinator or by repartitioning + * task results and moving data directly between nodes. 
*/ TupleTableSlot * -CoordinatorInsertSelectExecScan(CustomScanState *node) -{ - TupleTableSlot *result = NULL; - insertSelectExecutorLevel++; - - PG_TRY(); - { - result = CoordinatorInsertSelectExecScanInternal(node); - } - PG_CATCH(); - { - insertSelectExecutorLevel--; - PG_RE_THROW(); - } - PG_END_TRY(); - - insertSelectExecutorLevel--; - return result; -} - - -/* - * CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table - * SELECT .. query by setting up a DestReceiver that copies tuples into the - * distributed table and then executing the SELECT query using that DestReceiver - * as the tuple destination. - */ -static TupleTableSlot * -CoordinatorInsertSelectExecScanInternal(CustomScanState *node) +NonPushableInsertSelectExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; if (!scanState->finishedRemoteScan) { EState *executorState = ScanStateGetExecutorState(scanState); - ParamListInfo paramListInfo = executorState->es_param_list_info; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); List *insertTargetList = insertSelectQuery->targetList; @@ -136,40 +98,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) bool hasReturning = distributedPlan->expectResults; HTAB *shardStateHash = NULL; - /* select query to execute */ - Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); - - selectRte->subquery = selectQuery; - ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); - - /* - * Cast types of insert target list and select projection list to - * match the column types of the target relation. 
- */ - selectQuery->targetList = - AddInsertSelectCasts(insertSelectQuery->targetList, - selectQuery->targetList, - targetRelationId); - - /* - * Later we might need to call WrapTaskListForProjection(), which requires - * that select target list has unique names, otherwise the outer query - * cannot select columns unambiguously. So we relabel select columns to - * match target columns. - */ - RelableTargetEntryList(selectQuery->targetList, insertTargetList); - - /* - * Make a copy of the query, since pg_plan_query may scribble on it and we - * want it to be replanned every time if it is stored in a prepared - * statement. - */ - selectQuery = copyObject(selectQuery); - - /* plan the subquery, this may be another distributed query */ - int cursorOptions = CURSOR_OPT_PARALLEL_OK; - PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions, - paramListInfo); + Query *selectQuery = selectRte->subquery; + PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect); /* * If we are dealing with partitioned table, we also need to lock its @@ -181,8 +111,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (IsRedistributablePlan(selectPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId)) + if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION) { ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); @@ -677,207 +606,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList) } -/* ExecutingInsertSelect returns true if we are executing an INSERT ...SELECT query */ -bool -ExecutingInsertSelect(void) -{ - return insertSelectExecutorLevel > 0; -} - - -/* - * AddInsertSelectCasts makes sure that the types in columns in the given - * target lists have the same type as the columns of the given relation. - * It might add casts to ensure that. - * - * It returns the updated selectTargetList. 
- */ -static List * -AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, - Oid targetRelationId) -{ - ListCell *insertEntryCell = NULL; - ListCell *selectEntryCell = NULL; - List *projectedEntries = NIL; - List *nonProjectedEntries = NIL; - - /* - * ReorderInsertSelectTargetLists() makes sure that first few columns of - * the SELECT query match the insert targets. It might contain additional - * items for GROUP BY, etc. - */ - Assert(list_length(insertTargetList) <= list_length(selectTargetList)); - - Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); - TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - - int targetEntryIndex = 0; - forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) - { - TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); - TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); - Var *insertColumn = (Var *) insertEntry->expr; - Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, - insertEntry->resno - 1); - - Oid sourceType = insertColumn->vartype; - Oid targetType = attr->atttypid; - if (sourceType != targetType) - { - insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, - attr->attcollation, attr->atttypmod); - - /* - * We cannot modify the selectEntry in-place, because ORDER BY or - * GROUP BY clauses might be pointing to it with comparison types - * of the source type. So instead we keep the original one as a - * non-projected entry, so GROUP BY and ORDER BY are happy, and - * create a duplicated projected entry with the coerced expression. 
- */ - TargetEntry *coercedEntry = copyObject(selectEntry); - coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, - targetType, attr->attcollation, - attr->atttypmod); - coercedEntry->ressortgroupref = 0; - - /* - * The only requirement is that users don't use this name in ORDER BY - * or GROUP BY, and it should be unique across the same query. - */ - StringInfo resnameString = makeStringInfo(); - appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); - coercedEntry->resname = resnameString->data; - - projectedEntries = lappend(projectedEntries, coercedEntry); - - if (selectEntry->ressortgroupref != 0) - { - selectEntry->resjunk = true; - - /* - * This entry might still end up in the SELECT output list, so - * rename it to avoid ambiguity. - * - * See https://github.com/citusdata/citus/pull/3470. - */ - resnameString = makeStringInfo(); - appendStringInfo(resnameString, "discarded_target_item_%d", - targetEntryIndex); - selectEntry->resname = resnameString->data; - - nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); - } - } - else - { - projectedEntries = lappend(projectedEntries, selectEntry); - } - - targetEntryIndex++; - } - - for (int entryIndex = list_length(insertTargetList); - entryIndex < list_length(selectTargetList); - entryIndex++) - { - nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, - entryIndex)); - } - - /* selectEntry->resno must be the ordinal number of the entry */ - selectTargetList = list_concat(projectedEntries, nonProjectedEntries); - int entryResNo = 1; - TargetEntry *selectTargetEntry = NULL; - foreach_ptr(selectTargetEntry, selectTargetList) - { - selectTargetEntry->resno = entryResNo++; - } - - heap_close(distributedRelation, NoLock); - - return selectTargetList; -} - - -/* - * CastExpr returns an expression which casts the given expr from sourceType to - * the given targetType. 
- */ -static Expr * -CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, - int targetTypeMod) -{ - Oid coercionFuncId = InvalidOid; - CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, - COERCION_EXPLICIT, - &coercionFuncId); - - if (coercionType == COERCION_PATH_FUNC) - { - FuncExpr *coerceExpr = makeNode(FuncExpr); - coerceExpr->funcid = coercionFuncId; - coerceExpr->args = list_make1(copyObject(expr)); - coerceExpr->funccollid = targetCollation; - coerceExpr->funcresulttype = targetType; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_RELABELTYPE) - { - RelabelType *coerceExpr = makeNode(RelabelType); - coerceExpr->arg = copyObject(expr); - coerceExpr->resulttype = targetType; - coerceExpr->resulttypmod = targetTypeMod; - coerceExpr->resultcollid = targetCollation; - coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_ARRAYCOERCE) - { - Oid sourceBaseType = get_base_element_type(sourceType); - Oid targetBaseType = get_base_element_type(targetType); - - CaseTestExpr *elemExpr = makeNode(CaseTestExpr); - elemExpr->collation = targetCollation; - elemExpr->typeId = sourceBaseType; - elemExpr->typeMod = -1; - - Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, - targetBaseType, targetCollation, - targetTypeMod); - - ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); - coerceExpr->arg = copyObject(expr); - coerceExpr->elemexpr = elemCastExpr; - coerceExpr->resultcollid = targetCollation; - coerceExpr->resulttype = targetType; - coerceExpr->resulttypmod = targetTypeMod; - coerceExpr->location = -1; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_COERCEVIAIO) - { - CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); - coerceExpr->arg = (Expr *) copyObject(expr); - coerceExpr->resulttype = 
targetType; - coerceExpr->resultcollid = targetCollation; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - return (Expr *) coerceExpr; - } - else - { - ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", - sourceType, targetType))); - } -} - - /* * IsSupportedRedistributionTarget determines whether re-partitioning into the * given target relation is supported. @@ -1108,23 +836,3 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) SetTaskQueryString(task, wrappedQuery->data); } } - - -/* - * RelableTargetEntryList relabels select target list to have matching names with - * insert target list. - */ -static void -RelableTargetEntryList(List *selectTargetList, List *insertTargetList) -{ - ListCell *selectTargetCell = NULL; - ListCell *insertTargetCell = NULL; - - forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList) - { - TargetEntry *selectTargetEntry = lfirst(selectTargetCell); - TargetEntry *insertTargetEntry = lfirst(insertTargetCell); - - selectTargetEntry->resname = insertTargetEntry->resname; - } -} diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index d7b003677..a13c39ca3 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -84,11 +84,11 @@ JobExecutorType(DistributedPlan *distributedPlan) { /* * Even if adaptiveExecutorEnabled, we go through - * MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT because + * MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT because * the executor already knows how to handle adaptive * executor when necessary. 
*/ - return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + return MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT; } Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY); diff --git a/src/backend/distributed/executor/query_stats.c b/src/backend/distributed/executor/query_stats.c index 7f9493c72..04e6002c2 100644 --- a/src/backend/distributed/executor/query_stats.c +++ b/src/backend/distributed/executor/query_stats.c @@ -99,7 +99,7 @@ CitusExecutorName(MultiExecutorType executorType) return "task-tracker"; } - case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT: + case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT: { return "insert-select"; } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 8fc89ba6b..25a4b95f4 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -913,7 +913,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi } distributedPlan = - CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext); + CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext, + boundParams); } else if (InsertSelectIntoLocalTable(originalQuery)) { @@ -1294,9 +1295,9 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) break; } - case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT: + case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT: { - customScan->methods = &CoordinatorInsertSelectCustomScanMethods; + customScan->methods = &NonPushableInsertSelectCustomScanMethods; break; } diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 6c5601ead..caa4034b9 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -26,7 +26,6 @@ #include "distributed/deparse_shard_query.h" #include 
"distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" -#include "distributed/insert_select_executor.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -230,11 +229,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } /* - * This can be called while executing INSERT ... SELECT func(). insert_select_executor - * doesn't get the planned subquery and gets the actual struct Query, so the planning - * for these kinds of queries happens at the execution time. + * Cannot delegate functions for INSERT ... SELECT func(), since they require + * coordinated transactions. */ - if (ExecutingInsertSelect()) + if (PlanningInsertSelect()) { ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT"))); return NULL; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 6dfe0dafc..22f882aeb 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -48,10 +48,17 @@ #include "parser/parsetree.h" #include "parser/parse_coerce.h" #include "parser/parse_relation.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" +static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, + Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext, + ParamListInfo boundParams); static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -77,8 +84,18 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, subqueryRte, Oid * selectPartitionColumnTableId); -static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); -static DeferredErrorMessage * 
CoordinatorInsertSelectSupported(Query *insertSelectQuery); +static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, + ParamListInfo boundParams); +static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); +static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList); +static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId); +static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod); + + +/* depth of current insert/select planner. */ +static int insertSelectPlannerLevel = 0; /* @@ -177,15 +194,46 @@ CheckInsertSelectQuery(Query *query) } +/* + * CreateInsertSelectPlan is a wrapper around + * CreateInsertSelectPlanInternal which also properly increments + * or decrements insertSelectPlannerLevel. + */ +DistributedPlan * +CreateInsertSelectPlan(uint64 planId, Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext, + ParamListInfo boundParams) +{ + DistributedPlan *result = NULL; + insertSelectPlannerLevel++; + + PG_TRY(); + { + result = CreateInsertSelectPlanInternal(planId, originalQuery, + plannerRestrictionContext, boundParams); + } + PG_CATCH(); + { + insertSelectPlannerLevel--; + PG_RE_THROW(); + } + PG_END_TRY(); + + insertSelectPlannerLevel--; + return result; +} + + /* * CreateInsertSelectPlan tries to create a distributed plan for an * INSERT INTO distributed_table SELECT ... query by push down the * command to the workers and if that is not possible it creates a * plan for evaluating the SELECT on the coordinator.
*/ -DistributedPlan * -CreateInsertSelectPlan(uint64 planId, Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) +static DistributedPlan * +CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext, + ParamListInfo boundParams) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery); if (deferredError != NULL) @@ -201,8 +249,12 @@ CreateInsertSelectPlan(uint64 planId, Query *originalQuery, { RaiseDeferredError(distributedPlan->planningError, DEBUG1); - /* if INSERT..SELECT cannot be distributed, pull to coordinator */ - distributedPlan = CreateCoordinatorInsertSelectPlan(planId, originalQuery); + /* + * If INSERT..SELECT cannot be distributed, pull to coordinator or use + * repartitioning. + */ + distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery, + boundParams); } return distributedPlan; @@ -1282,14 +1334,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, /* - * CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a + * CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a * distributed table. The query plan can also be executed on a worker in MX. 
*/ static DistributedPlan * -CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) +CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams) { Query *insertSelectQuery = copyObject(parse); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; @@ -1297,14 +1350,55 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery); distributedPlan->planningError = - CoordinatorInsertSelectSupported(insertSelectQuery); + NonPushableInsertSelectSupported(insertSelectQuery); if (distributedPlan->planningError != NULL) { return distributedPlan; } + Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + + selectRte->subquery = selectQuery; + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + + /* + * Cast types of insert target list and select projection list to + * match the column types of the target relation. + */ + selectQuery->targetList = + AddInsertSelectCasts(insertSelectQuery->targetList, + selectQuery->targetList, + targetRelationId); + + /* + * Later we might need to call WrapTaskListForProjection(), which requires + * that select target list has unique names, otherwise the outer query + * cannot select columns unambiguously. So we relabel select columns to + * match target columns. + */ + List *insertTargetList = insertSelectQuery->targetList; + RelabelTargetEntryList(selectQuery->targetList, insertTargetList); + + /* + * Make a copy of the select query, since following code scribbles it + * but we need to keep the original for EXPLAIN. 
+ */ + Query *selectQueryCopy = copyObject(selectQuery); + + /* plan the subquery, this may be another distributed query */ + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, + boundParams); + + bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); + distributedPlan->insertSelectQuery = insertSelectQuery; + distributedPlan->selectPlanForInsertSelect = selectPlan; + distributedPlan->insertSelectMethod = repartitioned ? + INSERT_SELECT_REPARTITION : + INSERT_SELECT_VIA_COORDINATOR; distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; @@ -1314,13 +1408,13 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) /* - * CoordinatorInsertSelectSupported returns an error if executing an + * NonPushableInsertSelectSupported returns an error if executing an * INSERT ... SELECT command by pulling results of the SELECT to the coordinator - * is unsupported because it needs to generate sequence values or insert into an - * append-distributed table. + * or with repartitioning is unsupported because it needs to generate sequence + * values or insert into an append-distributed table. */ static DeferredErrorMessage * -CoordinatorInsertSelectSupported(Query *insertSelectQuery) +NonPushableInsertSelectSupported(Query *insertSelectQuery) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported( insertSelectQuery); @@ -1355,3 +1449,224 @@ InsertSelectResultIdPrefix(uint64 planId) return resultIdPrefix->data; } + + +/* + * RelabelTargetEntryList relabels select target list to have matching names with + * insert target list. 
+ */ +static void +RelabelTargetEntryList(List *selectTargetList, List *insertTargetList) +{ + ListCell *selectTargetCell = NULL; + ListCell *insertTargetCell = NULL; + + forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList) + { + TargetEntry *selectTargetEntry = lfirst(selectTargetCell); + TargetEntry *insertTargetEntry = lfirst(insertTargetCell); + + selectTargetEntry->resname = insertTargetEntry->resname; + } +} + + +/* + * AddInsertSelectCasts makes sure that the types in columns in the given + * target lists have the same type as the columns of the given relation. + * It might add casts to ensure that. + * + * It returns the updated selectTargetList. + */ +static List * +AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId) +{ + ListCell *insertEntryCell = NULL; + ListCell *selectEntryCell = NULL; + List *projectedEntries = NIL; + List *nonProjectedEntries = NIL; + + /* + * ReorderInsertSelectTargetLists() makes sure that first few columns of + * the SELECT query match the insert targets. It might contain additional + * items for GROUP BY, etc. 
+ */ + Assert(list_length(insertTargetList) <= list_length(selectTargetList)); + + Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); + TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); + + int targetEntryIndex = 0; + forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) + { + TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); + TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); + Var *insertColumn = (Var *) insertEntry->expr; + Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, + insertEntry->resno - 1); + + Oid sourceType = insertColumn->vartype; + Oid targetType = attr->atttypid; + if (sourceType != targetType) + { + insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, + attr->attcollation, attr->atttypmod); + + /* + * We cannot modify the selectEntry in-place, because ORDER BY or + * GROUP BY clauses might be pointing to it with comparison types + * of the source type. So instead we keep the original one as a + * non-projected entry, so GROUP BY and ORDER BY are happy, and + * create a duplicated projected entry with the coerced expression. + */ + TargetEntry *coercedEntry = copyObject(selectEntry); + coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, + targetType, attr->attcollation, + attr->atttypmod); + coercedEntry->ressortgroupref = 0; + + /* + * The only requirement is that users don't use this name in ORDER BY + * or GROUP BY, and it should be unique across the same query. + */ + StringInfo resnameString = makeStringInfo(); + appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); + coercedEntry->resname = resnameString->data; + + projectedEntries = lappend(projectedEntries, coercedEntry); + + if (selectEntry->ressortgroupref != 0) + { + selectEntry->resjunk = true; + + /* + * This entry might still end up in the SELECT output list, so + * rename it to avoid ambiguity. 
+ * + * See https://github.com/citusdata/citus/pull/3470. + */ + resnameString = makeStringInfo(); + appendStringInfo(resnameString, "discarded_target_item_%d", + targetEntryIndex); + selectEntry->resname = resnameString->data; + + nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); + } + } + else + { + projectedEntries = lappend(projectedEntries, selectEntry); + } + + targetEntryIndex++; + } + + for (int entryIndex = list_length(insertTargetList); + entryIndex < list_length(selectTargetList); + entryIndex++) + { + nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, + entryIndex)); + } + + /* selectEntry->resno must be the ordinal number of the entry */ + selectTargetList = list_concat(projectedEntries, nonProjectedEntries); + int entryResNo = 1; + TargetEntry *selectTargetEntry = NULL; + foreach_ptr(selectTargetEntry, selectTargetList) + { + selectTargetEntry->resno = entryResNo++; + } + + heap_close(distributedRelation, NoLock); + + return selectTargetList; +} + + +/* + * CastExpr returns an expression which casts the given expr from sourceType to + * the given targetType. 
+ */ +static Expr * +CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod) +{ + Oid coercionFuncId = InvalidOid; + CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, + COERCION_EXPLICIT, + &coercionFuncId); + + if (coercionType == COERCION_PATH_FUNC) + { + FuncExpr *coerceExpr = makeNode(FuncExpr); + coerceExpr->funcid = coercionFuncId; + coerceExpr->args = list_make1(copyObject(expr)); + coerceExpr->funccollid = targetCollation; + coerceExpr->funcresulttype = targetType; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_RELABELTYPE) + { + RelabelType *coerceExpr = makeNode(RelabelType); + coerceExpr->arg = copyObject(expr); + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->resultcollid = targetCollation; + coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_ARRAYCOERCE) + { + Oid sourceBaseType = get_base_element_type(sourceType); + Oid targetBaseType = get_base_element_type(targetType); + + CaseTestExpr *elemExpr = makeNode(CaseTestExpr); + elemExpr->collation = targetCollation; + elemExpr->typeId = sourceBaseType; + elemExpr->typeMod = -1; + + Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, + targetBaseType, targetCollation, + targetTypeMod); + + ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); + coerceExpr->arg = copyObject(expr); + coerceExpr->elemexpr = elemCastExpr; + coerceExpr->resultcollid = targetCollation; + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->location = -1; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_COERCEVIAIO) + { + CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); + coerceExpr->arg = (Expr *) copyObject(expr); + coerceExpr->resulttype = 
targetType; + coerceExpr->resultcollid = targetCollation; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else + { + ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", + sourceType, targetType))); + } +} + + +/* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */ +bool +PlanningInsertSelect(void) +{ + return insertSelectPlannerLevel > 0; +} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 1f875f51f..9b20f154f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -198,33 +198,28 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es /* - * CoordinatorInsertSelectExplainScan is a custom scan explain callback function + * NonPushableInsertSelectExplainScan is a custom scan explain callback function * which is used to print explain information of a Citus plan for an INSERT INTO - * distributed_table SELECT ... query that is evaluated on the coordinator. + * distributed_table SELECT ... query that is evaluated on the coordinator or + * uses repartitioning. 
*/ void -CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, +NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = distributedPlan->insertSelectQuery; - Query *query = BuildSelectForInsertSelect(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); - Oid targetRelationId = insertRte->relid; - IntoClause *into = NULL; - ParamListInfo params = NULL; - char *queryString = NULL; - int cursorOptions = CURSOR_OPT_PARALLEL_OK; + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); /* - * Make a copy of the query, since pg_plan_query may scribble on it and later - * stages of EXPLAIN require it. + * Create a copy because ExplainOneQuery can modify the query, and later + * executions of prepared statements might require it. See + * https://github.com/citusdata/citus/issues/3947 for what can happen. 
*/ - Query *queryCopy = copyObject(query); - PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params); - bool repartition = IsRedistributablePlan(selectPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); + Query *queryCopy = copyObject(selectRte->subquery); + + bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; if (es->analyze) { @@ -245,7 +240,10 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, ExplainOpenGroup("Select Query", "Select Query", false, es); /* explain the inner SELECT query */ - ExplainOneQuery(query, 0, into, es, queryString, params, NULL); + IntoClause *into = NULL; + ParamListInfo params = NULL; + char *queryString = NULL; + ExplainOneQuery(queryCopy, 0, into, es, queryString, params, NULL); ExplainCloseGroup("Select Query", "Select Query", false, es); } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index ddb45be11..d3f64e792 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -129,6 +129,8 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(relationIdList); COPY_SCALAR_FIELD(targetRelationId); COPY_NODE_FIELD(insertSelectQuery); + COPY_NODE_FIELD(selectPlanForInsertSelect); + COPY_SCALAR_FIELD(insertSelectMethod); COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_NODE_FIELD(subPlanList); diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 497e5db7d..dcddb27a3 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -32,7 +32,7 @@ typedef struct CitusScanState /* custom scan methods for all executors */ extern CustomScanMethods AdaptiveExecutorCustomScanMethods; extern CustomScanMethods TaskTrackerCustomScanMethods; -extern CustomScanMethods CoordinatorInsertSelectCustomScanMethods; +extern 
CustomScanMethods NonPushableInsertSelectCustomScanMethods; extern CustomScanMethods DelayedErrorCustomScanMethods; diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index e94c7fe9c..bcfe29bfb 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -18,8 +18,7 @@ extern bool EnableRepartitionedInsertSelect; -extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); -extern bool ExecutingInsertSelect(void); +extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsRedistributablePlan(Plan *selectPlan); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 84066278b..74b8a0708 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -29,11 +29,12 @@ extern bool InsertSelectIntoLocalTable(Query *query); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte); -extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, +extern void NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + ParamListInfo boundParams); extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo @@ -42,6 +43,7 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, PlannerRestrictionContext * plannerRestrictionContext); extern char * InsertSelectResultIdPrefix(uint64 
planId); +extern bool PlanningInsertSelect(void); #endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 19bfc6abe..2d6d9ef8f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -378,6 +378,22 @@ typedef struct JoinSequenceNode } JoinSequenceNode; +/* + * InsertSelectMethod represents the method to use for INSERT INTO ... SELECT + * queries. + * + * Note that there is a third method which is not represented here, which is + * pushing down the INSERT INTO ... SELECT to workers. This method is executed + * similar to other distributed queries and doesn't need a special execution + * code, so we don't need to represent it here. + */ +typedef enum InsertSelectMethod +{ + INSERT_SELECT_VIA_COORDINATOR, + INSERT_SELECT_REPARTITION +} InsertSelectMethod; + + /* * DistributedPlan contains all information necessary to execute a * distribute query. @@ -416,8 +432,11 @@ typedef struct DistributedPlan /* target relation of a modification */ Oid targetRelationId; - /* INSERT .. SELECT via the coordinator */ + /* + * INSERT .. SELECT via the coordinator or repartition */ Query *insertSelectQuery; + PlannedStmt *selectPlanForInsertSelect; + InsertSelectMethod insertSelectMethod; /* * If intermediateResultIdPrefix is non-null, an INSERT ... 
SELECT diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index d49f09d0c..077adb4d3 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -88,7 +88,7 @@ typedef enum MULTI_EXECUTOR_INVALID_FIRST = 0, MULTI_EXECUTOR_ADAPTIVE = 1, MULTI_EXECUTOR_TASK_TRACKER = 2, - MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 3 + MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT = 3 } MultiExecutorType; diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 298f691de..5c4af0bc5 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -577,7 +577,7 @@ EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; -- EXPLAIN ANALYZE is currently not supported -- EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; -ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning +ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning -- -- Duplicate names in target list -- @@ -660,6 +660,96 @@ SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY 4 | 6 | 1 (5 rows) +DEALLOCATE insert_plan; +-- +-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the +-- distributed query has a single task. +-- +TRUNCATE target_table; +PREPARE insert_plan(int) AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a=$1 GROUP BY a; +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + a | count | distinct_values +--------------------------------------------------------------------- + 0 | 7 | 1 +(1 row) + +DEALLOCATE insert_plan; +-- +-- Prepared INSERT/SELECT with no parameters. 
+-- +TRUNCATE target_table; +PREPARE insert_plan AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a BETWEEN 1 AND 2 GROUP BY a; +EXPLAIN EXECUTE insert_plan; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> GroupAggregate (cost=44.09..44.28 rows=11 width=8) + Group Key: a + -> Sort (cost=44.09..44.12 rows=11 width=8) + Sort Key: a + -> Seq Scan on source_table_4213606 source_table (cost=0.00..43.90 rows=11 width=8) + Filter: ((a >= 1) AND (a <= 2)) +(13 rows) + +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... 
SELECT +RESET client_min_messages; +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + a | count | distinct_values +--------------------------------------------------------------------- + 1 | 7 | 1 + 2 | 7 | 1 +(2 rows) + +DEALLOCATE insert_plan; -- -- INSERT/SELECT in CTE -- @@ -673,10 +763,10 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... 
SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b @@ -1160,6 +1250,31 @@ DO UPDATE SET -> Seq Scan on source_table_4213644 source_table (10 rows) +-- verify that we don't report repartitioned insert/select for tables +-- with sequences. See https://github.com/citusdata/citus/issues/3936 +create table table_with_sequences (x int, y int, z bigserial); +insert into table_with_sequences values (1,1); +select create_distributed_table('table_with_sequences','x'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +explain insert into table_with_sequences select y, x from table_with_sequences; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... 
SELECT) (cost=0.00..0.00 rows=0 width=0) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) (cost=0.00..250.00 rows=100000 width=16) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on table_with_sequences_4213648 table_with_sequences (cost=0.00..28.50 rows=1850 width=8) +(8 rows) + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index ad1a18401..1612446c7 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1357,31 +1357,33 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5) SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator - -> Result - One-Time Filter: $3 + -> Subquery Scan on citus_insert_select_subquery CTE cte1 -> Function Scan on generate_series s - CTE cte1 - -> Limit - InitPlan 2 (returns $1) - -> CTE Scan on cte1 cte1_1 - -> Result - One-Time Filter: $1 - -> CTE Scan on cte1 cte1_2 - InitPlan 4 (returns $3) - -> CTE Scan on cte1 cte1_3 - -> CTE Scan on cte1 + -> Result + One-Time Filter: $3 + CTE cte1 + -> Limit + InitPlan 2 (returns $1) + -> CTE Scan on cte1 cte1_1 + -> Result + One-Time Filter: $1 + -> CTE Scan on cte1 cte1_2 + InitPlan 4 (returns $3) + -> CTE Scan on cte1 cte1_3 + -> CTE Scan on cte1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION ( SELECT s FROM generate_series(5,10) s); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator - -> HashAggregate - Group Key: s.s - -> Append - -> Function Scan on generate_series s - -> Function Scan on generate_series s_1 + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 -- explain with recursive planning -- prevent PG 11 - PG 12 outputs to diverge SET citus.enable_cte_inlining TO false; diff --git a/src/test/regress/expected/multi_insert_conflict.out b/src/test/regress/expected/multi_insert_conflict.out new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index d1e00a728..d50595468 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -711,6 +711,13 @@ INSERT INTO agg_events DEBUG: CTE sub_cte is going to be inlined via distributed planning DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: CTE sub_cte is going to be inlined via distributed planning +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for CTE sub_cte: SELECT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, (SELECT sub_cte."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" 
integer)) sub_cte) AS value_1_agg FROM public.raw_events_first +DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses HINT: Consider using an equality filter on the distributed table's partition column. -- We support set operations via the coordinator @@ -1045,7 +1052,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1110,7 +1117,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1409,7 +1416,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1454,7 +1461,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> HashAggregate @@ -1502,7 +1509,7 @@ $Q$); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index d069cf7d3..f9145b0a6 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -114,10 +114,10 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2 DEBUG: LIMIT clauses are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -150,13 +150,13 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2 DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... 
SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -246,10 +246,10 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) 
intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -269,9 +269,9 @@ WITH cte AS ( UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte); DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: Collecting INSERT ... 
SELECT results on coordinator RESET client_min_messages; -- Following query is supported by using repartition join for the insert/select diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index a4de3c304..d6c8cc02f 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -684,7 +684,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -704,7 +704,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -725,7 +725,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 790bcbd73..0d66e2d77 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -317,6 +317,60 @@ RESET client_min_messages; SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; +DEALLOCATE insert_plan; + +-- +-- Prepared router INSERT/SELECT. 
We currently use pull to coordinator when the +-- distributed query has a single task. +-- +TRUNCATE target_table; + +PREPARE insert_plan(int) AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a=$1 GROUP BY a; + +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +RESET client_min_messages; + +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + +DEALLOCATE insert_plan; + +-- +-- Prepared INSERT/SELECT with no parameters. +-- + +TRUNCATE target_table; + +PREPARE insert_plan AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a BETWEEN 1 AND 2 GROUP BY a; + +EXPLAIN EXECUTE insert_plan; + +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +RESET client_min_messages; + +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + +DEALLOCATE insert_plan; + -- -- INSERT/SELECT in CTE -- @@ -565,6 +619,14 @@ DO UPDATE SET cardinality = enriched.cardinality + excluded.cardinality, sum = enriched.sum + excluded.sum; + +-- verify that we don't report repartitioned insert/select for tables +-- with sequences. See https://github.com/citusdata/citus/issues/3936 +create table table_with_sequences (x int, y int, z bigserial); +insert into table_with_sequences values (1,1); +select create_distributed_table('table_with_sequences','x'); +explain insert into table_with_sequences select y, x from table_with_sequences; + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;