mirror of https://github.com/citusdata/citus.git
/*-------------------------------------------------------------------------
 *
 * insert_select_executor.c
 *
 * Executor logic for INSERT..SELECT.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include "distributed/citus_ruleutils.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/adaptive_executor.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_planner.h"
#include "distributed/distributed_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "executor/executor.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"


/* Config variables managed via guc.c */
bool EnableRepartitionedInsertSelect = true;

/* depth of current insert/select executor. */
static int insertSelectExecutorLevel = 0;

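/*
 * Usage sketch (assuming the flag is exposed with the usual citus.* GUC naming,
 * e.g. citus.enable_repartitioned_insert_select): disabling it makes
 * IsRedistributablePlan() below return false, so non-pushable INSERT ... SELECT
 * commands fall back to pulling the SELECT results to the coordinator:
 *
 *   SET citus.enable_repartitioned_insert_select TO off;
 */
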
static TupleTableSlot * NonPushableInsertSelectExecScanInternal(CustomScanState *node);
static Query * WrapSubquery(Query *subquery);
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
                                           char *resultIdPrefix);
static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
                                    PlannedStmt *selectPlan, EState *executorState);
static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
                                                          List *insertTargetList,
                                                          PlannedStmt *selectPlan,
                                                          EState *executorState,
                                                          char *intermediateResultIdPrefix);
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
                                                List *insertTargetList);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
                                   Oid targetRelationId);
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
                                                CitusTableCacheEntry *targetRelation,
                                                List **redistributedResults,
                                                bool useBinaryFormat);
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
                       int targetTypeMod);
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);


/*
 * NonPushableInsertSelectExecScan is a wrapper around
 * NonPushableInsertSelectExecScanInternal which also properly increments
 * and decrements insertSelectExecutorLevel.
 */
TupleTableSlot *
NonPushableInsertSelectExecScan(CustomScanState *node)
{
    TupleTableSlot *result = NULL;
    insertSelectExecutorLevel++;

    PG_TRY();
    {
        result = NonPushableInsertSelectExecScanInternal(node);
    }
    PG_CATCH();
    {
        insertSelectExecutorLevel--;
        PG_RE_THROW();
    }
    PG_END_TRY();

    insertSelectExecutorLevel--;
    return result;
}


/*
 * NonPushableInsertSelectExecScanInternal executes an INSERT INTO distributed_table
 * SELECT .. query either by routing via the coordinator or by repartitioning
 * task results and moving data directly between nodes.
 */
static TupleTableSlot *
NonPushableInsertSelectExecScanInternal(CustomScanState *node)
{
    CitusScanState *scanState = (CitusScanState *) node;

    if (!scanState->finishedRemoteScan)
    {
        EState *executorState = ScanStateGetExecutorState(scanState);
        ParamListInfo paramListInfo = executorState->es_param_list_info;
        DistributedPlan *distributedPlan = scanState->distributedPlan;
        Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery);
        List *insertTargetList = insertSelectQuery->targetList;
        RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
        RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
        Oid targetRelationId = insertRte->relid;
        char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
        bool hasReturning = distributedPlan->expectResults;
        HTAB *shardStateHash = NULL;

        Query *selectQuery = selectRte->subquery;

        /*
         * Cast the types of the insert target list and the select projection list
         * to match the column types of the target relation.
         */
        selectQuery->targetList =
            AddInsertSelectCasts(insertSelectQuery->targetList,
                                 selectQuery->targetList,
                                 targetRelationId);

        /*
         * Later we might need to call WrapTaskListForProjection(), which requires
         * that the select target list has unique names; otherwise the outer query
         * cannot reference its columns unambiguously. So we relabel the select
         * columns to match the target columns.
         */
        RelabelTargetEntryList(selectQuery->targetList, insertTargetList);

        /*
         * Make a copy of the query, since pg_plan_query may scribble on it and we
         * want it to be replanned every time if it is stored in a prepared
         * statement.
         */
        selectQuery = copyObject(selectQuery);

        /* plan the subquery, this may be another distributed query */
        int cursorOptions = CURSOR_OPT_PARALLEL_OK;
        PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions,
                                                paramListInfo);

        /*
         * If we are dealing with a partitioned table, we also need to lock its
         * partitions. Here we only lock the target relation; the necessary locks
         * on the selected tables are acquired while executing the SELECT queries.
         */
        if (PartitionedTable(targetRelationId))
        {
            LockPartitionRelations(targetRelationId, RowExclusiveLock);
        }

        if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION)
        {
            ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));

            DistributedPlan *distSelectPlan =
                GetDistributedPlan((CustomScan *) selectPlan->planTree);
            Job *distSelectJob = distSelectPlan->workerJob;
            List *distSelectTaskList = distSelectJob->taskList;
            bool randomAccess = true;
            bool interTransactions = false;
            bool binaryFormat =
                CanUseBinaryCopyFormatForTargetList(selectQuery->targetList);

            ExecuteSubPlans(distSelectPlan);

            /*
             * We have a separate directory for each transaction, so choosing
             * the same result prefix won't cause filename conflicts. The results
             * directory name also includes the node id and database id, so we
             * don't need to include them in the filename. We include the job id
             * here for the case where INSERT .. SELECTs are executed recursively.
             */
            StringInfo distResultPrefixString = makeStringInfo();
            appendStringInfo(distResultPrefixString,
                             "repartitioned_results_" UINT64_FORMAT,
                             distSelectJob->jobId);
            char *distResultPrefix = distResultPrefixString->data;

            CitusTableCacheEntry *targetRelation =
                GetCitusTableCacheEntry(targetRelationId);

            int partitionColumnIndex =
                PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn);
            if (partitionColumnIndex == -1)
            {
                char *relationName = get_rel_name(targetRelationId);
                Oid schemaOid = get_rel_namespace(targetRelationId);
                char *schemaName = get_namespace_name(schemaOid);

                ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                                errmsg(
                                    "the partition column of table %s should have a value",
                                    quote_qualified_identifier(schemaName,
                                                               relationName))));
            }

            TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList,
                                                      partitionColumnIndex);
            const char *partitionColumnName = selectPartitionTE->resname ?
                                              selectPartitionTE->resname : "(none)";

            ereport(DEBUG2, (errmsg(
                                 "partitioning SELECT query by column index %d with name %s",
                                 partitionColumnIndex, quote_literal_cstr(
                                     partitionColumnName))));

            /*
             * ExpandWorkerTargetEntry() can add additional columns to the worker
             * query. Modify the task queries to only select the columns we need.
             */
            int requiredColumnCount = list_length(insertTargetList);
            List *jobTargetList = distSelectJob->jobQuery->targetList;
            if (list_length(jobTargetList) > requiredColumnCount)
            {
                List *projectedTargetEntries = ListTake(jobTargetList,
                                                        requiredColumnCount);
                WrapTaskListForProjection(distSelectTaskList, projectedTargetEntries);
            }

            List **redistributedResults = RedistributeTaskListResults(distResultPrefix,
                                                                      distSelectTaskList,
                                                                      partitionColumnIndex,
                                                                      targetRelation,
                                                                      binaryFormat);

            /*
             * At this point the SELECT query has been executed on the workers and
             * the results have been fetched in such a way that they are colocated
             * with the corresponding target shard. Create and execute a list of
             * tasks of the form
             * INSERT INTO ... SELECT * FROM read_intermediate_results(...);
             */
            List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery,
                                                               targetRelation,
                                                               redistributedResults,
                                                               binaryFormat);

            scanState->tuplestorestate =
                tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
            TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
            uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
                                                                taskList, tupleDescriptor,
                                                                scanState->tuplestorestate,
                                                                hasReturning);

            executorState->es_processed = rowsInserted;
        }
        else if (insertSelectQuery->onConflict || hasReturning)
        {
            ereport(DEBUG1, (errmsg(
                                 "Collecting INSERT ... SELECT results on coordinator")));

            /*
             * If we also have a workerJob, that means there is a second step
             * to the INSERT ... SELECT. This happens when there is a RETURNING
             * or ON CONFLICT clause, which is implemented as a separate
             * distributed INSERT ... SELECT from a set of intermediate results
             * to the target relation.
             */
            List *prunedTaskList = NIL;

            shardStateHash = ExecutePlanIntoColocatedIntermediateResults(
                targetRelationId,
                insertTargetList,
                selectPlan,
                executorState,
                intermediateResultIdPrefix);

            /* generate tasks for the INSERT..SELECT phase */
            List *taskList = TwoPhaseInsertSelectTaskList(targetRelationId,
                                                          insertSelectQuery,
                                                          intermediateResultIdPrefix);

            /*
             * We cannot actually execute INSERT...SELECT tasks that read from
             * intermediate results that weren't created because no rows were
             * written to them. Prune those tasks out by only including tasks
             * on shards with connections.
             */
            Task *task = NULL;
            foreach_ptr(task, taskList)
            {
                uint64 shardId = task->anchorShardId;
                bool shardModified = false;

                hash_search(shardStateHash, &shardId, HASH_FIND, &shardModified);
                if (shardModified)
                {
                    prunedTaskList = lappend(prunedTaskList, task);
                }
            }

            if (prunedTaskList != NIL)
            {
                bool randomAccess = true;
                bool interTransactions = false;

                Assert(scanState->tuplestorestate == NULL);
                scanState->tuplestorestate =
                    tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

                TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
                ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
                                              tupleDescriptor, scanState->tuplestorestate,
                                              hasReturning);

                if (SortReturning && hasReturning)
                {
                    SortTupleStore(scanState);
                }
            }
        }
        else
        {
            ereport(DEBUG1, (errmsg(
                                 "Collecting INSERT ... SELECT results on coordinator")));

            ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan,
                                    executorState);
        }

        scanState->finishedRemoteScan = true;
    }

    TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState);

    return resultSlot;
}


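/*
 * Rough illustration of the three branches above, for a hypothetical
 * hash-distributed table (example names; which branch is taken depends on the
 * plan the planner chose, not just on the SQL shape):
 *
 *   -- INSERT_SELECT_REPARTITION: the SELECT tasks run on the workers, their
 *   -- results are repartitioned by the target's partition column and inserted
 *   -- into the colocated target shards
 *   INSERT INTO target_table SELECT key, value FROM source_table;
 *
 *   -- ON CONFLICT / RETURNING: the SELECT is first written to intermediate
 *   -- results colocated with the target shards, then a second per-shard
 *   -- INSERT ... SELECT reads them back (see TwoPhaseInsertSelectTaskList)
 *   INSERT INTO target_table SELECT key, value FROM source_table
 *       ON CONFLICT DO NOTHING;
 *
 *   -- otherwise: the planned SELECT is streamed through a CitusCopyDestReceiver
 *   -- straight into the distributed table (see ExecutePlanIntoRelation)
 *   INSERT INTO target_table SELECT key, value FROM source_table;
 */
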
/*
 * BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query.
 * If the INSERT...SELECT has CTEs, then these are added to the resulting SELECT instead.
 */
Query *
BuildSelectForInsertSelect(Query *insertSelectQuery)
{
    RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
    Query *selectQuery = selectRte->subquery;

    /*
     * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT
     * has top-level set operations.
     *
     * We could simply wrap all queries, but that might create a subquery that is
     * not supported by the logical planner. Since the logical planner also does
     * not support CTEs and top-level set operations, we can wrap queries containing
     * those without breaking anything.
     */
    if (list_length(insertSelectQuery->cteList) > 0)
    {
        selectQuery = WrapSubquery(selectRte->subquery);

        /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
        selectQuery->cteList = copyObject(insertSelectQuery->cteList);
        selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
    }
    else if (selectQuery->setOperations != NULL)
    {
        /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */
        selectQuery = WrapSubquery(selectRte->subquery);
    }

    return selectQuery;
}


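/*
 * For example (hypothetical query), given
 *
 *   WITH new_rows AS (SELECT key, value FROM source_table)
 *   INSERT INTO target_table SELECT * FROM new_rows;
 *
 * BuildSelectForInsertSelect returns, roughly,
 *
 *   WITH new_rows AS (SELECT key, value FROM source_table)
 *   SELECT * FROM (SELECT * FROM new_rows) citus_insert_select_subquery;
 *
 * i.e. the CTEs are hoisted onto the wrapped SELECT built by WrapSubquery() below.
 */
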
/*
 * WrapSubquery wraps the given query as a subquery in a newly constructed
 * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
 */
static Query *
WrapSubquery(Query *subquery)
{
    ParseState *pstate = make_parsestate(NULL);
    List *newTargetList = NIL;

    Query *outerQuery = makeNode(Query);
    outerQuery->commandType = CMD_SELECT;

    /* create range table entries */
    Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
    RangeTblEntry *newRangeTableEntry = addRangeTableEntryForSubquery(pstate, subquery,
                                                                      selectAlias, false,
                                                                      true);
    outerQuery->rtable = list_make1(newRangeTableEntry);

    /* set the FROM expression to the subquery */
    RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
    newRangeTableRef->rtindex = 1;
    outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);

    /* create a target list that matches the SELECT */
    TargetEntry *selectTargetEntry = NULL;
    foreach_ptr(selectTargetEntry, subquery->targetList)
    {
        /* exactly 1 entry in FROM */
        int indexInRangeTable = 1;

        if (selectTargetEntry->resjunk)
        {
            continue;
        }

        Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
                                    exprType((Node *) selectTargetEntry->expr),
                                    exprTypmod((Node *) selectTargetEntry->expr),
                                    exprCollation((Node *) selectTargetEntry->expr), 0);

        TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
                                                            selectTargetEntry->resno,
                                                            selectTargetEntry->resname,
                                                            selectTargetEntry->resjunk);

        newTargetList = lappend(newTargetList, newSelectTargetEntry);
    }

    outerQuery->targetList = newTargetList;

    return outerQuery;
}


/*
 * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that
 * inserts into a target relation and selects from a set of co-located
 * intermediate results.
 */
static List *
TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
                             char *resultIdPrefix)
{
    List *taskList = NIL;

    /*
     * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the
     * subquery of insertResultQuery for different intermediate results and
     * then deparse it.
     */
    Query *insertResultQuery = copyObject(insertSelectQuery);
    RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
    RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);

    CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
    int shardCount = targetCacheEntry->shardIntervalArrayLength;
    uint32 taskIdIndex = 1;
    uint64 jobId = INVALID_JOB_ID;

    for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
    {
        ShardInterval *targetShardInterval =
            targetCacheEntry->sortedShardIntervalArray[shardOffset];
        uint64 shardId = targetShardInterval->shardId;
        List *columnAliasList = NIL;
        StringInfo queryString = makeStringInfo();
        StringInfo resultId = makeStringInfo();

        /* during COPY, the shard ID is appended to the result name */
        appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);

        /* generate the query on the intermediate result */
        Query *resultSelectQuery = BuildSubPlanResultQuery(insertSelectQuery->targetList,
                                                           columnAliasList,
                                                           resultId->data);

        /* put the intermediate result query in the INSERT..SELECT */
        selectRte->subquery = resultSelectQuery;

        /* setting an alias simplifies deparsing of RETURNING */
        if (insertRte->alias == NULL)
        {
            Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
            insertRte->alias = alias;
        }

        /*
         * Generate a query string for the query that inserts into a shard and reads
         * from an intermediate result.
         *
         * Since CTEs have already been converted to intermediate results, they need
         * to be removed from the query. Otherwise, the worker queries would include
         * both the intermediate results and the CTEs.
         */
        insertResultQuery->cteList = NIL;
        deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString);
        ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));

        LockShardDistributionMetadata(shardId, ShareLock);
        List *insertShardPlacementList = ActiveShardPlacementList(shardId);

        RelationShard *relationShard = CitusMakeNode(RelationShard);
        relationShard->relationId = targetShardInterval->relationId;
        relationShard->shardId = targetShardInterval->shardId;

        Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
                                           queryString->data);
        modifyTask->dependentTaskList = NIL;
        modifyTask->anchorShardId = shardId;
        modifyTask->taskPlacementList = insertShardPlacementList;
        modifyTask->relationShardList = list_make1(relationShard);
        modifyTask->replicationModel = targetCacheEntry->replicationModel;

        taskList = lappend(taskList, modifyTask);

        taskIdIndex++;
    }

    return taskList;
}


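/*
 * Sketch of the query string generated for one target shard (identifiers are
 * illustrative; the exact deparsed form is produced by BuildSubPlanResultQuery
 * and deparse_shard_query):
 *
 *   INSERT INTO public.target_table_102008 AS citus_table_alias (key, value)
 *   SELECT key, value
 *   FROM read_intermediate_result('<resultIdPrefix>_102008', 'binary')
 *       AS intermediate_result(key integer, value text)
 *   ON CONFLICT ... / RETURNING ...   -- whatever the original statement had
 */
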
/*
 * ExecutePlanIntoColocatedIntermediateResults executes the given PlannedStmt
 * and inserts tuples into a set of intermediate results that are colocated with
 * the target table for further processing of ON CONFLICT or RETURNING. It also
 * returns the hash of shard states that were used to insert tuples into the
 * target relation.
 */
static HTAB *
ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
                                            List *insertTargetList,
                                            PlannedStmt *selectPlan,
                                            EState *executorState,
                                            char *intermediateResultIdPrefix)
{
    ParamListInfo paramListInfo = executorState->es_param_list_info;
    bool stopOnFailure = false;

    char partitionMethod = PartitionMethod(targetRelationId);
    if (partitionMethod == DISTRIBUTE_BY_NONE)
    {
        stopOnFailure = true;
    }

    /* Get column name list and partition column index for the target table */
    List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
                                                             insertTargetList);
    int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
                                                                  columnNameList);

    /* set up a DestReceiver that copies into the intermediate results */
    CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
                                                                  columnNameList,
                                                                  partitionColumnIndex,
                                                                  executorState,
                                                                  stopOnFailure,
                                                                  intermediateResultIdPrefix);

    ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

    executorState->es_processed = copyDest->tuplesSent;

    XactModificationLevel = XACT_MODIFICATION_DATA;

    return copyDest->shardStateHash;
}


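/*
 * Note: because intermediateResultIdPrefix is non-NULL here, the copy destination
 * writes one intermediate result per target shard rather than copying into the
 * shards themselves; each result is named "<prefix>_<shardId>" (see the matching
 * appendStringInfo() in TwoPhaseInsertSelectTaskList above), which is what the
 * second phase then reads back.
 */
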
/*
 * ExecutePlanIntoRelation executes the given plan and inserts the
 * results into the target relation, which is assumed to be a distributed
 * table.
 */
static void
ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
                        PlannedStmt *selectPlan, EState *executorState)
{
    ParamListInfo paramListInfo = executorState->es_param_list_info;
    bool stopOnFailure = false;

    char partitionMethod = PartitionMethod(targetRelationId);
    if (partitionMethod == DISTRIBUTE_BY_NONE)
    {
        stopOnFailure = true;
    }

    /* Get column name list and partition column index for the target table */
    List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
                                                             insertTargetList);
    int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
                                                                  columnNameList);

    /* set up a DestReceiver that copies into the distributed table */
    CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
                                                                  columnNameList,
                                                                  partitionColumnIndex,
                                                                  executorState,
                                                                  stopOnFailure, NULL);

    ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

    executorState->es_processed = copyDest->tuplesSent;

    XactModificationLevel = XACT_MODIFICATION_DATA;
}


/*
 * BuildColumnNameListFromTargetList builds the column name list given the
 * insert target list.
 */
static List *
BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList)
{
    List *columnNameList = NIL;

    /* build the list of column names for the COPY statement */
    TargetEntry *insertTargetEntry = NULL;
    foreach_ptr(insertTargetEntry, insertTargetList)
    {
        columnNameList = lappend(columnNameList, insertTargetEntry->resname);
    }

    return columnNameList;
}


/*
 * PartitionColumnIndexFromColumnList returns the index of the partition column
 * within the given column name list for the given relation. If the list does
 * not contain the partition column, it returns -1.
 */
static int
PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
{
    Var *partitionColumn = PartitionColumn(relationId, 0);
    int partitionColumnIndex = 0;

    const char *columnName = NULL;
    foreach_ptr(columnName, columnNameList)
    {
        AttrNumber attrNumber = get_attnum(relationId, columnName);

        /* check whether this is the partition column */
        if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
        {
            return partitionColumnIndex;
        }

        partitionColumnIndex++;
    }

    return -1;
}


/* ExecutingInsertSelect returns true if we are executing an INSERT ... SELECT query */
bool
ExecutingInsertSelect(void)
{
    return insertSelectExecutorLevel > 0;
}


/*
 * AddInsertSelectCasts makes sure that the columns in the given target lists
 * have the same types as the corresponding columns of the given relation.
 * It might add casts to ensure that.
 *
 * It returns the updated selectTargetList.
 */
static List *
AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
                     Oid targetRelationId)
{
    ListCell *insertEntryCell = NULL;
    ListCell *selectEntryCell = NULL;
    List *projectedEntries = NIL;
    List *nonProjectedEntries = NIL;

    /*
     * ReorderInsertSelectTargetLists() makes sure that the first few columns of
     * the SELECT query match the insert targets. The SELECT query might contain
     * additional items for GROUP BY, etc.
     */
    Assert(list_length(insertTargetList) <= list_length(selectTargetList));

    Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
    TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);

    int targetEntryIndex = 0;
    forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
    {
        TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
        TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
        Var *insertColumn = (Var *) insertEntry->expr;
        Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
                                               insertEntry->resno - 1);

        Oid sourceType = insertColumn->vartype;
        Oid targetType = attr->atttypid;
        if (sourceType != targetType)
        {
            insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
                                         attr->attcollation, attr->atttypmod);

            /*
             * We cannot modify the selectEntry in-place, because ORDER BY or
             * GROUP BY clauses might be pointing to it with comparison types
             * of the source type. So instead we keep the original one as a
             * non-projected entry, so GROUP BY and ORDER BY are happy, and
             * create a duplicated projected entry with the coerced expression.
             */
            TargetEntry *coercedEntry = copyObject(selectEntry);
            coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
                                          targetType, attr->attcollation,
                                          attr->atttypmod);
            coercedEntry->ressortgroupref = 0;

            /*
             * The only requirement is that users don't use this name in ORDER BY
             * or GROUP BY, and that it is unique within the same query.
             */
            StringInfo resnameString = makeStringInfo();
            appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
            coercedEntry->resname = resnameString->data;

            projectedEntries = lappend(projectedEntries, coercedEntry);

            if (selectEntry->ressortgroupref != 0)
            {
                selectEntry->resjunk = true;

                /*
                 * This entry might still end up in the SELECT output list, so
                 * rename it to avoid ambiguity.
                 *
                 * See https://github.com/citusdata/citus/pull/3470.
                 */
                resnameString = makeStringInfo();
                appendStringInfo(resnameString, "discarded_target_item_%d",
                                 targetEntryIndex);
                selectEntry->resname = resnameString->data;

                nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
            }
        }
        else
        {
            projectedEntries = lappend(projectedEntries, selectEntry);
        }

        targetEntryIndex++;
    }

    for (int entryIndex = list_length(insertTargetList);
         entryIndex < list_length(selectTargetList);
         entryIndex++)
    {
        nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
                                                                    entryIndex));
    }

    /* selectEntry->resno must be the ordinal number of the entry */
    selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
    int entryResNo = 1;
    TargetEntry *selectTargetEntry = NULL;
    foreach_ptr(selectTargetEntry, selectTargetList)
    {
        selectTargetEntry->resno = entryResNo++;
    }

    heap_close(distributedRelation, NoLock);

    return selectTargetList;
}


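/*
 * Worked example (hypothetical query): for
 *
 *   INSERT INTO target_table (int_col)
 *   SELECT text_col FROM source_table GROUP BY text_col;
 *
 * the select target list becomes, roughly,
 *
 *   SELECT text_col::integer AS auto_coerced_by_citus_0,
 *          text_col AS discarded_target_item_0   -- resjunk, keeps GROUP BY valid
 *   FROM source_table GROUP BY text_col;
 *
 * The coerced entry is projected and matches the insert target's type, while the
 * original entry only survives, renamed and marked resjunk, because its
 * ressortgroupref is still referenced by the GROUP BY clause.
 */
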
/*
 * CastExpr returns an expression which casts the given expr from sourceType to
 * the given targetType.
 */
static Expr *
CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
         int targetTypeMod)
{
    Oid coercionFuncId = InvalidOid;
    CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
                                                          COERCION_EXPLICIT,
                                                          &coercionFuncId);

    if (coercionType == COERCION_PATH_FUNC)
    {
        FuncExpr *coerceExpr = makeNode(FuncExpr);
        coerceExpr->funcid = coercionFuncId;
        coerceExpr->args = list_make1(copyObject(expr));
        coerceExpr->funccollid = targetCollation;
        coerceExpr->funcresulttype = targetType;

        return (Expr *) coerceExpr;
    }
    else if (coercionType == COERCION_PATH_RELABELTYPE)
    {
        RelabelType *coerceExpr = makeNode(RelabelType);
        coerceExpr->arg = copyObject(expr);
        coerceExpr->resulttype = targetType;
        coerceExpr->resulttypmod = targetTypeMod;
        coerceExpr->resultcollid = targetCollation;
        coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
        coerceExpr->location = -1;

        return (Expr *) coerceExpr;
    }
    else if (coercionType == COERCION_PATH_ARRAYCOERCE)
    {
        Oid sourceBaseType = get_base_element_type(sourceType);
        Oid targetBaseType = get_base_element_type(targetType);

        CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
        elemExpr->collation = targetCollation;
        elemExpr->typeId = sourceBaseType;
        elemExpr->typeMod = -1;

        Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
                                      targetBaseType, targetCollation,
                                      targetTypeMod);

        ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
        coerceExpr->arg = copyObject(expr);
        coerceExpr->elemexpr = elemCastExpr;
        coerceExpr->resultcollid = targetCollation;
        coerceExpr->resulttype = targetType;
        coerceExpr->resulttypmod = targetTypeMod;
        coerceExpr->location = -1;
        coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;

        return (Expr *) coerceExpr;
    }
    else if (coercionType == COERCION_PATH_COERCEVIAIO)
    {
        CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
        coerceExpr->arg = (Expr *) copyObject(expr);
        coerceExpr->resulttype = targetType;
        coerceExpr->resultcollid = targetCollation;
        coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
        coerceExpr->location = -1;

        return (Expr *) coerceExpr;
    }
    else
    {
        ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
                               sourceType, targetType)));
    }
}


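/*
 * Examples of the paths above, based on standard PostgreSQL casts: int4 -> int8
 * resolves to COERCION_PATH_FUNC (a cast function exists in pg_cast),
 * varchar -> text is binary-compatible and only needs a RelabelType,
 * int4[] -> int8[] goes through COERCION_PATH_ARRAYCOERCE with a per-element
 * cast, and text -> integer has no cast function and is performed via the
 * types' I/O routines (COERCION_PATH_COERCEVIAIO).
 */
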
/*
 * IsSupportedRedistributionTarget determines whether re-partitioning into the
 * given target relation is supported.
 */
bool
IsSupportedRedistributionTarget(Oid targetRelationId)
{
    CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId);

    /* only range- and hash-distributed tables are currently supported */
    if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH &&
        tableEntry->partitionMethod != DISTRIBUTE_BY_RANGE)
    {
        return false;
    }

    return true;
}


/*
 * RedistributedInsertSelectTaskList returns a task list to insert the given
 * redistributedResults into the given target relation.
 * redistributedResults[shardIndex] is a list of cstrings, each of which is
 * a result name that should be inserted into
 * targetRelation->sortedShardIntervalArray[shardIndex].
 */
static List *
RedistributedInsertSelectTaskList(Query *insertSelectQuery,
                                  CitusTableCacheEntry *targetRelation,
                                  List **redistributedResults,
                                  bool useBinaryFormat)
{
    List *taskList = NIL;

    /*
     * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the
     * subquery of insertResultQuery for different intermediate results and
     * then deparse it.
     */
    Query *insertResultQuery = copyObject(insertSelectQuery);
    RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
    RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
    List *selectTargetList = selectRte->subquery->targetList;
    Oid targetRelationId = targetRelation->relationId;

    int shardCount = targetRelation->shardIntervalArrayLength;
    int shardOffset = 0;
    uint32 taskIdIndex = 1;
    uint64 jobId = INVALID_JOB_ID;

    for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
    {
        ShardInterval *targetShardInterval =
            targetRelation->sortedShardIntervalArray[shardOffset];
        List *resultIdList = redistributedResults[targetShardInterval->shardIndex];
        uint64 shardId = targetShardInterval->shardId;
        StringInfo queryString = makeStringInfo();

        /* skip empty tasks */
        if (resultIdList == NIL)
        {
            continue;
        }

        /* sort result ids for consistent test output */
        List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);

        /* generate the query on the intermediate result */
        Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
                                                                         NIL,
                                                                         sortedResultIds,
                                                                         useBinaryFormat);

        /* put the intermediate result query in the INSERT..SELECT */
        selectRte->subquery = fragmentSetQuery;

        /* setting an alias simplifies deparsing of RETURNING */
        if (insertRte->alias == NULL)
        {
            Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
            insertRte->alias = alias;
        }

        /*
         * Generate a query string for the query that inserts into a shard and reads
         * from an intermediate result.
         *
         * Since CTEs have already been converted to intermediate results, they need
         * to be removed from the query. Otherwise, the worker queries would include
         * both the intermediate results and the CTEs.
         */
        insertResultQuery->cteList = NIL;
        deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString);
        ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));

        LockShardDistributionMetadata(shardId, ShareLock);
        List *insertShardPlacementList = ActiveShardPlacementList(shardId);

        RelationShard *relationShard = CitusMakeNode(RelationShard);
        relationShard->relationId = targetShardInterval->relationId;
        relationShard->shardId = targetShardInterval->shardId;

        Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
                                           queryString->data);
        modifyTask->dependentTaskList = NIL;
        modifyTask->anchorShardId = shardId;
        modifyTask->taskPlacementList = insertShardPlacementList;
        modifyTask->relationShardList = list_make1(relationShard);
        modifyTask->replicationModel = targetRelation->replicationModel;

        taskList = lappend(taskList, modifyTask);

        taskIdIndex++;
    }

    return taskList;
}


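/*
 * Sketch of a generated per-shard query (identifiers illustrative): the fragment
 * names come from RedistributeTaskListResults() under the
 * "repartitioned_results_<jobId>" prefix chosen by the caller:
 *
 *   INSERT INTO public.target_table_102008 AS citus_table_alias (key, value)
 *   SELECT key, value
 *   FROM read_intermediate_results(ARRAY['<fragment name>', ...]::text[], 'binary')
 *       AS intermediate_result(key integer, value text);
 */
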
/*
 * PartitionColumnIndex finds the index of the given partition column in the
 * given target list.
 */
static int
PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
{
    TargetEntry *insertTargetEntry = NULL;
    int targetEntryIndex = 0;
    foreach_ptr(insertTargetEntry, insertTargetList)
    {
        if (insertTargetEntry->resno == partitionColumn->varattno)
        {
            return targetEntryIndex;
        }

        targetEntryIndex++;
    }

    return -1;
}


/*
 * IsRedistributablePlan returns true if the given plan is a redistributable plan.
 */
bool
IsRedistributablePlan(Plan *selectPlan)
{
    if (!EnableRepartitionedInsertSelect)
    {
        return false;
    }

    /* don't redistribute if the query is not distributed or requires a merge on the coordinator */
    if (!IsCitusCustomScan(selectPlan))
    {
        return false;
    }

    DistributedPlan *distSelectPlan =
        GetDistributedPlan((CustomScan *) selectPlan);
    Job *distSelectJob = distSelectPlan->workerJob;
    List *distSelectTaskList = distSelectJob->taskList;

    /*
     * Don't use redistribution if there is only one task. This keeps the existing
     * behaviour for CTEs whose last step is a read_intermediate_result() call.
     * It doesn't hurt much in other cases either.
     */
    if (list_length(distSelectTaskList) <= 1)
    {
        return false;
    }

    /* don't use redistribution for repartition joins for now */
    if (distSelectJob->dependentJobList != NIL)
    {
        return false;
    }

    if (distSelectPlan->combineQuery != NULL)
    {
        Query *combineQuery = (Query *) distSelectPlan->combineQuery;

        if (contain_nextval_expression_walker((Node *) combineQuery->targetList, NULL))
        {
            /* nextval needs to be evaluated on the coordinator */
            return false;
        }
    }

    return true;
}


/*
 * WrapTaskListForProjection wraps the query string of each task so that it
 * only selects the given projected columns. It modifies the taskList in place.
 */
static void
WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
{
    StringInfo projectedColumnsString = makeStringInfo();
    int entryIndex = 0;
    TargetEntry *targetEntry = NULL;
    foreach_ptr(targetEntry, projectedTargetEntries)
    {
        if (entryIndex != 0)
        {
            appendStringInfoChar(projectedColumnsString, ',');
        }

        char *columnName = targetEntry->resname;
        Assert(columnName != NULL);
        appendStringInfoString(projectedColumnsString, quote_identifier(columnName));

        entryIndex++;
    }

    Task *task = NULL;
    foreach_ptr(task, taskList)
    {
        StringInfo wrappedQuery = makeStringInfo();
        appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
                         projectedColumnsString->data,
                         TaskQueryStringForAllPlacements(task));
        SetTaskQueryString(task, wrappedQuery->data);
    }
}


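/*
 * For example (illustrative task query), a worker query such as
 *
 *   SELECT key, value, extra_sort_col FROM source_table_102042
 *
 * is rewritten to
 *
 *   SELECT key, value FROM (SELECT key, value, extra_sort_col
 *                           FROM source_table_102042) subquery
 *
 * so that only the columns needed by the INSERT survive.
 */
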
/*
 * RelabelTargetEntryList relabels the select target list so that its names
 * match the insert target list.
 */
static void
RelabelTargetEntryList(List *selectTargetList, List *insertTargetList)
{
    ListCell *selectTargetCell = NULL;
    ListCell *insertTargetCell = NULL;

    forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
    {
        TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
        TargetEntry *insertTargetEntry = lfirst(insertTargetCell);

        selectTargetEntry->resname = insertTargetEntry->resname;
    }
}