mirror of https://github.com/citusdata/citus.git
Move more from insert_select_executor to insert_select_planner
parent
d34c21890f
commit
4ed59d2db3
|
@ -53,11 +53,7 @@
|
|||
/* Config variables managed via guc.c */
|
||||
bool EnableRepartitionedInsertSelect = true;
|
||||
|
||||
/* depth of current insert/select executor. */
|
||||
static int insertSelectExecutorLevel = 0;
|
||||
|
||||
|
||||
static TupleTableSlot * NonPushableInsertSelectExecScanInternal(CustomScanState *node);
|
||||
static Query * WrapSubquery(Query *subquery);
|
||||
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||
char *resultIdPrefix);
|
||||
|
@ -71,44 +67,12 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
|
|||
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
||||
List *insertTargetList);
|
||||
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
||||
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||
Oid targetRelationId);
|
||||
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||
CitusTableCacheEntry *targetRelation,
|
||||
List **redistributedResults,
|
||||
bool useBinaryFormat);
|
||||
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
||||
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||
int targetTypeMod);
|
||||
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
||||
static void RelableTargetEntryList(List *selectTargetList, List *insertTargetList);
|
||||
|
||||
|
||||
/*
|
||||
* NonPushableInsertSelectExecScan is a wrapper around
|
||||
* NonPushableInsertSelectExecScanInternal which also properly increments
|
||||
* or decrements insertSelectExecutorLevel.
|
||||
*/
|
||||
TupleTableSlot *
|
||||
NonPushableInsertSelectExecScan(CustomScanState *node)
|
||||
{
|
||||
TupleTableSlot *result = NULL;
|
||||
insertSelectExecutorLevel++;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
result = NonPushableInsertSelectExecScanInternal(node);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
insertSelectExecutorLevel--;
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
insertSelectExecutorLevel--;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
|
@ -116,15 +80,14 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
|||
* SELECT .. query either by routing via coordinator or by repartitioning
|
||||
* task results and moving data directly between nodes.
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
NonPushableInsertSelectExecScanInternal(CustomScanState *node)
|
||||
TupleTableSlot *
|
||||
NonPushableInsertSelectExecScan(CustomScanState *node)
|
||||
{
|
||||
CitusScanState *scanState = (CitusScanState *) node;
|
||||
|
||||
if (!scanState->finishedRemoteScan)
|
||||
{
|
||||
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||
Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery);
|
||||
List *insertTargetList = insertSelectQuery->targetList;
|
||||
|
@ -136,35 +99,7 @@ NonPushableInsertSelectExecScanInternal(CustomScanState *node)
|
|||
HTAB *shardStateHash = NULL;
|
||||
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
|
||||
/*
|
||||
* Cast types of insert target list and select projection list to
|
||||
* match the column types of the target relation.
|
||||
*/
|
||||
selectQuery->targetList =
|
||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||
selectQuery->targetList,
|
||||
targetRelationId);
|
||||
|
||||
/*
|
||||
* Later we might need to call WrapTaskListForProjection(), which requires
|
||||
* that select target list has unique names, otherwise the outer query
|
||||
* cannot select columns unambiguously. So we relabel select columns to
|
||||
* match target columns.
|
||||
*/
|
||||
RelableTargetEntryList(selectQuery->targetList, insertTargetList);
|
||||
|
||||
/*
|
||||
* Make a copy of the query, since pg_plan_query may scribble on it and we
|
||||
* want it to be replanned every time if it is stored in a prepared
|
||||
* statement.
|
||||
*/
|
||||
selectQuery = copyObject(selectQuery);
|
||||
|
||||
/* plan the subquery, this may be another distributed query */
|
||||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||
PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions,
|
||||
paramListInfo);
|
||||
PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect);
|
||||
|
||||
/*
|
||||
* If we are dealing with partitioned table, we also need to lock its
|
||||
|
@ -671,207 +606,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
|
|||
}
|
||||
|
||||
|
||||
/* ExecutingInsertSelect returns true if we are executing an INSERT ...SELECT query */
|
||||
bool
|
||||
ExecutingInsertSelect(void)
|
||||
{
|
||||
return insertSelectExecutorLevel > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddInsertSelectCasts makes sure that the types in columns in the given
|
||||
* target lists have the same type as the columns of the given relation.
|
||||
* It might add casts to ensure that.
|
||||
*
|
||||
* It returns the updated selectTargetList.
|
||||
*/
|
||||
static List *
|
||||
AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||
Oid targetRelationId)
|
||||
{
|
||||
ListCell *insertEntryCell = NULL;
|
||||
ListCell *selectEntryCell = NULL;
|
||||
List *projectedEntries = NIL;
|
||||
List *nonProjectedEntries = NIL;
|
||||
|
||||
/*
|
||||
* ReorderInsertSelectTargetLists() makes sure that first few columns of
|
||||
* the SELECT query match the insert targets. It might contain additional
|
||||
* items for GROUP BY, etc.
|
||||
*/
|
||||
Assert(list_length(insertTargetList) <= list_length(selectTargetList));
|
||||
|
||||
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
||||
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
||||
|
||||
int targetEntryIndex = 0;
|
||||
forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
|
||||
{
|
||||
TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
|
||||
TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
|
||||
Var *insertColumn = (Var *) insertEntry->expr;
|
||||
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
|
||||
insertEntry->resno - 1);
|
||||
|
||||
Oid sourceType = insertColumn->vartype;
|
||||
Oid targetType = attr->atttypid;
|
||||
if (sourceType != targetType)
|
||||
{
|
||||
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
|
||||
attr->attcollation, attr->atttypmod);
|
||||
|
||||
/*
|
||||
* We cannot modify the selectEntry in-place, because ORDER BY or
|
||||
* GROUP BY clauses might be pointing to it with comparison types
|
||||
* of the source type. So instead we keep the original one as a
|
||||
* non-projected entry, so GROUP BY and ORDER BY are happy, and
|
||||
* create a duplicated projected entry with the coerced expression.
|
||||
*/
|
||||
TargetEntry *coercedEntry = copyObject(selectEntry);
|
||||
coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
|
||||
targetType, attr->attcollation,
|
||||
attr->atttypmod);
|
||||
coercedEntry->ressortgroupref = 0;
|
||||
|
||||
/*
|
||||
* The only requirement is that users don't use this name in ORDER BY
|
||||
* or GROUP BY, and it should be unique across the same query.
|
||||
*/
|
||||
StringInfo resnameString = makeStringInfo();
|
||||
appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
|
||||
coercedEntry->resname = resnameString->data;
|
||||
|
||||
projectedEntries = lappend(projectedEntries, coercedEntry);
|
||||
|
||||
if (selectEntry->ressortgroupref != 0)
|
||||
{
|
||||
selectEntry->resjunk = true;
|
||||
|
||||
/*
|
||||
* This entry might still end up in the SELECT output list, so
|
||||
* rename it to avoid ambiguity.
|
||||
*
|
||||
* See https://github.com/citusdata/citus/pull/3470.
|
||||
*/
|
||||
resnameString = makeStringInfo();
|
||||
appendStringInfo(resnameString, "discarded_target_item_%d",
|
||||
targetEntryIndex);
|
||||
selectEntry->resname = resnameString->data;
|
||||
|
||||
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
projectedEntries = lappend(projectedEntries, selectEntry);
|
||||
}
|
||||
|
||||
targetEntryIndex++;
|
||||
}
|
||||
|
||||
for (int entryIndex = list_length(insertTargetList);
|
||||
entryIndex < list_length(selectTargetList);
|
||||
entryIndex++)
|
||||
{
|
||||
nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
|
||||
entryIndex));
|
||||
}
|
||||
|
||||
/* selectEntry->resno must be the ordinal number of the entry */
|
||||
selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
|
||||
int entryResNo = 1;
|
||||
TargetEntry *selectTargetEntry = NULL;
|
||||
foreach_ptr(selectTargetEntry, selectTargetList)
|
||||
{
|
||||
selectTargetEntry->resno = entryResNo++;
|
||||
}
|
||||
|
||||
heap_close(distributedRelation, NoLock);
|
||||
|
||||
return selectTargetList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CastExpr returns an expression which casts the given expr from sourceType to
|
||||
* the given targetType.
|
||||
*/
|
||||
static Expr *
|
||||
CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||
int targetTypeMod)
|
||||
{
|
||||
Oid coercionFuncId = InvalidOid;
|
||||
CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
|
||||
COERCION_EXPLICIT,
|
||||
&coercionFuncId);
|
||||
|
||||
if (coercionType == COERCION_PATH_FUNC)
|
||||
{
|
||||
FuncExpr *coerceExpr = makeNode(FuncExpr);
|
||||
coerceExpr->funcid = coercionFuncId;
|
||||
coerceExpr->args = list_make1(copyObject(expr));
|
||||
coerceExpr->funccollid = targetCollation;
|
||||
coerceExpr->funcresulttype = targetType;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else if (coercionType == COERCION_PATH_RELABELTYPE)
|
||||
{
|
||||
RelabelType *coerceExpr = makeNode(RelabelType);
|
||||
coerceExpr->arg = copyObject(expr);
|
||||
coerceExpr->resulttype = targetType;
|
||||
coerceExpr->resulttypmod = targetTypeMod;
|
||||
coerceExpr->resultcollid = targetCollation;
|
||||
coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
|
||||
coerceExpr->location = -1;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else if (coercionType == COERCION_PATH_ARRAYCOERCE)
|
||||
{
|
||||
Oid sourceBaseType = get_base_element_type(sourceType);
|
||||
Oid targetBaseType = get_base_element_type(targetType);
|
||||
|
||||
CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
|
||||
elemExpr->collation = targetCollation;
|
||||
elemExpr->typeId = sourceBaseType;
|
||||
elemExpr->typeMod = -1;
|
||||
|
||||
Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
|
||||
targetBaseType, targetCollation,
|
||||
targetTypeMod);
|
||||
|
||||
ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
|
||||
coerceExpr->arg = copyObject(expr);
|
||||
coerceExpr->elemexpr = elemCastExpr;
|
||||
coerceExpr->resultcollid = targetCollation;
|
||||
coerceExpr->resulttype = targetType;
|
||||
coerceExpr->resulttypmod = targetTypeMod;
|
||||
coerceExpr->location = -1;
|
||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else if (coercionType == COERCION_PATH_COERCEVIAIO)
|
||||
{
|
||||
CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
|
||||
coerceExpr->arg = (Expr *) copyObject(expr);
|
||||
coerceExpr->resulttype = targetType;
|
||||
coerceExpr->resultcollid = targetCollation;
|
||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||
coerceExpr->location = -1;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
|
||||
sourceType, targetType)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsSupportedRedistributionTarget determines whether re-partitioning into the
|
||||
* given target relation is supported.
|
||||
|
@ -1102,23 +836,3 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
|||
SetTaskQueryString(task, wrappedQuery->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RelableTargetEntryList relabels select target list to have matching names with
|
||||
* insert target list.
|
||||
*/
|
||||
static void
|
||||
RelableTargetEntryList(List *selectTargetList, List *insertTargetList)
|
||||
{
|
||||
ListCell *selectTargetCell = NULL;
|
||||
ListCell *insertTargetCell = NULL;
|
||||
|
||||
forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
|
||||
{
|
||||
TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
|
||||
TargetEntry *insertTargetEntry = lfirst(insertTargetCell);
|
||||
|
||||
selectTargetEntry->resname = insertTargetEntry->resname;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,7 +84,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
|
|||
{
|
||||
/*
|
||||
* Even if adaptiveExecutorEnabled, we go through
|
||||
* MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT because
|
||||
* MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT because
|
||||
* the executor already knows how to handle adaptive
|
||||
* executor when necessary.
|
||||
*/
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "distributed/deparse_shard_query.h"
|
||||
#include "distributed/function_call_delegation.h"
|
||||
#include "distributed/insert_select_planner.h"
|
||||
#include "distributed/insert_select_executor.h"
|
||||
#include "distributed/metadata_utility.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -230,11 +229,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
|||
}
|
||||
|
||||
/*
|
||||
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
|
||||
* doesn't get the planned subquery and gets the actual struct Query, so the planning
|
||||
* for these kinds of queries happens at the execution time.
|
||||
* Cannot delegate functions for INSERT ... SELECT func(), since they require
|
||||
* coordinated transactions.
|
||||
*/
|
||||
if (ExecutingInsertSelect())
|
||||
if (PlanningInsertSelect())
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
|
||||
return NULL;
|
||||
|
|
|
@ -49,10 +49,16 @@
|
|||
#include "parser/parse_coerce.h"
|
||||
#include "parser/parse_relation.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
|
||||
static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
|
||||
Query *originalQuery,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext,
|
||||
ParamListInfo boundParams);
|
||||
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
|
@ -81,8 +87,15 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
|||
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
|
||||
ParamListInfo boundParams);
|
||||
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
|
||||
static InsertSelectMethod GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId,
|
||||
ParamListInfo boundParams);
|
||||
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
|
||||
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||
Oid targetRelationId);
|
||||
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||
int targetTypeMod);
|
||||
|
||||
|
||||
/* depth of current insert/select planner. */
|
||||
static int insertSelectPlannerLevel = 0;
|
||||
|
||||
|
||||
/*
|
||||
|
@ -182,15 +195,45 @@ CheckInsertSelectQuery(Query *query)
|
|||
|
||||
|
||||
/*
|
||||
* CreateInsertSelectPlan tries to create a distributed plan for an
|
||||
* INSERT INTO distributed_table SELECT ... query by push down the
|
||||
* command to the workers and if that is not possible it creates a
|
||||
* plan for evaluating the SELECT on the coordinator.
|
||||
* CreateInsertSelectPlan is a wrapper around
|
||||
* CreateInsertSelectPlanInternal which also properly increments
|
||||
* or decrements insertSelectPlannerLevel.
|
||||
*/
|
||||
DistributedPlan *
|
||||
CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
ParamListInfo boundParams)
|
||||
{
|
||||
DistributedPlan *result = NULL;
|
||||
insertSelectPlannerLevel++;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
result = CreateInsertSelectPlanInternal(planId, originalQuery,
|
||||
plannerRestrictionContext, boundParams);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
insertSelectPlannerLevel--;
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
insertSelectPlannerLevel--;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateInsertSelectPlanInternal tries to create a distributed plan for an
|
||||
* INSERT INTO distributed_table SELECT ... query by push down the
|
||||
* command to the workers and if that is not possible it creates a
|
||||
* plan for evaluating the SELECT on the coordinator.
|
||||
*/
|
||||
static DistributedPlan *
|
||||
CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
ParamListInfo boundParams)
|
||||
{
|
||||
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery);
|
||||
if (deferredError != NULL)
|
||||
|
@ -1319,10 +1362,43 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
|
|||
selectRte->subquery = selectQuery;
|
||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||
|
||||
/*
|
||||
* Cast types of insert target list and select projection list to
|
||||
* match the column types of the target relation.
|
||||
*/
|
||||
selectQuery->targetList =
|
||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||
selectQuery->targetList,
|
||||
targetRelationId);
|
||||
|
||||
/*
|
||||
* Later we might need to call WrapTaskListForProjection(), which requires
|
||||
* that select target list has unique names, otherwise the outer query
|
||||
* cannot select columns unambiguously. So we relabel select columns to
|
||||
* match target columns.
|
||||
*/
|
||||
List *insertTargetList = insertSelectQuery->targetList;
|
||||
RelabelTargetEntryList(selectQuery->targetList, insertTargetList);
|
||||
|
||||
/*
|
||||
* Make a copy of the select query, since following code scribbles it
|
||||
* but we need to keep the original for EXPLAIN.
|
||||
*/
|
||||
Query *selectQueryCopy = copyObject(selectQuery);
|
||||
|
||||
/* plan the subquery, this may be another distributed query */
|
||||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||
PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions,
|
||||
boundParams);
|
||||
|
||||
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
|
||||
IsSupportedRedistributionTarget(targetRelationId);
|
||||
|
||||
distributedPlan->insertSelectQuery = insertSelectQuery;
|
||||
distributedPlan->insertSelectMethod = GetInsertSelectMethod(selectQuery,
|
||||
targetRelationId,
|
||||
boundParams);
|
||||
distributedPlan->selectPlanForInsertSelect = selectPlan;
|
||||
distributedPlan->insertSelectMethod = repartitioned ?
|
||||
INSERT_SELECT_REPARTITION :
|
||||
INSERT_SELECT_VIA_COORDINATOR;
|
||||
distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
|
||||
distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
|
||||
distributedPlan->targetRelationId = targetRelationId;
|
||||
|
@ -1376,33 +1452,221 @@ InsertSelectResultIdPrefix(uint64 planId)
|
|||
|
||||
|
||||
/*
|
||||
* GetInsertSelectMethod returns the preferred INSERT INTO ... SELECT method
|
||||
* based on its select query.
|
||||
* RelabelTargetEntryList relabels select target list to have matching names with
|
||||
* insert target list.
|
||||
*/
|
||||
static InsertSelectMethod
|
||||
GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, ParamListInfo boundParams)
|
||||
static void
|
||||
RelabelTargetEntryList(List *selectTargetList, List *insertTargetList)
|
||||
{
|
||||
Query *selectQueryCopy = copyObject(selectQuery);
|
||||
ListCell *selectTargetCell = NULL;
|
||||
ListCell *insertTargetCell = NULL;
|
||||
|
||||
forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
|
||||
{
|
||||
TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
|
||||
TargetEntry *insertTargetEntry = lfirst(insertTargetCell);
|
||||
|
||||
selectTargetEntry->resname = insertTargetEntry->resname;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddInsertSelectCasts makes sure that the types in columns in the given
|
||||
* target lists have the same type as the columns of the given relation.
|
||||
* It might add casts to ensure that.
|
||||
*
|
||||
* It returns the updated selectTargetList.
|
||||
*/
|
||||
static List *
|
||||
AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||
Oid targetRelationId)
|
||||
{
|
||||
ListCell *insertEntryCell = NULL;
|
||||
ListCell *selectEntryCell = NULL;
|
||||
List *projectedEntries = NIL;
|
||||
List *nonProjectedEntries = NIL;
|
||||
|
||||
/*
|
||||
* Query will be replanned in insert_select_executor to plan correctly
|
||||
* for prepared statements. So turn off logging here to avoid repeated
|
||||
* log messages. We use SET LOCAL here so the change is reverted on ERROR.
|
||||
* ReorderInsertSelectTargetLists() makes sure that first few columns of
|
||||
* the SELECT query match the insert targets. It might contain additional
|
||||
* items for GROUP BY, etc.
|
||||
*/
|
||||
int savedClientMinMessages = client_min_messages;
|
||||
set_config_option("client_min_messages", "ERROR",
|
||||
PGC_USERSET, PGC_S_SESSION,
|
||||
GUC_ACTION_LOCAL, true, 0, false);
|
||||
Assert(list_length(insertTargetList) <= list_length(selectTargetList));
|
||||
|
||||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||
PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions,
|
||||
boundParams);
|
||||
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
||||
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
||||
|
||||
client_min_messages = savedClientMinMessages;
|
||||
int targetEntryIndex = 0;
|
||||
forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
|
||||
{
|
||||
TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
|
||||
TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
|
||||
Var *insertColumn = (Var *) insertEntry->expr;
|
||||
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
|
||||
insertEntry->resno - 1);
|
||||
|
||||
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
|
||||
IsSupportedRedistributionTarget(targetRelationId);
|
||||
return repartitioned ?
|
||||
INSERT_SELECT_REPARTITION :
|
||||
INSERT_SELECT_VIA_COORDINATOR;
|
||||
Oid sourceType = insertColumn->vartype;
|
||||
Oid targetType = attr->atttypid;
|
||||
if (sourceType != targetType)
|
||||
{
|
||||
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
|
||||
attr->attcollation, attr->atttypmod);
|
||||
|
||||
/*
|
||||
* We cannot modify the selectEntry in-place, because ORDER BY or
|
||||
* GROUP BY clauses might be pointing to it with comparison types
|
||||
* of the source type. So instead we keep the original one as a
|
||||
* non-projected entry, so GROUP BY and ORDER BY are happy, and
|
||||
* create a duplicated projected entry with the coerced expression.
|
||||
*/
|
||||
TargetEntry *coercedEntry = copyObject(selectEntry);
|
||||
coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
|
||||
targetType, attr->attcollation,
|
||||
attr->atttypmod);
|
||||
coercedEntry->ressortgroupref = 0;
|
||||
|
||||
/*
|
||||
* The only requirement is that users don't use this name in ORDER BY
|
||||
* or GROUP BY, and it should be unique across the same query.
|
||||
*/
|
||||
StringInfo resnameString = makeStringInfo();
|
||||
appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
|
||||
coercedEntry->resname = resnameString->data;
|
||||
|
||||
projectedEntries = lappend(projectedEntries, coercedEntry);
|
||||
|
||||
if (selectEntry->ressortgroupref != 0)
|
||||
{
|
||||
selectEntry->resjunk = true;
|
||||
|
||||
/*
|
||||
* This entry might still end up in the SELECT output list, so
|
||||
* rename it to avoid ambiguity.
|
||||
*
|
||||
* See https://github.com/citusdata/citus/pull/3470.
|
||||
*/
|
||||
resnameString = makeStringInfo();
|
||||
appendStringInfo(resnameString, "discarded_target_item_%d",
|
||||
targetEntryIndex);
|
||||
selectEntry->resname = resnameString->data;
|
||||
|
||||
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
projectedEntries = lappend(projectedEntries, selectEntry);
|
||||
}
|
||||
|
||||
targetEntryIndex++;
|
||||
}
|
||||
|
||||
for (int entryIndex = list_length(insertTargetList);
|
||||
entryIndex < list_length(selectTargetList);
|
||||
entryIndex++)
|
||||
{
|
||||
nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
|
||||
entryIndex));
|
||||
}
|
||||
|
||||
/* selectEntry->resno must be the ordinal number of the entry */
|
||||
selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
|
||||
int entryResNo = 1;
|
||||
TargetEntry *selectTargetEntry = NULL;
|
||||
foreach_ptr(selectTargetEntry, selectTargetList)
|
||||
{
|
||||
selectTargetEntry->resno = entryResNo++;
|
||||
}
|
||||
|
||||
heap_close(distributedRelation, NoLock);
|
||||
|
||||
return selectTargetList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CastExpr returns an expression which casts the given expr from sourceType to
|
||||
* the given targetType.
|
||||
*/
|
||||
static Expr *
|
||||
CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||
int targetTypeMod)
|
||||
{
|
||||
Oid coercionFuncId = InvalidOid;
|
||||
CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
|
||||
COERCION_EXPLICIT,
|
||||
&coercionFuncId);
|
||||
|
||||
if (coercionType == COERCION_PATH_FUNC)
|
||||
{
|
||||
FuncExpr *coerceExpr = makeNode(FuncExpr);
|
||||
coerceExpr->funcid = coercionFuncId;
|
||||
coerceExpr->args = list_make1(copyObject(expr));
|
||||
coerceExpr->funccollid = targetCollation;
|
||||
coerceExpr->funcresulttype = targetType;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else if (coercionType == COERCION_PATH_RELABELTYPE)
|
||||
{
|
||||
RelabelType *coerceExpr = makeNode(RelabelType);
|
||||
coerceExpr->arg = copyObject(expr);
|
||||
coerceExpr->resulttype = targetType;
|
||||
coerceExpr->resulttypmod = targetTypeMod;
|
||||
coerceExpr->resultcollid = targetCollation;
|
||||
coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
|
||||
coerceExpr->location = -1;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else if (coercionType == COERCION_PATH_ARRAYCOERCE)
|
||||
{
|
||||
Oid sourceBaseType = get_base_element_type(sourceType);
|
||||
Oid targetBaseType = get_base_element_type(targetType);
|
||||
|
||||
CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
|
||||
elemExpr->collation = targetCollation;
|
||||
elemExpr->typeId = sourceBaseType;
|
||||
elemExpr->typeMod = -1;
|
||||
|
||||
Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
|
||||
targetBaseType, targetCollation,
|
||||
targetTypeMod);
|
||||
|
||||
ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
|
||||
coerceExpr->arg = copyObject(expr);
|
||||
coerceExpr->elemexpr = elemCastExpr;
|
||||
coerceExpr->resultcollid = targetCollation;
|
||||
coerceExpr->resulttype = targetType;
|
||||
coerceExpr->resulttypmod = targetTypeMod;
|
||||
coerceExpr->location = -1;
|
||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else if (coercionType == COERCION_PATH_COERCEVIAIO)
|
||||
{
|
||||
CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
|
||||
coerceExpr->arg = (Expr *) copyObject(expr);
|
||||
coerceExpr->resulttype = targetType;
|
||||
coerceExpr->resultcollid = targetCollation;
|
||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||
coerceExpr->location = -1;
|
||||
|
||||
return (Expr *) coerceExpr;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
|
||||
sourceType, targetType)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */
|
||||
bool
|
||||
PlanningInsertSelect(void)
|
||||
{
|
||||
return insertSelectPlannerLevel > 0;
|
||||
}
|
||||
|
|
|
@ -129,6 +129,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
|
|||
COPY_NODE_FIELD(relationIdList);
|
||||
COPY_SCALAR_FIELD(targetRelationId);
|
||||
COPY_NODE_FIELD(insertSelectQuery);
|
||||
COPY_NODE_FIELD(selectPlanForInsertSelect);
|
||||
COPY_SCALAR_FIELD(insertSelectMethod);
|
||||
COPY_STRING_FIELD(intermediateResultIdPrefix);
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
extern bool EnableRepartitionedInsertSelect;
|
||||
|
||||
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
|
||||
extern bool ExecutingInsertSelect(void);
|
||||
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
||||
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||
extern bool IsRedistributablePlan(Plan *selectPlan);
|
||||
|
|
|
@ -43,6 +43,7 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
|
|||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
extern char * InsertSelectResultIdPrefix(uint64 planId);
|
||||
extern bool PlanningInsertSelect(void);
|
||||
|
||||
|
||||
#endif /* INSERT_SELECT_PLANNER_H */
|
||||
|
|
|
@ -432,8 +432,10 @@ typedef struct DistributedPlan
|
|||
/* target relation of a modification */
|
||||
Oid targetRelationId;
|
||||
|
||||
/* INSERT .. SELECT via the coordinator or repartition */
|
||||
/*
|
||||
* INSERT .. SELECT via the coordinator or repartition
|
||||
*/
|
||||
Query *insertSelectQuery;
|
||||
PlannedStmt *selectPlanForInsertSelect;
|
||||
InsertSelectMethod insertSelectMethod;
|
||||
|
||||
/*
|
||||
|
|
|
@ -763,10 +763,10 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu
|
|||
DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned
|
||||
DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
|
||||
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
DEBUG: partitioning SELECT query by column index 0 with name 'a'
|
||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
|
|
|
@ -1357,31 +1357,33 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
|
|||
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> Result
|
||||
One-Time Filter: $3
|
||||
-> Subquery Scan on citus_insert_select_subquery
|
||||
CTE cte1
|
||||
-> Function Scan on generate_series s
|
||||
CTE cte1
|
||||
-> Limit
|
||||
InitPlan 2 (returns $1)
|
||||
-> CTE Scan on cte1 cte1_1
|
||||
-> Result
|
||||
One-Time Filter: $1
|
||||
-> CTE Scan on cte1 cte1_2
|
||||
InitPlan 4 (returns $3)
|
||||
-> CTE Scan on cte1 cte1_3
|
||||
-> CTE Scan on cte1
|
||||
-> Result
|
||||
One-Time Filter: $3
|
||||
CTE cte1
|
||||
-> Limit
|
||||
InitPlan 2 (returns $1)
|
||||
-> CTE Scan on cte1 cte1_1
|
||||
-> Result
|
||||
One-Time Filter: $1
|
||||
-> CTE Scan on cte1 cte1_2
|
||||
InitPlan 4 (returns $3)
|
||||
-> CTE Scan on cte1 cte1_3
|
||||
-> CTE Scan on cte1
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO lineitem_hash_part
|
||||
( SELECT s FROM generate_series(1,5) s) UNION
|
||||
( SELECT s FROM generate_series(5,10) s);
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: s.s
|
||||
-> Append
|
||||
-> Function Scan on generate_series s
|
||||
-> Function Scan on generate_series s_1
|
||||
-> Subquery Scan on citus_insert_select_subquery
|
||||
-> HashAggregate
|
||||
Group Key: s.s
|
||||
-> Append
|
||||
-> Function Scan on generate_series s
|
||||
-> Function Scan on generate_series s_1
|
||||
-- explain with recursive planning
|
||||
-- prevent PG 11 - PG 12 outputs to diverge
|
||||
SET citus.enable_cte_inlining TO false;
|
||||
|
|
|
@ -710,6 +710,14 @@ INSERT INTO agg_events
|
|||
raw_events_first;
|
||||
DEBUG: CTE sub_cte is going to be inlined via distributed planning
|
||||
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: CTE sub_cte is going to be inlined via distributed planning
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: generating subplan XXX_1 for CTE sub_cte: SELECT 1
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, (SELECT sub_cte."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) sub_cte) AS value_1_agg FROM public.raw_events_first
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
|
||||
HINT: Consider using an equality filter on the distributed table's partition column.
|
||||
-- We support set operations via the coordinator
|
||||
|
@ -1044,7 +1052,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.id
|
||||
Group Key: remote_scan.user_id
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
@ -1109,7 +1117,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.id
|
||||
Group Key: remote_scan.user_id
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
@ -1408,7 +1416,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.id
|
||||
Group Key: remote_scan.user_id
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
@ -1453,7 +1461,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.id
|
||||
Group Key: remote_scan.user_id
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> HashAggregate
|
||||
|
@ -1501,7 +1509,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.id
|
||||
Group Key: remote_scan.user_id
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
|
|
@ -114,10 +114,10 @@ WITH inserted_table AS (
|
|||
) SELECT * FROM inserted_table ORDER BY 1;
|
||||
DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2
|
||||
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: push down of limit count: 5
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
col_1 | col_2
|
||||
---------------------------------------------------------------------
|
||||
|
@ -150,13 +150,13 @@ WITH inserted_table AS (
|
|||
) SELECT * FROM inserted_table ORDER BY 1;
|
||||
DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2
|
||||
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: push down of limit count: 5
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
|
||||
DEBUG: push down of limit count: 5
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5
|
||||
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
col_1 | col_2
|
||||
---------------------------------------------------------------------
|
||||
|
@ -246,10 +246,10 @@ WITH inserted_table AS (
|
|||
) SELECT * FROM inserted_table ORDER BY 1;
|
||||
DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2
|
||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1
|
||||
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
col_1 | col_2
|
||||
---------------------------------------------------------------------
|
||||
|
@ -269,9 +269,9 @@ WITH cte AS (
|
|||
UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte);
|
||||
DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2
|
||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte))
|
||||
DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte))
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
RESET client_min_messages;
|
||||
-- Following query is supported by using repartition join for the insert/select
|
||||
|
|
|
@ -147,6 +147,10 @@ FROM (
|
|||
GROUP BY t1.user_id, hasdone_event
|
||||
) t GROUP BY user_id, hasdone_event;
|
||||
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102])))
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105])))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105])))
|
||||
ERROR: cannot pushdown the subquery
|
||||
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
|
||||
RESET client_min_messages;
|
||||
|
@ -293,6 +297,10 @@ GROUP BY
|
|||
ORDER BY
|
||||
count_pay;
|
||||
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12))
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14))
|
||||
ERROR: cannot pushdown the subquery
|
||||
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
|
||||
RESET client_min_messages;
|
||||
|
@ -676,7 +684,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.user_id, remote_scan.event_type
|
||||
Group Key: remote_scan.user_id, remote_scan.value_1_agg
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
@ -696,7 +704,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.user_id, remote_scan.event_type
|
||||
Group Key: remote_scan.user_id, remote_scan.value_1_agg
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
@ -717,7 +725,7 @@ $Q$);
|
|||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.user_id, remote_scan.event_type
|
||||
Group Key: remote_scan.user_id, remote_scan.value_1_agg
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
|
|
Loading…
Reference in New Issue