From 4ed59d2db36449e360b42a01e8b1d3fe1c03b346 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 25 Jun 2020 10:26:26 -0700 Subject: [PATCH] Move more from insert_select_executor to insert_select_planner --- .../executor/insert_select_executor.c | 292 +--------------- .../executor/multi_server_executor.c | 2 +- .../planner/function_call_delegation.c | 8 +- .../planner/insert_select_planner.c | 324 ++++++++++++++++-- .../distributed/utils/citus_copyfuncs.c | 1 + .../distributed/insert_select_executor.h | 1 - .../distributed/insert_select_planner.h | 1 + .../distributed/multi_physical_planner.h | 4 +- .../expected/insert_select_repartition.out | 2 +- src/test/regress/expected/multi_explain.out | 36 +- .../expected/multi_insert_conflict.out | 0 .../regress/expected/multi_insert_select.out | 18 +- .../expected/multi_insert_select_conflict.out | 8 +- ...lti_insert_select_non_pushable_queries.out | 14 +- 14 files changed, 354 insertions(+), 357 deletions(-) create mode 100644 src/test/regress/expected/multi_insert_conflict.out diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index e0e0d1d19..ac47f0109 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -53,11 +53,7 @@ /* Config variables managed via guc.c */ bool EnableRepartitionedInsertSelect = true; -/* depth of current insert/select executor. 
*/ -static int insertSelectExecutorLevel = 0; - -static TupleTableSlot * NonPushableInsertSelectExecScanInternal(CustomScanState *node); static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); @@ -71,44 +67,12 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); -static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, - Oid targetRelationId); static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, CitusTableCacheEntry *targetRelation, List **redistributedResults, bool useBinaryFormat); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); -static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, - int targetTypeMod); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); -static void RelableTargetEntryList(List *selectTargetList, List *insertTargetList); - - -/* - * NonPushableInsertSelectExecScan is a wrapper around - * NonPushableInsertSelectExecScanInternal which also properly increments - * or decrements insertSelectExecutorLevel. - */ -TupleTableSlot * -NonPushableInsertSelectExecScan(CustomScanState *node) -{ - TupleTableSlot *result = NULL; - insertSelectExecutorLevel++; - - PG_TRY(); - { - result = NonPushableInsertSelectExecScanInternal(node); - } - PG_CATCH(); - { - insertSelectExecutorLevel--; - PG_RE_THROW(); - } - PG_END_TRY(); - - insertSelectExecutorLevel--; - return result; -} /* @@ -116,15 +80,14 @@ NonPushableInsertSelectExecScan(CustomScanState *node) * SELECT .. query either by routing via coordinator or by repartitioning * task results and moving data directly between nodes. 
*/ -static TupleTableSlot * -NonPushableInsertSelectExecScanInternal(CustomScanState *node) +TupleTableSlot * +NonPushableInsertSelectExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; if (!scanState->finishedRemoteScan) { EState *executorState = ScanStateGetExecutorState(scanState); - ParamListInfo paramListInfo = executorState->es_param_list_info; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); List *insertTargetList = insertSelectQuery->targetList; @@ -136,35 +99,7 @@ NonPushableInsertSelectExecScanInternal(CustomScanState *node) HTAB *shardStateHash = NULL; Query *selectQuery = selectRte->subquery; - - /* - * Cast types of insert target list and select projection list to - * match the column types of the target relation. - */ - selectQuery->targetList = - AddInsertSelectCasts(insertSelectQuery->targetList, - selectQuery->targetList, - targetRelationId); - - /* - * Later we might need to call WrapTaskListForProjection(), which requires - * that select target list has unique names, otherwise the outer query - * cannot select columns unambiguously. So we relabel select columns to - * match target columns. - */ - RelableTargetEntryList(selectQuery->targetList, insertTargetList); - - /* - * Make a copy of the query, since pg_plan_query may scribble on it and we - * want it to be replanned every time if it is stored in a prepared - * statement. 
- */ - selectQuery = copyObject(selectQuery); - - /* plan the subquery, this may be another distributed query */ - int cursorOptions = CURSOR_OPT_PARALLEL_OK; - PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions, - paramListInfo); + PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect); /* * If we are dealing with partitioned table, we also need to lock its @@ -671,207 +606,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList) } -/* ExecutingInsertSelect returns true if we are executing an INSERT ...SELECT query */ -bool -ExecutingInsertSelect(void) -{ - return insertSelectExecutorLevel > 0; -} - - -/* - * AddInsertSelectCasts makes sure that the types in columns in the given - * target lists have the same type as the columns of the given relation. - * It might add casts to ensure that. - * - * It returns the updated selectTargetList. - */ -static List * -AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, - Oid targetRelationId) -{ - ListCell *insertEntryCell = NULL; - ListCell *selectEntryCell = NULL; - List *projectedEntries = NIL; - List *nonProjectedEntries = NIL; - - /* - * ReorderInsertSelectTargetLists() makes sure that first few columns of - * the SELECT query match the insert targets. It might contain additional - * items for GROUP BY, etc. 
- */ - Assert(list_length(insertTargetList) <= list_length(selectTargetList)); - - Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); - TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - - int targetEntryIndex = 0; - forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) - { - TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); - TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); - Var *insertColumn = (Var *) insertEntry->expr; - Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, - insertEntry->resno - 1); - - Oid sourceType = insertColumn->vartype; - Oid targetType = attr->atttypid; - if (sourceType != targetType) - { - insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, - attr->attcollation, attr->atttypmod); - - /* - * We cannot modify the selectEntry in-place, because ORDER BY or - * GROUP BY clauses might be pointing to it with comparison types - * of the source type. So instead we keep the original one as a - * non-projected entry, so GROUP BY and ORDER BY are happy, and - * create a duplicated projected entry with the coerced expression. - */ - TargetEntry *coercedEntry = copyObject(selectEntry); - coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, - targetType, attr->attcollation, - attr->atttypmod); - coercedEntry->ressortgroupref = 0; - - /* - * The only requirement is that users don't use this name in ORDER BY - * or GROUP BY, and it should be unique across the same query. - */ - StringInfo resnameString = makeStringInfo(); - appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); - coercedEntry->resname = resnameString->data; - - projectedEntries = lappend(projectedEntries, coercedEntry); - - if (selectEntry->ressortgroupref != 0) - { - selectEntry->resjunk = true; - - /* - * This entry might still end up in the SELECT output list, so - * rename it to avoid ambiguity. 
- * - * See https://github.com/citusdata/citus/pull/3470. - */ - resnameString = makeStringInfo(); - appendStringInfo(resnameString, "discarded_target_item_%d", - targetEntryIndex); - selectEntry->resname = resnameString->data; - - nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); - } - } - else - { - projectedEntries = lappend(projectedEntries, selectEntry); - } - - targetEntryIndex++; - } - - for (int entryIndex = list_length(insertTargetList); - entryIndex < list_length(selectTargetList); - entryIndex++) - { - nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, - entryIndex)); - } - - /* selectEntry->resno must be the ordinal number of the entry */ - selectTargetList = list_concat(projectedEntries, nonProjectedEntries); - int entryResNo = 1; - TargetEntry *selectTargetEntry = NULL; - foreach_ptr(selectTargetEntry, selectTargetList) - { - selectTargetEntry->resno = entryResNo++; - } - - heap_close(distributedRelation, NoLock); - - return selectTargetList; -} - - -/* - * CastExpr returns an expression which casts the given expr from sourceType to - * the given targetType. 
- */ -static Expr * -CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, - int targetTypeMod) -{ - Oid coercionFuncId = InvalidOid; - CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, - COERCION_EXPLICIT, - &coercionFuncId); - - if (coercionType == COERCION_PATH_FUNC) - { - FuncExpr *coerceExpr = makeNode(FuncExpr); - coerceExpr->funcid = coercionFuncId; - coerceExpr->args = list_make1(copyObject(expr)); - coerceExpr->funccollid = targetCollation; - coerceExpr->funcresulttype = targetType; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_RELABELTYPE) - { - RelabelType *coerceExpr = makeNode(RelabelType); - coerceExpr->arg = copyObject(expr); - coerceExpr->resulttype = targetType; - coerceExpr->resulttypmod = targetTypeMod; - coerceExpr->resultcollid = targetCollation; - coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_ARRAYCOERCE) - { - Oid sourceBaseType = get_base_element_type(sourceType); - Oid targetBaseType = get_base_element_type(targetType); - - CaseTestExpr *elemExpr = makeNode(CaseTestExpr); - elemExpr->collation = targetCollation; - elemExpr->typeId = sourceBaseType; - elemExpr->typeMod = -1; - - Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, - targetBaseType, targetCollation, - targetTypeMod); - - ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); - coerceExpr->arg = copyObject(expr); - coerceExpr->elemexpr = elemCastExpr; - coerceExpr->resultcollid = targetCollation; - coerceExpr->resulttype = targetType; - coerceExpr->resulttypmod = targetTypeMod; - coerceExpr->location = -1; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_COERCEVIAIO) - { - CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); - coerceExpr->arg = (Expr *) copyObject(expr); - coerceExpr->resulttype = 
targetType; - coerceExpr->resultcollid = targetCollation; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - return (Expr *) coerceExpr; - } - else - { - ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", - sourceType, targetType))); - } -} - - /* * IsSupportedRedistributionTarget determines whether re-partitioning into the * given target relation is supported. @@ -1102,23 +836,3 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) SetTaskQueryString(task, wrappedQuery->data); } } - - -/* - * RelableTargetEntryList relabels select target list to have matching names with - * insert target list. - */ -static void -RelableTargetEntryList(List *selectTargetList, List *insertTargetList) -{ - ListCell *selectTargetCell = NULL; - ListCell *insertTargetCell = NULL; - - forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList) - { - TargetEntry *selectTargetEntry = lfirst(selectTargetCell); - TargetEntry *insertTargetEntry = lfirst(insertTargetCell); - - selectTargetEntry->resname = insertTargetEntry->resname; - } -} diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index fec24d191..a13c39ca3 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -84,7 +84,7 @@ JobExecutorType(DistributedPlan *distributedPlan) { /* * Even if adaptiveExecutorEnabled, we go through - * MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT because + * MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT because * the executor already knows how to handle adaptive * executor when necessary. 
*/ diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 6c5601ead..caa4034b9 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -26,7 +26,6 @@ #include "distributed/deparse_shard_query.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" -#include "distributed/insert_select_executor.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -230,11 +229,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } /* - * This can be called while executing INSERT ... SELECT func(). insert_select_executor - * doesn't get the planned subquery and gets the actual struct Query, so the planning - * for these kinds of queries happens at the execution time. + * Cannot delegate functions for INSERT ... SELECT func(), since they require + * coordinated transactions. */ - if (ExecutingInsertSelect()) + if (PlanningInsertSelect()) { ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... 
SELECT"))); return NULL; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index c7b9f720d..22f882aeb 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -49,10 +49,16 @@ #include "parser/parse_coerce.h" #include "parser/parse_relation.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" +static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, + Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext, + ParamListInfo boundParams); static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -81,8 +87,15 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams); static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); -static InsertSelectMethod GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, - ParamListInfo boundParams); +static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList); +static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId); +static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod); + + +/* depth of current insert/select planner. */ +static int insertSelectPlannerLevel = 0; /* @@ -182,15 +195,45 @@ CheckInsertSelectQuery(Query *query) /* - * CreateInsertSelectPlan tries to create a distributed plan for an - * INSERT INTO distributed_table SELECT ... query by push down the - * command to the workers and if that is not possible it creates a - * plan for evaluating the SELECT on the coordinator. 
+ * CreateInsertSelectPlan is a wrapper around + * CreateInsertSelectPlanInternal which also properly increments + * or decrements insertSelectPlannerLevel. */ DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext, ParamListInfo boundParams) +{ + DistributedPlan *result = NULL; + insertSelectPlannerLevel++; + + PG_TRY(); + { + result = CreateInsertSelectPlanInternal(planId, originalQuery, + plannerRestrictionContext, boundParams); + } + PG_CATCH(); + { + insertSelectPlannerLevel--; + PG_RE_THROW(); + } + PG_END_TRY(); + + insertSelectPlannerLevel--; + return result; +} + + +/* + * CreateInsertSelectPlanInternal tries to create a distributed plan for an + * INSERT INTO distributed_table SELECT ... query by pushing down the + * command to the workers and if that is not possible it creates a + * plan for evaluating the SELECT on the coordinator. + */ +static DistributedPlan * +CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext, + ParamListInfo boundParams) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery); if (deferredError != NULL) @@ -1319,10 +1362,43 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou selectRte->subquery = selectQuery; ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + /* + * Cast types of insert target list and select projection list to + * match the column types of the target relation. + */ + selectQuery->targetList = + AddInsertSelectCasts(insertSelectQuery->targetList, + selectQuery->targetList, + targetRelationId); + + /* + * Later we might need to call WrapTaskListForProjection(), which requires + * that select target list has unique names, otherwise the outer query + * cannot select columns unambiguously. So we relabel select columns to + * match target columns. 
+ */ + List *insertTargetList = insertSelectQuery->targetList; + RelabelTargetEntryList(selectQuery->targetList, insertTargetList); + + /* + * Make a copy of the select query, since the following code scribbles on it + * but we need to keep the original for EXPLAIN. + */ + Query *selectQueryCopy = copyObject(selectQuery); + + /* plan the subquery, this may be another distributed query */ + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, + boundParams); + + bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); + distributedPlan->insertSelectQuery = insertSelectQuery; - distributedPlan->insertSelectMethod = GetInsertSelectMethod(selectQuery, - targetRelationId, - boundParams); + distributedPlan->selectPlanForInsertSelect = selectPlan; + distributedPlan->insertSelectMethod = repartitioned ? + INSERT_SELECT_REPARTITION : + INSERT_SELECT_VIA_COORDINATOR; distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; @@ -1376,33 +1452,221 @@ InsertSelectResultIdPrefix(uint64 planId) /* - * GetInsertSelectMethod returns the preferred INSERT INTO ... SELECT method - * based on its select query. + * RelabelTargetEntryList relabels select target list to have matching names with + * insert target list. 
*/ -static InsertSelectMethod -GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, ParamListInfo boundParams) +static void +RelabelTargetEntryList(List *selectTargetList, List *insertTargetList) { - Query *selectQueryCopy = copyObject(selectQuery); + ListCell *selectTargetCell = NULL; + ListCell *insertTargetCell = NULL; + + forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList) + { + TargetEntry *selectTargetEntry = lfirst(selectTargetCell); + TargetEntry *insertTargetEntry = lfirst(insertTargetCell); + + selectTargetEntry->resname = insertTargetEntry->resname; + } +} + + +/* + * AddInsertSelectCasts makes sure that the types in columns in the given + * target lists have the same type as the columns of the given relation. + * It might add casts to ensure that. + * + * It returns the updated selectTargetList. + */ +static List * +AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId) +{ + ListCell *insertEntryCell = NULL; + ListCell *selectEntryCell = NULL; + List *projectedEntries = NIL; + List *nonProjectedEntries = NIL; /* - * Query will be replanned in insert_select_executor to plan correctly - * for prepared statements. So turn off logging here to avoid repeated - * log messages. We use SET LOCAL here so the change is reverted on ERROR. + * ReorderInsertSelectTargetLists() makes sure that first few columns of + * the SELECT query match the insert targets. It might contain additional + * items for GROUP BY, etc. 
*/ - int savedClientMinMessages = client_min_messages; - set_config_option("client_min_messages", "ERROR", - PGC_USERSET, PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); + Assert(list_length(insertTargetList) <= list_length(selectTargetList)); - int cursorOptions = CURSOR_OPT_PARALLEL_OK; - PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, - boundParams); + Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); + TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - client_min_messages = savedClientMinMessages; + int targetEntryIndex = 0; + forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) + { + TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); + TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); + Var *insertColumn = (Var *) insertEntry->expr; + Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, + insertEntry->resno - 1); - bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); - return repartitioned ? - INSERT_SELECT_REPARTITION : - INSERT_SELECT_VIA_COORDINATOR; + Oid sourceType = insertColumn->vartype; + Oid targetType = attr->atttypid; + if (sourceType != targetType) + { + insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, + attr->attcollation, attr->atttypmod); + + /* + * We cannot modify the selectEntry in-place, because ORDER BY or + * GROUP BY clauses might be pointing to it with comparison types + * of the source type. So instead we keep the original one as a + * non-projected entry, so GROUP BY and ORDER BY are happy, and + * create a duplicated projected entry with the coerced expression. 
+ */ + TargetEntry *coercedEntry = copyObject(selectEntry); + coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, + targetType, attr->attcollation, + attr->atttypmod); + coercedEntry->ressortgroupref = 0; + + /* + * The only requirement is that users don't use this name in ORDER BY + * or GROUP BY, and it should be unique across the same query. + */ + StringInfo resnameString = makeStringInfo(); + appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); + coercedEntry->resname = resnameString->data; + + projectedEntries = lappend(projectedEntries, coercedEntry); + + if (selectEntry->ressortgroupref != 0) + { + selectEntry->resjunk = true; + + /* + * This entry might still end up in the SELECT output list, so + * rename it to avoid ambiguity. + * + * See https://github.com/citusdata/citus/pull/3470. + */ + resnameString = makeStringInfo(); + appendStringInfo(resnameString, "discarded_target_item_%d", + targetEntryIndex); + selectEntry->resname = resnameString->data; + + nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); + } + } + else + { + projectedEntries = lappend(projectedEntries, selectEntry); + } + + targetEntryIndex++; + } + + for (int entryIndex = list_length(insertTargetList); + entryIndex < list_length(selectTargetList); + entryIndex++) + { + nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, + entryIndex)); + } + + /* selectEntry->resno must be the ordinal number of the entry */ + selectTargetList = list_concat(projectedEntries, nonProjectedEntries); + int entryResNo = 1; + TargetEntry *selectTargetEntry = NULL; + foreach_ptr(selectTargetEntry, selectTargetList) + { + selectTargetEntry->resno = entryResNo++; + } + + heap_close(distributedRelation, NoLock); + + return selectTargetList; +} + + +/* + * CastExpr returns an expression which casts the given expr from sourceType to + * the given targetType. 
+ */ +static Expr * +CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod) +{ + Oid coercionFuncId = InvalidOid; + CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, + COERCION_EXPLICIT, + &coercionFuncId); + + if (coercionType == COERCION_PATH_FUNC) + { + FuncExpr *coerceExpr = makeNode(FuncExpr); + coerceExpr->funcid = coercionFuncId; + coerceExpr->args = list_make1(copyObject(expr)); + coerceExpr->funccollid = targetCollation; + coerceExpr->funcresulttype = targetType; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_RELABELTYPE) + { + RelabelType *coerceExpr = makeNode(RelabelType); + coerceExpr->arg = copyObject(expr); + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->resultcollid = targetCollation; + coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_ARRAYCOERCE) + { + Oid sourceBaseType = get_base_element_type(sourceType); + Oid targetBaseType = get_base_element_type(targetType); + + CaseTestExpr *elemExpr = makeNode(CaseTestExpr); + elemExpr->collation = targetCollation; + elemExpr->typeId = sourceBaseType; + elemExpr->typeMod = -1; + + Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, + targetBaseType, targetCollation, + targetTypeMod); + + ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); + coerceExpr->arg = copyObject(expr); + coerceExpr->elemexpr = elemCastExpr; + coerceExpr->resultcollid = targetCollation; + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->location = -1; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_COERCEVIAIO) + { + CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); + coerceExpr->arg = (Expr *) copyObject(expr); + coerceExpr->resulttype = 
targetType; + coerceExpr->resultcollid = targetCollation; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else + { + ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", + sourceType, targetType))); + } +} + + +/* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */ +bool +PlanningInsertSelect(void) +{ + return insertSelectPlannerLevel > 0; } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 0e95f3587..d3f64e792 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -129,6 +129,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(relationIdList); COPY_SCALAR_FIELD(targetRelationId); COPY_NODE_FIELD(insertSelectQuery); + COPY_NODE_FIELD(selectPlanForInsertSelect); COPY_SCALAR_FIELD(insertSelectMethod); COPY_STRING_FIELD(intermediateResultIdPrefix); diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index b8151712d..bcfe29bfb 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -19,7 +19,6 @@ extern bool EnableRepartitionedInsertSelect; extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); -extern bool ExecutingInsertSelect(void); extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsRedistributablePlan(Plan *selectPlan); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index bdbcbd16b..74b8a0708 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -43,6 +43,7 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, 
PlannerRestrictionContext * plannerRestrictionContext); extern char * InsertSelectResultIdPrefix(uint64 planId); +extern bool PlanningInsertSelect(void); #endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 87381f531..2d6d9ef8f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -432,8 +432,10 @@ typedef struct DistributedPlan /* target relation of a modification */ Oid targetRelationId; - /* INSERT .. SELECT via the coordinator or repartition */ + /* + * INSERT .. SELECT via the coordinator or repartition */ Query *insertSelectQuery; + PlannedStmt *selectPlanForInsertSelect; InsertSelectMethod insertSelectMethod; /* diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 34bf0b457..5c4af0bc5 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -763,10 +763,10 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... 
SELECT +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index ad1a18401..1612446c7 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1357,31 +1357,33 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5) SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator - -> Result - One-Time Filter: $3 + -> Subquery Scan on citus_insert_select_subquery CTE cte1 -> Function Scan on generate_series s - CTE cte1 - -> Limit - InitPlan 2 (returns $1) - -> CTE Scan on cte1 cte1_1 - -> Result - One-Time Filter: $1 - -> CTE Scan on cte1 cte1_2 - InitPlan 4 (returns $3) - -> CTE Scan on cte1 cte1_3 - -> CTE Scan on cte1 + -> Result + One-Time Filter: $3 + CTE cte1 + -> Limit + InitPlan 2 (returns $1) + -> CTE Scan on cte1 cte1_1 + -> Result + One-Time Filter: $1 + -> CTE Scan on cte1 cte1_2 + InitPlan 4 (returns $3) + -> CTE Scan on cte1 cte1_3 + -> CTE Scan on cte1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION ( SELECT s FROM generate_series(5,10) s); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator - -> HashAggregate - Group Key: s.s - -> Append - -> Function Scan on generate_series s - -> Function Scan on generate_series s_1 + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 -- explain with recursive planning -- prevent PG 11 - PG 12 outputs to diverge SET citus.enable_cte_inlining TO false; diff --git a/src/test/regress/expected/multi_insert_conflict.out b/src/test/regress/expected/multi_insert_conflict.out new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 766aaf47a..d50595468 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -710,6 +710,14 @@ INSERT INTO agg_events raw_events_first; DEBUG: CTE sub_cte is going to be inlined via distributed planning DEBUG: Subqueries without relations are not allowed in distributed INSERT ... 
SELECT queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: CTE sub_cte is going to be inlined via distributed planning +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for CTE sub_cte: SELECT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, (SELECT sub_cte."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) sub_cte) AS value_1_agg FROM public.raw_events_first +DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses HINT: Consider using an equality filter on the distributed table's partition column. -- We support set operations via the coordinator @@ -1044,7 +1052,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1109,7 +1117,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1408,7 +1416,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1453,7 +1461,7 @@ $Q$); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> HashAggregate @@ -1501,7 +1509,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index d069cf7d3..f9145b0a6 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -114,10 +114,10 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2 DEBUG: LIMIT clauses are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -150,13 +150,13 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2 DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... 
SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -246,10 +246,10 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) 
intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -269,9 +269,9 @@ WITH cte AS ( UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte); DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: Collecting INSERT ... 
SELECT results on coordinator RESET client_min_messages; -- Following query is supported by using repartition join for the insert/select diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index e84991da9..d6c8cc02f 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -147,6 +147,10 @@ FROM ( GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) +DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) +DEBUG: generating subplan XXX_1 for subquery SELECT 
u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; @@ -293,6 +297,10 @@ GROUP BY ORDER BY count_pay; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) +DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM 
read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) +DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; @@ -676,7 +684,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -696,7 +704,7 @@ $Q$); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -717,7 +725,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive)