mirror of https://github.com/citusdata/citus.git
Merge pull request #3943 from citusdata/fix_explain_2
Report correct INSERT/SELECT method in EXPLAINpull/3945/head^2
commit
d022f80340
|
@ -49,7 +49,7 @@
|
||||||
/* functions for creating custom scan nodes */
|
/* functions for creating custom scan nodes */
|
||||||
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
|
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
|
||||||
static Node * TaskTrackerCreateScan(CustomScan *scan);
|
static Node * TaskTrackerCreateScan(CustomScan *scan);
|
||||||
static Node * CoordinatorInsertSelectCreateScan(CustomScan *scan);
|
static Node * NonPushableInsertSelectCreateScan(CustomScan *scan);
|
||||||
static Node * DelayedErrorCreateScan(CustomScan *scan);
|
static Node * DelayedErrorCreateScan(CustomScan *scan);
|
||||||
|
|
||||||
/* functions that are common to different scans */
|
/* functions that are common to different scans */
|
||||||
|
@ -77,9 +77,9 @@ CustomScanMethods TaskTrackerCustomScanMethods = {
|
||||||
TaskTrackerCreateScan
|
TaskTrackerCreateScan
|
||||||
};
|
};
|
||||||
|
|
||||||
CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
|
CustomScanMethods NonPushableInsertSelectCustomScanMethods = {
|
||||||
"Citus INSERT ... SELECT",
|
"Citus INSERT ... SELECT",
|
||||||
CoordinatorInsertSelectCreateScan
|
NonPushableInsertSelectCreateScan
|
||||||
};
|
};
|
||||||
|
|
||||||
CustomScanMethods DelayedErrorCustomScanMethods = {
|
CustomScanMethods DelayedErrorCustomScanMethods = {
|
||||||
|
@ -109,13 +109,13 @@ static CustomExecMethods TaskTrackerCustomExecMethods = {
|
||||||
.ExplainCustomScan = CitusExplainScan
|
.ExplainCustomScan = CitusExplainScan
|
||||||
};
|
};
|
||||||
|
|
||||||
static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
|
static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
|
||||||
.CustomName = "CoordinatorInsertSelectScan",
|
.CustomName = "NonPushableInsertSelectScan",
|
||||||
.BeginCustomScan = CitusBeginScan,
|
.BeginCustomScan = CitusBeginScan,
|
||||||
.ExecCustomScan = CoordinatorInsertSelectExecScan,
|
.ExecCustomScan = NonPushableInsertSelectExecScan,
|
||||||
.EndCustomScan = CitusEndScan,
|
.EndCustomScan = CitusEndScan,
|
||||||
.ReScanCustomScan = CitusReScan,
|
.ReScanCustomScan = CitusReScan,
|
||||||
.ExplainCustomScan = CoordinatorInsertSelectExplainScan
|
.ExplainCustomScan = NonPushableInsertSelectExplainScan
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -133,7 +133,7 @@ IsCitusCustomState(PlanState *planState)
|
||||||
CustomScanState *css = castNode(CustomScanState, planState);
|
CustomScanState *css = castNode(CustomScanState, planState);
|
||||||
if (css->methods == &AdaptiveExecutorCustomExecMethods ||
|
if (css->methods == &AdaptiveExecutorCustomExecMethods ||
|
||||||
css->methods == &TaskTrackerCustomExecMethods ||
|
css->methods == &TaskTrackerCustomExecMethods ||
|
||||||
css->methods == &CoordinatorInsertSelectCustomExecMethods)
|
css->methods == &NonPushableInsertSelectCustomExecMethods)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -150,7 +150,7 @@ RegisterCitusCustomScanMethods(void)
|
||||||
{
|
{
|
||||||
RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
|
RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
|
||||||
RegisterCustomScanMethods(&TaskTrackerCustomScanMethods);
|
RegisterCustomScanMethods(&TaskTrackerCustomScanMethods);
|
||||||
RegisterCustomScanMethods(&CoordinatorInsertSelectCustomScanMethods);
|
RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
|
||||||
RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
|
RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -598,20 +598,20 @@ TaskTrackerCreateScan(CustomScan *scan)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CoordinatorInsertSelectCrateScan creates the scan state for executing
|
* NonPushableInsertSelectCrateScan creates the scan state for executing
|
||||||
* INSERT..SELECT into a distributed table via the coordinator.
|
* INSERT..SELECT into a distributed table via the coordinator.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
CoordinatorInsertSelectCreateScan(CustomScan *scan)
|
NonPushableInsertSelectCreateScan(CustomScan *scan)
|
||||||
{
|
{
|
||||||
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
|
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
|
||||||
|
|
||||||
scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
|
scanState->executorType = MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT;
|
||||||
scanState->customScanState.ss.ps.type = T_CustomScanState;
|
scanState->customScanState.ss.ps.type = T_CustomScanState;
|
||||||
scanState->distributedPlan = GetDistributedPlan(scan);
|
scanState->distributedPlan = GetDistributedPlan(scan);
|
||||||
|
|
||||||
scanState->customScanState.methods =
|
scanState->customScanState.methods =
|
||||||
&CoordinatorInsertSelectCustomExecMethods;
|
&NonPushableInsertSelectCustomExecMethods;
|
||||||
|
|
||||||
return (Node *) scanState;
|
return (Node *) scanState;
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,11 +53,7 @@
|
||||||
/* Config variables managed via guc.c */
|
/* Config variables managed via guc.c */
|
||||||
bool EnableRepartitionedInsertSelect = true;
|
bool EnableRepartitionedInsertSelect = true;
|
||||||
|
|
||||||
/* depth of current insert/select executor. */
|
|
||||||
static int insertSelectExecutorLevel = 0;
|
|
||||||
|
|
||||||
|
|
||||||
static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState *node);
|
|
||||||
static Query * WrapSubquery(Query *subquery);
|
static Query * WrapSubquery(Query *subquery);
|
||||||
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||||
char *resultIdPrefix);
|
char *resultIdPrefix);
|
||||||
|
@ -71,61 +67,27 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
|
||||||
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
||||||
List *insertTargetList);
|
List *insertTargetList);
|
||||||
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
||||||
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
|
||||||
Oid targetRelationId);
|
|
||||||
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||||
CitusTableCacheEntry *targetRelation,
|
CitusTableCacheEntry *targetRelation,
|
||||||
List **redistributedResults,
|
List **redistributedResults,
|
||||||
bool useBinaryFormat);
|
bool useBinaryFormat);
|
||||||
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
||||||
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
|
||||||
int targetTypeMod);
|
|
||||||
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
||||||
static void RelableTargetEntryList(List *selectTargetList, List *insertTargetList);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CoordinatorInsertSelectExecScan is a wrapper around
|
* NonPushableInsertSelectExecScan executes an INSERT INTO distributed_table
|
||||||
* CoordinatorInsertSelectExecScanInternal which also properly increments
|
* SELECT .. query either by routing via coordinator or by repartitioning
|
||||||
* or decrements insertSelectExecutorLevel.
|
* task results and moving data directly between nodes.
|
||||||
*/
|
*/
|
||||||
TupleTableSlot *
|
TupleTableSlot *
|
||||||
CoordinatorInsertSelectExecScan(CustomScanState *node)
|
NonPushableInsertSelectExecScan(CustomScanState *node)
|
||||||
{
|
|
||||||
TupleTableSlot *result = NULL;
|
|
||||||
insertSelectExecutorLevel++;
|
|
||||||
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
result = CoordinatorInsertSelectExecScanInternal(node);
|
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
insertSelectExecutorLevel--;
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
insertSelectExecutorLevel--;
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table
|
|
||||||
* SELECT .. query by setting up a DestReceiver that copies tuples into the
|
|
||||||
* distributed table and then executing the SELECT query using that DestReceiver
|
|
||||||
* as the tuple destination.
|
|
||||||
*/
|
|
||||||
static TupleTableSlot *
|
|
||||||
CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
|
||||||
{
|
{
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
|
|
||||||
if (!scanState->finishedRemoteScan)
|
if (!scanState->finishedRemoteScan)
|
||||||
{
|
{
|
||||||
EState *executorState = ScanStateGetExecutorState(scanState);
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery);
|
Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery);
|
||||||
List *insertTargetList = insertSelectQuery->targetList;
|
List *insertTargetList = insertSelectQuery->targetList;
|
||||||
|
@ -136,40 +98,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
bool hasReturning = distributedPlan->expectResults;
|
bool hasReturning = distributedPlan->expectResults;
|
||||||
HTAB *shardStateHash = NULL;
|
HTAB *shardStateHash = NULL;
|
||||||
|
|
||||||
/* select query to execute */
|
Query *selectQuery = selectRte->subquery;
|
||||||
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
|
PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect);
|
||||||
|
|
||||||
selectRte->subquery = selectQuery;
|
|
||||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Cast types of insert target list and select projection list to
|
|
||||||
* match the column types of the target relation.
|
|
||||||
*/
|
|
||||||
selectQuery->targetList =
|
|
||||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
|
||||||
selectQuery->targetList,
|
|
||||||
targetRelationId);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Later we might need to call WrapTaskListForProjection(), which requires
|
|
||||||
* that select target list has unique names, otherwise the outer query
|
|
||||||
* cannot select columns unambiguously. So we relabel select columns to
|
|
||||||
* match target columns.
|
|
||||||
*/
|
|
||||||
RelableTargetEntryList(selectQuery->targetList, insertTargetList);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Make a copy of the query, since pg_plan_query may scribble on it and we
|
|
||||||
* want it to be replanned every time if it is stored in a prepared
|
|
||||||
* statement.
|
|
||||||
*/
|
|
||||||
selectQuery = copyObject(selectQuery);
|
|
||||||
|
|
||||||
/* plan the subquery, this may be another distributed query */
|
|
||||||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
|
||||||
PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions,
|
|
||||||
paramListInfo);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we are dealing with partitioned table, we also need to lock its
|
* If we are dealing with partitioned table, we also need to lock its
|
||||||
|
@ -181,8 +111,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsRedistributablePlan(selectPlan->planTree) &&
|
if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION)
|
||||||
IsSupportedRedistributionTarget(targetRelationId))
|
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));
|
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));
|
||||||
|
|
||||||
|
@ -677,207 +606,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* ExecutingInsertSelect returns true if we are executing an INSERT ...SELECT query */
|
|
||||||
bool
|
|
||||||
ExecutingInsertSelect(void)
|
|
||||||
{
|
|
||||||
return insertSelectExecutorLevel > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* AddInsertSelectCasts makes sure that the types in columns in the given
|
|
||||||
* target lists have the same type as the columns of the given relation.
|
|
||||||
* It might add casts to ensure that.
|
|
||||||
*
|
|
||||||
* It returns the updated selectTargetList.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
|
||||||
Oid targetRelationId)
|
|
||||||
{
|
|
||||||
ListCell *insertEntryCell = NULL;
|
|
||||||
ListCell *selectEntryCell = NULL;
|
|
||||||
List *projectedEntries = NIL;
|
|
||||||
List *nonProjectedEntries = NIL;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ReorderInsertSelectTargetLists() makes sure that first few columns of
|
|
||||||
* the SELECT query match the insert targets. It might contain additional
|
|
||||||
* items for GROUP BY, etc.
|
|
||||||
*/
|
|
||||||
Assert(list_length(insertTargetList) <= list_length(selectTargetList));
|
|
||||||
|
|
||||||
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
|
||||||
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
|
||||||
|
|
||||||
int targetEntryIndex = 0;
|
|
||||||
forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
|
|
||||||
{
|
|
||||||
TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
|
|
||||||
TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
|
|
||||||
Var *insertColumn = (Var *) insertEntry->expr;
|
|
||||||
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
|
|
||||||
insertEntry->resno - 1);
|
|
||||||
|
|
||||||
Oid sourceType = insertColumn->vartype;
|
|
||||||
Oid targetType = attr->atttypid;
|
|
||||||
if (sourceType != targetType)
|
|
||||||
{
|
|
||||||
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
|
|
||||||
attr->attcollation, attr->atttypmod);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We cannot modify the selectEntry in-place, because ORDER BY or
|
|
||||||
* GROUP BY clauses might be pointing to it with comparison types
|
|
||||||
* of the source type. So instead we keep the original one as a
|
|
||||||
* non-projected entry, so GROUP BY and ORDER BY are happy, and
|
|
||||||
* create a duplicated projected entry with the coerced expression.
|
|
||||||
*/
|
|
||||||
TargetEntry *coercedEntry = copyObject(selectEntry);
|
|
||||||
coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
|
|
||||||
targetType, attr->attcollation,
|
|
||||||
attr->atttypmod);
|
|
||||||
coercedEntry->ressortgroupref = 0;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The only requirement is that users don't use this name in ORDER BY
|
|
||||||
* or GROUP BY, and it should be unique across the same query.
|
|
||||||
*/
|
|
||||||
StringInfo resnameString = makeStringInfo();
|
|
||||||
appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
|
|
||||||
coercedEntry->resname = resnameString->data;
|
|
||||||
|
|
||||||
projectedEntries = lappend(projectedEntries, coercedEntry);
|
|
||||||
|
|
||||||
if (selectEntry->ressortgroupref != 0)
|
|
||||||
{
|
|
||||||
selectEntry->resjunk = true;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This entry might still end up in the SELECT output list, so
|
|
||||||
* rename it to avoid ambiguity.
|
|
||||||
*
|
|
||||||
* See https://github.com/citusdata/citus/pull/3470.
|
|
||||||
*/
|
|
||||||
resnameString = makeStringInfo();
|
|
||||||
appendStringInfo(resnameString, "discarded_target_item_%d",
|
|
||||||
targetEntryIndex);
|
|
||||||
selectEntry->resname = resnameString->data;
|
|
||||||
|
|
||||||
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
projectedEntries = lappend(projectedEntries, selectEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
targetEntryIndex++;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int entryIndex = list_length(insertTargetList);
|
|
||||||
entryIndex < list_length(selectTargetList);
|
|
||||||
entryIndex++)
|
|
||||||
{
|
|
||||||
nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
|
|
||||||
entryIndex));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* selectEntry->resno must be the ordinal number of the entry */
|
|
||||||
selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
|
|
||||||
int entryResNo = 1;
|
|
||||||
TargetEntry *selectTargetEntry = NULL;
|
|
||||||
foreach_ptr(selectTargetEntry, selectTargetList)
|
|
||||||
{
|
|
||||||
selectTargetEntry->resno = entryResNo++;
|
|
||||||
}
|
|
||||||
|
|
||||||
heap_close(distributedRelation, NoLock);
|
|
||||||
|
|
||||||
return selectTargetList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CastExpr returns an expression which casts the given expr from sourceType to
|
|
||||||
* the given targetType.
|
|
||||||
*/
|
|
||||||
static Expr *
|
|
||||||
CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
|
||||||
int targetTypeMod)
|
|
||||||
{
|
|
||||||
Oid coercionFuncId = InvalidOid;
|
|
||||||
CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
|
|
||||||
COERCION_EXPLICIT,
|
|
||||||
&coercionFuncId);
|
|
||||||
|
|
||||||
if (coercionType == COERCION_PATH_FUNC)
|
|
||||||
{
|
|
||||||
FuncExpr *coerceExpr = makeNode(FuncExpr);
|
|
||||||
coerceExpr->funcid = coercionFuncId;
|
|
||||||
coerceExpr->args = list_make1(copyObject(expr));
|
|
||||||
coerceExpr->funccollid = targetCollation;
|
|
||||||
coerceExpr->funcresulttype = targetType;
|
|
||||||
|
|
||||||
return (Expr *) coerceExpr;
|
|
||||||
}
|
|
||||||
else if (coercionType == COERCION_PATH_RELABELTYPE)
|
|
||||||
{
|
|
||||||
RelabelType *coerceExpr = makeNode(RelabelType);
|
|
||||||
coerceExpr->arg = copyObject(expr);
|
|
||||||
coerceExpr->resulttype = targetType;
|
|
||||||
coerceExpr->resulttypmod = targetTypeMod;
|
|
||||||
coerceExpr->resultcollid = targetCollation;
|
|
||||||
coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
|
|
||||||
coerceExpr->location = -1;
|
|
||||||
|
|
||||||
return (Expr *) coerceExpr;
|
|
||||||
}
|
|
||||||
else if (coercionType == COERCION_PATH_ARRAYCOERCE)
|
|
||||||
{
|
|
||||||
Oid sourceBaseType = get_base_element_type(sourceType);
|
|
||||||
Oid targetBaseType = get_base_element_type(targetType);
|
|
||||||
|
|
||||||
CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
|
|
||||||
elemExpr->collation = targetCollation;
|
|
||||||
elemExpr->typeId = sourceBaseType;
|
|
||||||
elemExpr->typeMod = -1;
|
|
||||||
|
|
||||||
Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
|
|
||||||
targetBaseType, targetCollation,
|
|
||||||
targetTypeMod);
|
|
||||||
|
|
||||||
ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
|
|
||||||
coerceExpr->arg = copyObject(expr);
|
|
||||||
coerceExpr->elemexpr = elemCastExpr;
|
|
||||||
coerceExpr->resultcollid = targetCollation;
|
|
||||||
coerceExpr->resulttype = targetType;
|
|
||||||
coerceExpr->resulttypmod = targetTypeMod;
|
|
||||||
coerceExpr->location = -1;
|
|
||||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
|
||||||
|
|
||||||
return (Expr *) coerceExpr;
|
|
||||||
}
|
|
||||||
else if (coercionType == COERCION_PATH_COERCEVIAIO)
|
|
||||||
{
|
|
||||||
CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
|
|
||||||
coerceExpr->arg = (Expr *) copyObject(expr);
|
|
||||||
coerceExpr->resulttype = targetType;
|
|
||||||
coerceExpr->resultcollid = targetCollation;
|
|
||||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
|
||||||
coerceExpr->location = -1;
|
|
||||||
|
|
||||||
return (Expr *) coerceExpr;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
|
|
||||||
sourceType, targetType)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsSupportedRedistributionTarget determines whether re-partitioning into the
|
* IsSupportedRedistributionTarget determines whether re-partitioning into the
|
||||||
* given target relation is supported.
|
* given target relation is supported.
|
||||||
|
@ -1108,23 +836,3 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
||||||
SetTaskQueryString(task, wrappedQuery->data);
|
SetTaskQueryString(task, wrappedQuery->data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RelableTargetEntryList relabels select target list to have matching names with
|
|
||||||
* insert target list.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RelableTargetEntryList(List *selectTargetList, List *insertTargetList)
|
|
||||||
{
|
|
||||||
ListCell *selectTargetCell = NULL;
|
|
||||||
ListCell *insertTargetCell = NULL;
|
|
||||||
|
|
||||||
forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
|
|
||||||
{
|
|
||||||
TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
|
|
||||||
TargetEntry *insertTargetEntry = lfirst(insertTargetCell);
|
|
||||||
|
|
||||||
selectTargetEntry->resname = insertTargetEntry->resname;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -84,11 +84,11 @@ JobExecutorType(DistributedPlan *distributedPlan)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Even if adaptiveExecutorEnabled, we go through
|
* Even if adaptiveExecutorEnabled, we go through
|
||||||
* MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT because
|
* MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT because
|
||||||
* the executor already knows how to handle adaptive
|
* the executor already knows how to handle adaptive
|
||||||
* executor when necessary.
|
* executor when necessary.
|
||||||
*/
|
*/
|
||||||
return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
|
return MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT;
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY);
|
Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY);
|
||||||
|
|
|
@ -99,7 +99,7 @@ CitusExecutorName(MultiExecutorType executorType)
|
||||||
return "task-tracker";
|
return "task-tracker";
|
||||||
}
|
}
|
||||||
|
|
||||||
case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT:
|
case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT:
|
||||||
{
|
{
|
||||||
return "insert-select";
|
return "insert-select";
|
||||||
}
|
}
|
||||||
|
|
|
@ -913,7 +913,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
}
|
}
|
||||||
|
|
||||||
distributedPlan =
|
distributedPlan =
|
||||||
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
|
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
|
||||||
|
boundParams);
|
||||||
}
|
}
|
||||||
else if (InsertSelectIntoLocalTable(originalQuery))
|
else if (InsertSelectIntoLocalTable(originalQuery))
|
||||||
{
|
{
|
||||||
|
@ -1294,9 +1295,9 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT:
|
case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT:
|
||||||
{
|
{
|
||||||
customScan->methods = &CoordinatorInsertSelectCustomScanMethods;
|
customScan->methods = &NonPushableInsertSelectCustomScanMethods;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,6 @@
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/function_call_delegation.h"
|
#include "distributed/function_call_delegation.h"
|
||||||
#include "distributed/insert_select_planner.h"
|
#include "distributed/insert_select_planner.h"
|
||||||
#include "distributed/insert_select_executor.h"
|
|
||||||
#include "distributed/metadata_utility.h"
|
#include "distributed/metadata_utility.h"
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
@ -230,11 +229,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This can be called while executing INSERT ... SELECT func(). insert_select_executor
|
* Cannot delegate functions for INSERT ... SELECT func(), since they require
|
||||||
* doesn't get the planned subquery and gets the actual struct Query, so the planning
|
* coordinated transactions.
|
||||||
* for these kinds of queries happens at the execution time.
|
|
||||||
*/
|
*/
|
||||||
if (ExecutingInsertSelect())
|
if (PlanningInsertSelect())
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
|
ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT")));
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -48,10 +48,17 @@
|
||||||
#include "parser/parsetree.h"
|
#include "parser/parsetree.h"
|
||||||
#include "parser/parse_coerce.h"
|
#include "parser/parse_coerce.h"
|
||||||
#include "parser/parse_relation.h"
|
#include "parser/parse_relation.h"
|
||||||
|
#include "tcop/tcopprot.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
|
||||||
|
static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
|
||||||
|
Query *originalQuery,
|
||||||
|
PlannerRestrictionContext *
|
||||||
|
plannerRestrictionContext,
|
||||||
|
ParamListInfo boundParams);
|
||||||
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
|
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
|
@ -77,8 +84,18 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
||||||
subqueryRte,
|
subqueryRte,
|
||||||
Oid *
|
Oid *
|
||||||
selectPartitionColumnTableId);
|
selectPartitionColumnTableId);
|
||||||
static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse);
|
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
|
||||||
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
|
ParamListInfo boundParams);
|
||||||
|
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
|
||||||
|
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
|
||||||
|
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
|
Oid targetRelationId);
|
||||||
|
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
|
int targetTypeMod);
|
||||||
|
|
||||||
|
|
||||||
|
/* depth of current insert/select planner. */
|
||||||
|
static int insertSelectPlannerLevel = 0;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -177,15 +194,46 @@ CheckInsertSelectQuery(Query *query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CoordinatorInsertSelectExecScan is a wrapper around
|
||||||
|
* CoordinatorInsertSelectExecScanInternal which also properly increments
|
||||||
|
* or decrements insertSelectExecutorLevel.
|
||||||
|
*/
|
||||||
|
DistributedPlan *
|
||||||
|
CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
|
||||||
|
PlannerRestrictionContext *plannerRestrictionContext,
|
||||||
|
ParamListInfo boundParams)
|
||||||
|
{
|
||||||
|
DistributedPlan *result = NULL;
|
||||||
|
insertSelectPlannerLevel++;
|
||||||
|
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
result = CreateInsertSelectPlanInternal(planId, originalQuery,
|
||||||
|
plannerRestrictionContext, boundParams);
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
insertSelectPlannerLevel--;
|
||||||
|
PG_RE_THROW();
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
|
insertSelectPlannerLevel--;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateInsertSelectPlan tries to create a distributed plan for an
|
* CreateInsertSelectPlan tries to create a distributed plan for an
|
||||||
* INSERT INTO distributed_table SELECT ... query by push down the
|
* INSERT INTO distributed_table SELECT ... query by push down the
|
||||||
* command to the workers and if that is not possible it creates a
|
* command to the workers and if that is not possible it creates a
|
||||||
* plan for evaluating the SELECT on the coordinator.
|
* plan for evaluating the SELECT on the coordinator.
|
||||||
*/
|
*/
|
||||||
DistributedPlan *
|
static DistributedPlan *
|
||||||
CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
|
CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
PlannerRestrictionContext *plannerRestrictionContext,
|
||||||
|
ParamListInfo boundParams)
|
||||||
{
|
{
|
||||||
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery);
|
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery);
|
||||||
if (deferredError != NULL)
|
if (deferredError != NULL)
|
||||||
|
@ -201,8 +249,12 @@ CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
|
||||||
{
|
{
|
||||||
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
|
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
|
||||||
|
|
||||||
/* if INSERT..SELECT cannot be distributed, pull to coordinator */
|
/*
|
||||||
distributedPlan = CreateCoordinatorInsertSelectPlan(planId, originalQuery);
|
* If INSERT..SELECT cannot be distributed, pull to coordinator or use
|
||||||
|
* repartitioning.
|
||||||
|
*/
|
||||||
|
distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery,
|
||||||
|
boundParams);
|
||||||
}
|
}
|
||||||
|
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
|
@ -1282,14 +1334,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a
|
* CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a
|
||||||
* distributed table. The query plan can also be executed on a worker in MX.
|
* distributed table. The query plan can also be executed on a worker in MX.
|
||||||
*/
|
*/
|
||||||
static DistributedPlan *
|
static DistributedPlan *
|
||||||
CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
|
CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
|
||||||
{
|
{
|
||||||
Query *insertSelectQuery = copyObject(parse);
|
Query *insertSelectQuery = copyObject(parse);
|
||||||
|
|
||||||
|
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
|
|
||||||
|
@ -1297,14 +1350,55 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
|
||||||
distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
|
distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
|
||||||
|
|
||||||
distributedPlan->planningError =
|
distributedPlan->planningError =
|
||||||
CoordinatorInsertSelectSupported(insertSelectQuery);
|
NonPushableInsertSelectSupported(insertSelectQuery);
|
||||||
|
|
||||||
if (distributedPlan->planningError != NULL)
|
if (distributedPlan->planningError != NULL)
|
||||||
{
|
{
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
|
||||||
|
|
||||||
|
selectRte->subquery = selectQuery;
|
||||||
|
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Cast types of insert target list and select projection list to
|
||||||
|
* match the column types of the target relation.
|
||||||
|
*/
|
||||||
|
selectQuery->targetList =
|
||||||
|
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||||
|
selectQuery->targetList,
|
||||||
|
targetRelationId);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Later we might need to call WrapTaskListForProjection(), which requires
|
||||||
|
* that select target list has unique names, otherwise the outer query
|
||||||
|
* cannot select columns unambiguously. So we relabel select columns to
|
||||||
|
* match target columns.
|
||||||
|
*/
|
||||||
|
List *insertTargetList = insertSelectQuery->targetList;
|
||||||
|
RelabelTargetEntryList(selectQuery->targetList, insertTargetList);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make a copy of the select query, since following code scribbles it
|
||||||
|
* but we need to keep the original for EXPLAIN.
|
||||||
|
*/
|
||||||
|
Query *selectQueryCopy = copyObject(selectQuery);
|
||||||
|
|
||||||
|
/* plan the subquery, this may be another distributed query */
|
||||||
|
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||||
|
PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions,
|
||||||
|
boundParams);
|
||||||
|
|
||||||
|
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
|
||||||
|
IsSupportedRedistributionTarget(targetRelationId);
|
||||||
|
|
||||||
distributedPlan->insertSelectQuery = insertSelectQuery;
|
distributedPlan->insertSelectQuery = insertSelectQuery;
|
||||||
|
distributedPlan->selectPlanForInsertSelect = selectPlan;
|
||||||
|
distributedPlan->insertSelectMethod = repartitioned ?
|
||||||
|
INSERT_SELECT_REPARTITION :
|
||||||
|
INSERT_SELECT_VIA_COORDINATOR;
|
||||||
distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
|
distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
|
||||||
distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
|
distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
|
||||||
distributedPlan->targetRelationId = targetRelationId;
|
distributedPlan->targetRelationId = targetRelationId;
|
||||||
|
@ -1314,13 +1408,13 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CoordinatorInsertSelectSupported returns an error if executing an
|
* NonPushableInsertSelectSupported returns an error if executing an
|
||||||
* INSERT ... SELECT command by pulling results of the SELECT to the coordinator
|
* INSERT ... SELECT command by pulling results of the SELECT to the coordinator
|
||||||
* is unsupported because it needs to generate sequence values or insert into an
|
* or with repartitioning is unsupported because it needs to generate sequence
|
||||||
* append-distributed table.
|
* values or insert into an append-distributed table.
|
||||||
*/
|
*/
|
||||||
static DeferredErrorMessage *
|
static DeferredErrorMessage *
|
||||||
CoordinatorInsertSelectSupported(Query *insertSelectQuery)
|
NonPushableInsertSelectSupported(Query *insertSelectQuery)
|
||||||
{
|
{
|
||||||
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(
|
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(
|
||||||
insertSelectQuery);
|
insertSelectQuery);
|
||||||
|
@ -1355,3 +1449,224 @@ InsertSelectResultIdPrefix(uint64 planId)
|
||||||
|
|
||||||
return resultIdPrefix->data;
|
return resultIdPrefix->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RelabelTargetEntryList relabels select target list to have matching names with
|
||||||
|
* insert target list.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RelabelTargetEntryList(List *selectTargetList, List *insertTargetList)
|
||||||
|
{
|
||||||
|
ListCell *selectTargetCell = NULL;
|
||||||
|
ListCell *insertTargetCell = NULL;
|
||||||
|
|
||||||
|
forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
|
||||||
|
{
|
||||||
|
TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
|
||||||
|
TargetEntry *insertTargetEntry = lfirst(insertTargetCell);
|
||||||
|
|
||||||
|
selectTargetEntry->resname = insertTargetEntry->resname;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AddInsertSelectCasts makes sure that the types in columns in the given
|
||||||
|
* target lists have the same type as the columns of the given relation.
|
||||||
|
* It might add casts to ensure that.
|
||||||
|
*
|
||||||
|
* It returns the updated selectTargetList.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
|
Oid targetRelationId)
|
||||||
|
{
|
||||||
|
ListCell *insertEntryCell = NULL;
|
||||||
|
ListCell *selectEntryCell = NULL;
|
||||||
|
List *projectedEntries = NIL;
|
||||||
|
List *nonProjectedEntries = NIL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReorderInsertSelectTargetLists() makes sure that first few columns of
|
||||||
|
* the SELECT query match the insert targets. It might contain additional
|
||||||
|
* items for GROUP BY, etc.
|
||||||
|
*/
|
||||||
|
Assert(list_length(insertTargetList) <= list_length(selectTargetList));
|
||||||
|
|
||||||
|
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
||||||
|
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
||||||
|
|
||||||
|
int targetEntryIndex = 0;
|
||||||
|
forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
|
||||||
|
{
|
||||||
|
TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
|
||||||
|
TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
|
||||||
|
Var *insertColumn = (Var *) insertEntry->expr;
|
||||||
|
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
|
||||||
|
insertEntry->resno - 1);
|
||||||
|
|
||||||
|
Oid sourceType = insertColumn->vartype;
|
||||||
|
Oid targetType = attr->atttypid;
|
||||||
|
if (sourceType != targetType)
|
||||||
|
{
|
||||||
|
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
|
||||||
|
attr->attcollation, attr->atttypmod);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We cannot modify the selectEntry in-place, because ORDER BY or
|
||||||
|
* GROUP BY clauses might be pointing to it with comparison types
|
||||||
|
* of the source type. So instead we keep the original one as a
|
||||||
|
* non-projected entry, so GROUP BY and ORDER BY are happy, and
|
||||||
|
* create a duplicated projected entry with the coerced expression.
|
||||||
|
*/
|
||||||
|
TargetEntry *coercedEntry = copyObject(selectEntry);
|
||||||
|
coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
|
||||||
|
targetType, attr->attcollation,
|
||||||
|
attr->atttypmod);
|
||||||
|
coercedEntry->ressortgroupref = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The only requirement is that users don't use this name in ORDER BY
|
||||||
|
* or GROUP BY, and it should be unique across the same query.
|
||||||
|
*/
|
||||||
|
StringInfo resnameString = makeStringInfo();
|
||||||
|
appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
|
||||||
|
coercedEntry->resname = resnameString->data;
|
||||||
|
|
||||||
|
projectedEntries = lappend(projectedEntries, coercedEntry);
|
||||||
|
|
||||||
|
if (selectEntry->ressortgroupref != 0)
|
||||||
|
{
|
||||||
|
selectEntry->resjunk = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This entry might still end up in the SELECT output list, so
|
||||||
|
* rename it to avoid ambiguity.
|
||||||
|
*
|
||||||
|
* See https://github.com/citusdata/citus/pull/3470.
|
||||||
|
*/
|
||||||
|
resnameString = makeStringInfo();
|
||||||
|
appendStringInfo(resnameString, "discarded_target_item_%d",
|
||||||
|
targetEntryIndex);
|
||||||
|
selectEntry->resname = resnameString->data;
|
||||||
|
|
||||||
|
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
projectedEntries = lappend(projectedEntries, selectEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
targetEntryIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int entryIndex = list_length(insertTargetList);
|
||||||
|
entryIndex < list_length(selectTargetList);
|
||||||
|
entryIndex++)
|
||||||
|
{
|
||||||
|
nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
|
||||||
|
entryIndex));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* selectEntry->resno must be the ordinal number of the entry */
|
||||||
|
selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
|
||||||
|
int entryResNo = 1;
|
||||||
|
TargetEntry *selectTargetEntry = NULL;
|
||||||
|
foreach_ptr(selectTargetEntry, selectTargetList)
|
||||||
|
{
|
||||||
|
selectTargetEntry->resno = entryResNo++;
|
||||||
|
}
|
||||||
|
|
||||||
|
heap_close(distributedRelation, NoLock);
|
||||||
|
|
||||||
|
return selectTargetList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CastExpr returns an expression which casts the given expr from sourceType to
|
||||||
|
* the given targetType.
|
||||||
|
*/
|
||||||
|
static Expr *
|
||||||
|
CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
|
int targetTypeMod)
|
||||||
|
{
|
||||||
|
Oid coercionFuncId = InvalidOid;
|
||||||
|
CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
|
||||||
|
COERCION_EXPLICIT,
|
||||||
|
&coercionFuncId);
|
||||||
|
|
||||||
|
if (coercionType == COERCION_PATH_FUNC)
|
||||||
|
{
|
||||||
|
FuncExpr *coerceExpr = makeNode(FuncExpr);
|
||||||
|
coerceExpr->funcid = coercionFuncId;
|
||||||
|
coerceExpr->args = list_make1(copyObject(expr));
|
||||||
|
coerceExpr->funccollid = targetCollation;
|
||||||
|
coerceExpr->funcresulttype = targetType;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else if (coercionType == COERCION_PATH_RELABELTYPE)
|
||||||
|
{
|
||||||
|
RelabelType *coerceExpr = makeNode(RelabelType);
|
||||||
|
coerceExpr->arg = copyObject(expr);
|
||||||
|
coerceExpr->resulttype = targetType;
|
||||||
|
coerceExpr->resulttypmod = targetTypeMod;
|
||||||
|
coerceExpr->resultcollid = targetCollation;
|
||||||
|
coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
|
||||||
|
coerceExpr->location = -1;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else if (coercionType == COERCION_PATH_ARRAYCOERCE)
|
||||||
|
{
|
||||||
|
Oid sourceBaseType = get_base_element_type(sourceType);
|
||||||
|
Oid targetBaseType = get_base_element_type(targetType);
|
||||||
|
|
||||||
|
CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
|
||||||
|
elemExpr->collation = targetCollation;
|
||||||
|
elemExpr->typeId = sourceBaseType;
|
||||||
|
elemExpr->typeMod = -1;
|
||||||
|
|
||||||
|
Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
|
||||||
|
targetBaseType, targetCollation,
|
||||||
|
targetTypeMod);
|
||||||
|
|
||||||
|
ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
|
||||||
|
coerceExpr->arg = copyObject(expr);
|
||||||
|
coerceExpr->elemexpr = elemCastExpr;
|
||||||
|
coerceExpr->resultcollid = targetCollation;
|
||||||
|
coerceExpr->resulttype = targetType;
|
||||||
|
coerceExpr->resulttypmod = targetTypeMod;
|
||||||
|
coerceExpr->location = -1;
|
||||||
|
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else if (coercionType == COERCION_PATH_COERCEVIAIO)
|
||||||
|
{
|
||||||
|
CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
|
||||||
|
coerceExpr->arg = (Expr *) copyObject(expr);
|
||||||
|
coerceExpr->resulttype = targetType;
|
||||||
|
coerceExpr->resultcollid = targetCollation;
|
||||||
|
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||||
|
coerceExpr->location = -1;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
|
||||||
|
sourceType, targetType)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */
|
||||||
|
bool
|
||||||
|
PlanningInsertSelect(void)
|
||||||
|
{
|
||||||
|
return insertSelectPlannerLevel > 0;
|
||||||
|
}
|
||||||
|
|
|
@ -198,33 +198,28 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CoordinatorInsertSelectExplainScan is a custom scan explain callback function
|
* NonPushableInsertSelectExplainScan is a custom scan explain callback function
|
||||||
* which is used to print explain information of a Citus plan for an INSERT INTO
|
* which is used to print explain information of a Citus plan for an INSERT INTO
|
||||||
* distributed_table SELECT ... query that is evaluated on the coordinator.
|
* distributed_table SELECT ... query that is evaluated on the coordinator or
|
||||||
|
* uses repartitioning.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
||||||
struct ExplainState *es)
|
struct ExplainState *es)
|
||||||
{
|
{
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
Query *insertSelectQuery = distributedPlan->insertSelectQuery;
|
Query *insertSelectQuery = distributedPlan->insertSelectQuery;
|
||||||
Query *query = BuildSelectForInsertSelect(insertSelectQuery);
|
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
|
||||||
Oid targetRelationId = insertRte->relid;
|
|
||||||
IntoClause *into = NULL;
|
|
||||||
ParamListInfo params = NULL;
|
|
||||||
char *queryString = NULL;
|
|
||||||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make a copy of the query, since pg_plan_query may scribble on it and later
|
* Create a copy because ExplainOneQuery can modify the query, and later
|
||||||
* stages of EXPLAIN require it.
|
* executions of prepared statements might require it. See
|
||||||
|
* https://github.com/citusdata/citus/issues/3947 for what can happen.
|
||||||
*/
|
*/
|
||||||
Query *queryCopy = copyObject(query);
|
Query *queryCopy = copyObject(selectRte->subquery);
|
||||||
PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params);
|
|
||||||
bool repartition = IsRedistributablePlan(selectPlan->planTree) &&
|
bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION;
|
||||||
IsSupportedRedistributionTarget(targetRelationId);
|
|
||||||
|
|
||||||
if (es->analyze)
|
if (es->analyze)
|
||||||
{
|
{
|
||||||
|
@ -245,7 +240,10 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
||||||
ExplainOpenGroup("Select Query", "Select Query", false, es);
|
ExplainOpenGroup("Select Query", "Select Query", false, es);
|
||||||
|
|
||||||
/* explain the inner SELECT query */
|
/* explain the inner SELECT query */
|
||||||
ExplainOneQuery(query, 0, into, es, queryString, params, NULL);
|
IntoClause *into = NULL;
|
||||||
|
ParamListInfo params = NULL;
|
||||||
|
char *queryString = NULL;
|
||||||
|
ExplainOneQuery(queryCopy, 0, into, es, queryString, params, NULL);
|
||||||
|
|
||||||
ExplainCloseGroup("Select Query", "Select Query", false, es);
|
ExplainCloseGroup("Select Query", "Select Query", false, es);
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,8 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
|
||||||
COPY_NODE_FIELD(relationIdList);
|
COPY_NODE_FIELD(relationIdList);
|
||||||
COPY_SCALAR_FIELD(targetRelationId);
|
COPY_SCALAR_FIELD(targetRelationId);
|
||||||
COPY_NODE_FIELD(insertSelectQuery);
|
COPY_NODE_FIELD(insertSelectQuery);
|
||||||
|
COPY_NODE_FIELD(selectPlanForInsertSelect);
|
||||||
|
COPY_SCALAR_FIELD(insertSelectMethod);
|
||||||
COPY_STRING_FIELD(intermediateResultIdPrefix);
|
COPY_STRING_FIELD(intermediateResultIdPrefix);
|
||||||
|
|
||||||
COPY_NODE_FIELD(subPlanList);
|
COPY_NODE_FIELD(subPlanList);
|
||||||
|
|
|
@ -32,7 +32,7 @@ typedef struct CitusScanState
|
||||||
/* custom scan methods for all executors */
|
/* custom scan methods for all executors */
|
||||||
extern CustomScanMethods AdaptiveExecutorCustomScanMethods;
|
extern CustomScanMethods AdaptiveExecutorCustomScanMethods;
|
||||||
extern CustomScanMethods TaskTrackerCustomScanMethods;
|
extern CustomScanMethods TaskTrackerCustomScanMethods;
|
||||||
extern CustomScanMethods CoordinatorInsertSelectCustomScanMethods;
|
extern CustomScanMethods NonPushableInsertSelectCustomScanMethods;
|
||||||
extern CustomScanMethods DelayedErrorCustomScanMethods;
|
extern CustomScanMethods DelayedErrorCustomScanMethods;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,7 @@
|
||||||
|
|
||||||
extern bool EnableRepartitionedInsertSelect;
|
extern bool EnableRepartitionedInsertSelect;
|
||||||
|
|
||||||
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
|
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
|
||||||
extern bool ExecutingInsertSelect(void);
|
|
||||||
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
||||||
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||||
extern bool IsRedistributablePlan(Plan *selectPlan);
|
extern bool IsRedistributablePlan(Plan *selectPlan);
|
||||||
|
|
|
@ -29,11 +29,12 @@ extern bool InsertSelectIntoLocalTable(Query *query);
|
||||||
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
|
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
|
||||||
RangeTblEntry *insertRte,
|
RangeTblEntry *insertRte,
|
||||||
RangeTblEntry *subqueryRte);
|
RangeTblEntry *subqueryRte);
|
||||||
extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
extern void NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
||||||
struct ExplainState *es);
|
struct ExplainState *es);
|
||||||
extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
|
extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext,
|
||||||
|
ParamListInfo boundParams);
|
||||||
extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
|
extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
|
||||||
Query *originalQuery,
|
Query *originalQuery,
|
||||||
ParamListInfo
|
ParamListInfo
|
||||||
|
@ -42,6 +43,7 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
extern char * InsertSelectResultIdPrefix(uint64 planId);
|
extern char * InsertSelectResultIdPrefix(uint64 planId);
|
||||||
|
extern bool PlanningInsertSelect(void);
|
||||||
|
|
||||||
|
|
||||||
#endif /* INSERT_SELECT_PLANNER_H */
|
#endif /* INSERT_SELECT_PLANNER_H */
|
||||||
|
|
|
@ -378,6 +378,22 @@ typedef struct JoinSequenceNode
|
||||||
} JoinSequenceNode;
|
} JoinSequenceNode;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InsertSelectMethod represents the method to use for INSERT INTO ... SELECT
|
||||||
|
* queries.
|
||||||
|
*
|
||||||
|
* Note that there is a third method which is not represented here, which is
|
||||||
|
* pushing down the INSERT INTO ... SELECT to workers. This method is executed
|
||||||
|
* similar to other distributed queries and doesn't need a special execution
|
||||||
|
* code, so we don't need to represent it here.
|
||||||
|
*/
|
||||||
|
typedef enum InsertSelectMethod
|
||||||
|
{
|
||||||
|
INSERT_SELECT_VIA_COORDINATOR,
|
||||||
|
INSERT_SELECT_REPARTITION
|
||||||
|
} InsertSelectMethod;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DistributedPlan contains all information necessary to execute a
|
* DistributedPlan contains all information necessary to execute a
|
||||||
* distribute query.
|
* distribute query.
|
||||||
|
@ -416,8 +432,11 @@ typedef struct DistributedPlan
|
||||||
/* target relation of a modification */
|
/* target relation of a modification */
|
||||||
Oid targetRelationId;
|
Oid targetRelationId;
|
||||||
|
|
||||||
/* INSERT .. SELECT via the coordinator */
|
/*
|
||||||
|
* INSERT .. SELECT via the coordinator or repartition */
|
||||||
Query *insertSelectQuery;
|
Query *insertSelectQuery;
|
||||||
|
PlannedStmt *selectPlanForInsertSelect;
|
||||||
|
InsertSelectMethod insertSelectMethod;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If intermediateResultIdPrefix is non-null, an INSERT ... SELECT
|
* If intermediateResultIdPrefix is non-null, an INSERT ... SELECT
|
||||||
|
|
|
@ -88,7 +88,7 @@ typedef enum
|
||||||
MULTI_EXECUTOR_INVALID_FIRST = 0,
|
MULTI_EXECUTOR_INVALID_FIRST = 0,
|
||||||
MULTI_EXECUTOR_ADAPTIVE = 1,
|
MULTI_EXECUTOR_ADAPTIVE = 1,
|
||||||
MULTI_EXECUTOR_TASK_TRACKER = 2,
|
MULTI_EXECUTOR_TASK_TRACKER = 2,
|
||||||
MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 3
|
MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT = 3
|
||||||
} MultiExecutorType;
|
} MultiExecutorType;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -577,7 +577,7 @@ EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
-- EXPLAIN ANALYZE is currently not supported
|
-- EXPLAIN ANALYZE is currently not supported
|
||||||
--
|
--
|
||||||
EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning
|
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning
|
||||||
--
|
--
|
||||||
-- Duplicate names in target list
|
-- Duplicate names in target list
|
||||||
--
|
--
|
||||||
|
@ -660,6 +660,96 @@ SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY
|
||||||
4 | 6 | 1
|
4 | 6 | 1
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
|
DEALLOCATE insert_plan;
|
||||||
|
--
|
||||||
|
-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the
|
||||||
|
-- distributed query has a single task.
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
PREPARE insert_plan(int) AS
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT a, max(b) FROM source_table
|
||||||
|
WHERE a=$1 GROUP BY a;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
||||||
|
a | count | distinct_values
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 7 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DEALLOCATE insert_plan;
|
||||||
|
--
|
||||||
|
-- Prepared INSERT/SELECT with no parameters.
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
PREPARE insert_plan AS
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT a, max(b) FROM source_table
|
||||||
|
WHERE a BETWEEN 1 AND 2 GROUP BY a;
|
||||||
|
EXPLAIN EXECUTE insert_plan;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> GroupAggregate (cost=44.09..44.28 rows=11 width=8)
|
||||||
|
Group Key: a
|
||||||
|
-> Sort (cost=44.09..44.12 rows=11 width=8)
|
||||||
|
Sort Key: a
|
||||||
|
-> Seq Scan on source_table_4213606 source_table (cost=0.00..43.90 rows=11 width=8)
|
||||||
|
Filter: ((a >= 1) AND (a <= 2))
|
||||||
|
(13 rows)
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
||||||
|
a | count | distinct_values
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 7 | 1
|
||||||
|
2 | 7 | 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
DEALLOCATE insert_plan;
|
||||||
--
|
--
|
||||||
-- INSERT/SELECT in CTE
|
-- INSERT/SELECT in CTE
|
||||||
--
|
--
|
||||||
|
@ -673,10 +763,10 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu
|
||||||
DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned
|
DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned
|
||||||
DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
|
DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
|
||||||
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'a'
|
DEBUG: partitioning SELECT query by column index 0 with name 'a'
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||||
|
@ -1160,6 +1250,31 @@ DO UPDATE SET
|
||||||
-> Seq Scan on source_table_4213644 source_table
|
-> Seq Scan on source_table_4213644 source_table
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
|
-- verify that we don't report repartitioned insert/select for tables
|
||||||
|
-- with sequences. See https://github.com/citusdata/citus/issues/3936
|
||||||
|
create table table_with_sequences (x int, y int, z bigserial);
|
||||||
|
insert into table_with_sequences values (1,1);
|
||||||
|
select create_distributed_table('table_with_sequences','x');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
explain insert into table_with_sequences select y, x from table_with_sequences;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
|
-> Custom Scan (Citus Adaptive) (cost=0.00..250.00 rows=100000 width=16)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on table_with_sequences_4213648 table_with_sequences (cost=0.00..28.50 rows=1850 width=8)
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
-- clean-up
|
-- clean-up
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -1357,31 +1357,33 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
|
||||||
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> Result
|
-> Subquery Scan on citus_insert_select_subquery
|
||||||
One-Time Filter: $3
|
|
||||||
CTE cte1
|
CTE cte1
|
||||||
-> Function Scan on generate_series s
|
-> Function Scan on generate_series s
|
||||||
CTE cte1
|
-> Result
|
||||||
-> Limit
|
One-Time Filter: $3
|
||||||
InitPlan 2 (returns $1)
|
CTE cte1
|
||||||
-> CTE Scan on cte1 cte1_1
|
-> Limit
|
||||||
-> Result
|
InitPlan 2 (returns $1)
|
||||||
One-Time Filter: $1
|
-> CTE Scan on cte1 cte1_1
|
||||||
-> CTE Scan on cte1 cte1_2
|
-> Result
|
||||||
InitPlan 4 (returns $3)
|
One-Time Filter: $1
|
||||||
-> CTE Scan on cte1 cte1_3
|
-> CTE Scan on cte1 cte1_2
|
||||||
-> CTE Scan on cte1
|
InitPlan 4 (returns $3)
|
||||||
|
-> CTE Scan on cte1 cte1_3
|
||||||
|
-> CTE Scan on cte1
|
||||||
EXPLAIN (COSTS OFF)
|
EXPLAIN (COSTS OFF)
|
||||||
INSERT INTO lineitem_hash_part
|
INSERT INTO lineitem_hash_part
|
||||||
( SELECT s FROM generate_series(1,5) s) UNION
|
( SELECT s FROM generate_series(1,5) s) UNION
|
||||||
( SELECT s FROM generate_series(5,10) s);
|
( SELECT s FROM generate_series(5,10) s);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> Subquery Scan on citus_insert_select_subquery
|
||||||
Group Key: s.s
|
-> HashAggregate
|
||||||
-> Append
|
Group Key: s.s
|
||||||
-> Function Scan on generate_series s
|
-> Append
|
||||||
-> Function Scan on generate_series s_1
|
-> Function Scan on generate_series s
|
||||||
|
-> Function Scan on generate_series s_1
|
||||||
-- explain with recursive planning
|
-- explain with recursive planning
|
||||||
-- prevent PG 11 - PG 12 outputs to diverge
|
-- prevent PG 11 - PG 12 outputs to diverge
|
||||||
SET citus.enable_cte_inlining TO false;
|
SET citus.enable_cte_inlining TO false;
|
||||||
|
|
|
@ -711,6 +711,13 @@ INSERT INTO agg_events
|
||||||
DEBUG: CTE sub_cte is going to be inlined via distributed planning
|
DEBUG: CTE sub_cte is going to be inlined via distributed planning
|
||||||
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: CTE sub_cte is going to be inlined via distributed planning
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE sub_cte: SELECT 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, (SELECT sub_cte."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) sub_cte) AS value_1_agg FROM public.raw_events_first
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
|
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
|
||||||
HINT: Consider using an equality filter on the distributed table's partition column.
|
HINT: Consider using an equality filter on the distributed table's partition column.
|
||||||
-- We support set operations via the coordinator
|
-- We support set operations via the coordinator
|
||||||
|
@ -1045,7 +1052,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.id
|
Group Key: remote_scan.user_id
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
@ -1110,7 +1117,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.id
|
Group Key: remote_scan.user_id
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
@ -1409,7 +1416,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.id
|
Group Key: remote_scan.user_id
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
@ -1454,7 +1461,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.id
|
Group Key: remote_scan.user_id
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
|
@ -1502,7 +1509,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.id
|
Group Key: remote_scan.user_id
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
|
|
@ -114,10 +114,10 @@ WITH inserted_table AS (
|
||||||
) SELECT * FROM inserted_table ORDER BY 1;
|
) SELECT * FROM inserted_table ORDER BY 1;
|
||||||
DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2
|
DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2
|
||||||
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
|
||||||
DEBUG: push down of limit count: 5
|
DEBUG: push down of limit count: 5
|
||||||
DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
|
DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
col_1 | col_2
|
col_1 | col_2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -150,13 +150,13 @@ WITH inserted_table AS (
|
||||||
) SELECT * FROM inserted_table ORDER BY 1;
|
) SELECT * FROM inserted_table ORDER BY 1;
|
||||||
DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2
|
DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2
|
||||||
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
|
||||||
DEBUG: push down of limit count: 5
|
DEBUG: push down of limit count: 5
|
||||||
DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
|
DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
|
||||||
DEBUG: push down of limit count: 5
|
DEBUG: push down of limit count: 5
|
||||||
DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5
|
DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5
|
||||||
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)
|
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
col_1 | col_2
|
col_1 | col_2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -246,10 +246,10 @@ WITH inserted_table AS (
|
||||||
) SELECT * FROM inserted_table ORDER BY 1;
|
) SELECT * FROM inserted_table ORDER BY 1;
|
||||||
DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2
|
DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2
|
||||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
|
||||||
DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1
|
DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1
|
||||||
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
col_1 | col_2
|
col_1 | col_2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -269,9 +269,9 @@ WITH cte AS (
|
||||||
UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte);
|
UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte);
|
||||||
DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2
|
DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2
|
||||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte))
|
|
||||||
DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1
|
DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte))
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
-- Following query is supported by using repartition join for the insert/select
|
-- Following query is supported by using repartition join for the insert/select
|
||||||
|
|
|
@ -684,7 +684,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.user_id, remote_scan.event_type
|
Group Key: remote_scan.user_id, remote_scan.value_1_agg
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
@ -704,7 +704,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.user_id, remote_scan.event_type
|
Group Key: remote_scan.user_id, remote_scan.value_1_agg
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
@ -725,7 +725,7 @@ $Q$);
|
||||||
Custom Scan (Citus INSERT ... SELECT)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
INSERT/SELECT method: pull to coordinator
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: remote_scan.user_id, remote_scan.event_type
|
Group Key: remote_scan.user_id, remote_scan.value_1_agg
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
|
|
@ -317,6 +317,60 @@ RESET client_min_messages;
|
||||||
|
|
||||||
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
||||||
|
|
||||||
|
DEALLOCATE insert_plan;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the
|
||||||
|
-- distributed query has a single task.
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
|
||||||
|
PREPARE insert_plan(int) AS
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT a, max(b) FROM source_table
|
||||||
|
WHERE a=$1 GROUP BY a;
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
EXECUTE insert_plan(0);
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
||||||
|
|
||||||
|
DEALLOCATE insert_plan;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Prepared INSERT/SELECT with no parameters.
|
||||||
|
--
|
||||||
|
|
||||||
|
TRUNCATE target_table;
|
||||||
|
|
||||||
|
PREPARE insert_plan AS
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT a, max(b) FROM source_table
|
||||||
|
WHERE a BETWEEN 1 AND 2 GROUP BY a;
|
||||||
|
|
||||||
|
EXPLAIN EXECUTE insert_plan;
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
EXECUTE insert_plan;
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
||||||
|
|
||||||
|
DEALLOCATE insert_plan;
|
||||||
|
|
||||||
--
|
--
|
||||||
-- INSERT/SELECT in CTE
|
-- INSERT/SELECT in CTE
|
||||||
--
|
--
|
||||||
|
@ -565,6 +619,14 @@ DO UPDATE SET
|
||||||
cardinality = enriched.cardinality + excluded.cardinality,
|
cardinality = enriched.cardinality + excluded.cardinality,
|
||||||
sum = enriched.sum + excluded.sum;
|
sum = enriched.sum + excluded.sum;
|
||||||
|
|
||||||
|
|
||||||
|
-- verify that we don't report repartitioned insert/select for tables
|
||||||
|
-- with sequences. See https://github.com/citusdata/citus/issues/3936
|
||||||
|
create table table_with_sequences (x int, y int, z bigserial);
|
||||||
|
insert into table_with_sequences values (1,1);
|
||||||
|
select create_distributed_table('table_with_sequences','x');
|
||||||
|
explain insert into table_with_sequences select y, x from table_with_sequences;
|
||||||
|
|
||||||
-- clean-up
|
-- clean-up
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue