Support INSERT...SELECT with ON CONFLICT or RETURNING via coordinator

Before this commit, Citus supported INSERT...SELECT queries with
ON CONFLICT or RETURNING clauses only for pushdownable ones, since
queries supported via coordinator were utilizing COPY infrastructure
of PG to send selected tuples to the target worker nodes.

After this commit, INSERT...SELECT queries with ON CONFLICT or RETURNING
clauses are performed in two phases via the coordinator. In the first
phase, the selected tuples are saved to an intermediate table which
is colocated with the target table of the INSERT...SELECT query. Note that
a utility function to save results to the colocated intermediate result
is also implemented as part of this commit. In the second phase, the
INSERT...SELECT query is run directly on the worker nodes using the
intermediate table as the source table.
pull/2305/head
Marco Slot 2018-07-13 12:55:46 +02:00 committed by velioglu
parent a9c299473b
commit 8893cc141d
27 changed files with 1814 additions and 136 deletions

View File

@ -1296,7 +1296,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList,
partitionColumnIndex,
estate, stopOnFailure);
estate, stopOnFailure,
NULL);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);

View File

@ -141,7 +141,6 @@ static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray,
static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath);
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static bool IsCopyResultStmt(CopyStmt *copyStatement);
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
static bool IsCopyFromWorker(CopyStmt *copyStatement);
static NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
@ -389,7 +388,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
/* set up the destination for the COPY */
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex,
executorState, stopOnFailure);
executorState, stopOnFailure, NULL);
dest = (DestReceiver *) copyDest;
dest->rStartup(dest, 0, tupleDescriptor);
@ -1150,7 +1149,11 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
appendStringInfo(command, "FROM STDIN WITH ");
if (useBinaryCopyFormat)
if (IsCopyResultStmt(copyStatement))
{
appendStringInfoString(command, "(FORMAT RESULT)");
}
else if (useBinaryCopyFormat)
{
appendStringInfoString(command, "(FORMAT BINARY)");
}
@ -2049,10 +2052,16 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
* The caller should provide the list of column names to use in the
* remote COPY statement, and the partition column index in the tuple
* descriptor (*not* the column name list).
*
* If intermediateResultIdPrefix is not NULL, the COPY will go into a set
* of intermediate results that are co-located with the actual table.
* The names of the intermediate results will be of the form:
* intermediateResultIdPrefix_<shardid>
*/
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState, bool stopOnFailure)
EState *executorState, bool stopOnFailure,
char *intermediateResultIdPrefix)
{
CitusCopyDestReceiver *copyDest = NULL;
@ -2071,6 +2080,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
copyDest->partitionColumnIndex = partitionColumnIndex;
copyDest->executorState = executorState;
copyDest->stopOnFailure = stopOnFailure;
copyDest->intermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->memoryContext = CurrentMemoryContext;
return copyDest;
@ -2215,13 +2225,31 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
/* define the template for the COPY statement that is sent to workers */
copyStatement = makeNode(CopyStmt);
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
if (copyDest->intermediateResultIdPrefix != NULL)
{
DefElem *formatResultOption = NULL;
copyStatement->relation = makeRangeVar(NULL, copyDest->intermediateResultIdPrefix,
-1);
#if (PG_VERSION_NUM >= 100000)
formatResultOption = makeDefElem("format", (Node *) makeString("result"), -1);
#else
formatResultOption = makeDefElem("format", (Node *) makeString("result"));
#endif
copyStatement->options = list_make1(formatResultOption);
}
else
{
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
copyStatement->options = NIL;
}
copyStatement->query = NULL;
copyStatement->attlist = quotedColumnNameList;
copyStatement->is_from = true;
copyStatement->is_program = false;
copyStatement->filename = NULL;
copyStatement->options = NIL;
copyDest->copyStatement = copyStatement;
copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
@ -2443,7 +2471,7 @@ CitusCopyDestReceiverDestroy(DestReceiver *destReceiver)
* COPY "resultkey" FROM STDIN WITH (format result) statement, which is used
* to copy query results from the coordinator into workers.
*/
static bool
bool
IsCopyResultStmt(CopyStmt *copyStatement)
{
return CopyStatementHasFormat(copyStatement, "result");

View File

@ -16,6 +16,7 @@
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/distributed_planner.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
@ -37,6 +38,15 @@
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState);
static HTAB * ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
List *insertTargetList,
Query *selectQuery,
EState *executorState,
char *
intermediateResultIdPrefix);
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
List *insertTargetList);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
/*
@ -58,6 +68,8 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
Query *selectQuery = distributedPlan->insertSelectSubquery;
List *insertTargetList = distributedPlan->insertTargetList;
Oid targetRelationId = distributedPlan->targetRelationId;
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
HTAB *shardConnectionsHash = NULL;
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
@ -71,8 +83,60 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
LockPartitionRelations(targetRelationId, RowExclusiveLock);
}
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
executorState);
if (distributedPlan->workerJob != NULL)
{
/*
* If we also have a workerJob that means there is a second step
* to the INSERT...SELECT. This happens when there is a RETURNING
* or ON CONFLICT clause which is implemented as a separate
* distributed INSERT...SELECT from a set of intermediate results
* to the target relation.
*/
Job *workerJob = distributedPlan->workerJob;
ListCell *taskCell = NULL;
List *taskList = workerJob->taskList;
List *prunedTaskList = NIL;
bool hasReturning = distributedPlan->hasReturning;
bool isModificationQuery = true;
shardConnectionsHash = ExecuteSelectIntoColocatedIntermediateResults(
targetRelationId,
insertTargetList,
selectQuery,
executorState,
intermediateResultIdPrefix);
/*
* We cannot actually execute INSERT...SELECT tasks that read from
* intermediate results that weren't created because no rows were
* written to them. Prune those tasks out by only including tasks
* on shards with connections.
*/
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
uint64 shardId = task->anchorShardId;
bool shardModified = false;
hash_search(shardConnectionsHash, &shardId, HASH_FIND, &shardModified);
if (shardModified)
{
prunedTaskList = lappend(prunedTaskList, task);
}
}
if (prunedTaskList != NIL)
{
ExecuteMultipleTasks(scanState, prunedTaskList, isModificationQuery,
hasReturning);
}
}
else
{
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
executorState);
}
scanState->finishedRemoteScan = true;
}
@ -83,6 +147,61 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
}
/*
 * ExecuteSelectIntoColocatedIntermediateResults runs the given SELECT query
 * and streams its tuples, through a CitusCopyDestReceiver, into a set of
 * intermediate results that are co-located with the target relation. The
 * results are named using intermediateResultIdPrefix and are meant to be read
 * by a follow-up INSERT ... SELECT that handles ON CONFLICT or RETURNING.
 *
 * The number of tuples sent is stored in executorState->es_processed. The
 * function returns the receiver's shardConnectionHash, i.e. the hash of
 * connections that were used while writing the intermediate results.
 */
static HTAB *
ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
											  List *insertTargetList,
											  Query *selectQuery, EState *executorState,
											  char *intermediateResultIdPrefix)
{
	ParamListInfo boundParams = executorState->es_param_list_info;

	/* relations with partition method DISTRIBUTE_BY_NONE stop on the first failure */
	bool stopOnFailure = (PartitionMethod(targetRelationId) == DISTRIBUTE_BY_NONE);

	/* column names for the COPY and the position of the partition column in them */
	List *copyColumnNameList = BuildColumnNameListFromTargetList(targetRelationId,
																 insertTargetList);
	int partitionColumnPosition = PartitionColumnIndexFromColumnList(targetRelationId,
																	 copyColumnNameList);

	/* a non-NULL prefix makes the receiver write to intermediate results */
	CitusCopyDestReceiver *resultReceiver =
		CreateCitusCopyDestReceiver(targetRelationId, copyColumnNameList,
									partitionColumnPosition, executorState,
									stopOnFailure, intermediateResultIdPrefix);

	/*
	 * Execute a private copy of the query: ExecuteQueryIntoDestReceiver may
	 * scribble on what it is given, and a query stored in a prepared statement
	 * has to be replanned on every execution.
	 */
	Query *selectQueryCopy = copyObject(selectQuery);

	ExecuteQueryIntoDestReceiver(selectQueryCopy, boundParams,
								 (DestReceiver *) resultReceiver);

	executorState->es_processed = resultReceiver->tuplesSent;

	/* flag the transaction at the XACT_MODIFICATION_DATA level */
	XactModificationLevel = XACT_MODIFICATION_DATA;

	return resultReceiver->shardConnectionHash;
}
/*
* ExecuteSelectIntoRelation executes given SELECT query and inserts the
* results into the target relation, which is assumed to be a distributed
@ -93,14 +212,10 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
ListCell *insertTargetCell = NULL;
int partitionColumnIndex = -1;
List *columnNameList = NIL;
bool stopOnFailure = false;
char partitionMethod = 0;
Var *partitionColumn = NULL;
int partitionColumnIndex = -1;
CitusCopyDestReceiver *copyDest = NULL;
Query *queryCopy = NULL;
@ -110,32 +225,16 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
stopOnFailure = true;
}
partitionColumn = PartitionColumn(targetRelationId, 0);
/* build the list of column names for the COPY statement */
foreach(insertTargetCell, insertTargetList)
{
TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell);
char *columnName = insertTargetEntry->resname;
/* load the column information from pg_attribute */
AttrNumber attrNumber = get_attnum(targetRelationId, columnName);
/* check whether this is the partition column */
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
{
Assert(partitionColumnIndex == -1);
partitionColumnIndex = list_length(columnNameList);
}
columnNameList = lappend(columnNameList, insertTargetEntry->resname);
}
/* Get column name list and partition column index for the target table */
columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
insertTargetList);
partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
columnNameList);
/* set up a DestReceiver that copies into the distributed table */
copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList,
partitionColumnIndex, executorState,
stopOnFailure);
stopOnFailure, NULL);
/*
* Make a copy of the query, since ExecuteQueryIntoDestReceiver may scribble on it
@ -150,3 +249,56 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
XactModificationLevel = XACT_MODIFICATION_DATA;
}
/*
 * BuildColumnNameListFromTargetList builds the list of column names for the
 * COPY statement from the given insert target list. The result holds the
 * resname of every target entry, in target list order.
 *
 * targetRelationId is not used by the current implementation.
 */
static List *
BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList)
{
	List *resultNameList = NIL;
	ListCell *targetEntryCell = NULL;

	/* collect the result name of each target entry */
	foreach(targetEntryCell, insertTargetList)
	{
		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
		char *targetColumnName = targetEntry->resname;

		resultNameList = lappend(resultNameList, targetColumnName);
	}

	return resultNameList;
}
/*
 * PartitionColumnIndexFromColumnList returns the zero-based position of the
 * partition column of the given relation within columnNameList.
 *
 * It returns -1 when PartitionColumn() yields NULL for the relation or when
 * none of the names in the list resolves to the partition column's attribute
 * number.
 */
static int
PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
{
	ListCell *columnNameCell = NULL;
	Var *partitionColumn = PartitionColumn(relationId, 0);
	int partitionColumnIndex = 0;

	/*
	 * Without a partition column nothing can match, so bail out before doing
	 * an attribute lookup for every column name.
	 */
	if (partitionColumn == NULL)
	{
		return -1;
	}

	foreach(columnNameCell, columnNameList)
	{
		char *columnName = (char *) lfirst(columnNameCell);

		/* resolve the column name to its attribute number in the relation */
		AttrNumber attrNumber = get_attnum(relationId, columnName);

		/* check whether this is the partition column */
		if (attrNumber == partitionColumn->varattno)
		{
			return partitionColumnIndex;
		}

		partitionColumnIndex++;
	}

	return -1;
}

View File

@ -95,8 +95,6 @@ static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * BuildPlacementAccessList(uint32 groupId, List *relationShardList,
ShardPlacementAccessType accessType);
static List * GetModifyConnections(Task *task, bool markCritical);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
ParamListInfo paramListInfo, CitusScanState *scanState);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
@ -1233,7 +1231,7 @@ GetModifyConnections(Task *task, bool markCritical)
* Otherwise, the changes are committed using 2PC when the local transaction
* commits.
*/
static void
void
ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults)
{

View File

@ -603,7 +603,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
}
distributedPlan =
CreateInsertSelectPlan(originalQuery, plannerRestrictionContext);
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
}
else
{

View File

@ -24,6 +24,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/recursive_planning.h"
#include "distributed/resource_lock.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@ -37,6 +38,7 @@
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
@ -60,10 +62,12 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
subqueryRte,
Oid *
selectPartitionColumnTableId);
static DistributedPlan * CreateCoordinatorInsertSelectPlan(Query *parse);
static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse);
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
static Query * WrapSubquery(Query *subquery);
static bool CheckInsertSelectQuery(Query *query);
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
char *resultIdPrefix);
/*
@ -172,7 +176,7 @@ CheckInsertSelectQuery(Query *query)
* plan for evaluating the SELECT on the coordinator.
*/
DistributedPlan *
CreateInsertSelectPlan(Query *originalQuery,
CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = NULL;
@ -193,7 +197,7 @@ CreateInsertSelectPlan(Query *originalQuery,
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
/* if INSERT..SELECT cannot be distributed, pull to coordinator */
distributedPlan = CreateCoordinatorInsertSelectPlan(originalQuery);
distributedPlan = CreateCoordinatorInsertSelectPlan(planId, originalQuery);
}
return distributedPlan;
@ -201,7 +205,7 @@ CreateInsertSelectPlan(Query *originalQuery,
/*
* CreateDistributedInsertSelectPlan Creates a DistributedPlan for distributed
* CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed
* INSERT ... SELECT queries which could consist of multiple tasks.
*
* The function never returns NULL, it errors out if cannot create the DistributedPlan.
@ -264,6 +268,9 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
taskIdIndex,
allDistributionKeysInQueryAreEqual);
/* If a planning error occurred, return it to the calling function */
/* pass the distributed plan along */
/* add the task if it could be created */
if (modifyTask != NULL)
{
@ -1126,7 +1133,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
* distributed table. The query plan can also be executed on a worker in MX.
*/
static DistributedPlan *
CreateCoordinatorInsertSelectPlan(Query *parse)
CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
{
Query *insertSelectQuery = copyObject(parse);
Query *selectQuery = NULL;
@ -1175,6 +1182,40 @@ CreateCoordinatorInsertSelectPlan(Query *parse)
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
if (insertSelectQuery->onConflict || insertSelectQuery->returningList != NIL)
{
/*
* We cannot perform a COPY operation with RETURNING or ON CONFLICT.
* We therefore perform the INSERT...SELECT in two phases. First we
* copy the result of the SELECT query in a set of intermediate
* results, one for each shard placement in the destination table.
* Second, we perform an INSERT..SELECT..ON CONFLICT/RETURNING from
* the intermediate results into the destination table. This is
* represented in the plan by simply having both an
* insertSelectSubquery and a workerJob to execute afterwards.
*/
uint64 jobId = INVALID_JOB_ID;
Job *workerJob = NULL;
List *taskList = NIL;
char *resultIdPrefix = InsertSelectResultIdPrefix(planId);
/* generate tasks for the INSERT..SELECT phase */
taskList = TwoPhaseInsertSelectTaskList(targetRelationId, insertSelectQuery,
resultIdPrefix);
workerJob = CitusMakeNode(Job);
workerJob->taskList = taskList;
workerJob->subqueryPushdown = false;
workerJob->dependedJobList = NIL;
workerJob->jobId = jobId;
workerJob->jobQuery = insertSelectQuery;
workerJob->requiresMasterEvaluation = false;
distributedPlan->workerJob = workerJob;
distributedPlan->hasReturning = insertSelectQuery->returningList != NIL;
distributedPlan->intermediateResultIdPrefix = resultIdPrefix;
}
distributedPlan->insertSelectSubquery = selectQuery;
distributedPlan->insertTargetList = insertSelectQuery->targetList;
distributedPlan->targetRelationId = targetRelationId;
@ -1186,8 +1227,8 @@ CreateCoordinatorInsertSelectPlan(Query *parse)
/*
* CoordinatorInsertSelectSupported returns an error if executing an
* INSERT ... SELECT command by pulling results of the SELECT to the coordinator
* is unsupported because it uses RETURNING, ON CONFLICT, or an append-distributed
* table.
* is unsupported because it needs to generate sequence values or insert into an
* append-distributed table.
*/
static DeferredErrorMessage *
CoordinatorInsertSelectSupported(Query *insertSelectQuery)
@ -1195,19 +1236,12 @@ CoordinatorInsertSelectSupported(Query *insertSelectQuery)
RangeTblEntry *insertRte = NULL;
RangeTblEntry *subqueryRte = NULL;
Query *subquery = NULL;
DeferredErrorMessage *deferredError = NULL;
if (list_length(insertSelectQuery->returningList) > 0)
deferredError = ErrorIfOnConflictNotSupported(insertSelectQuery);
if (deferredError)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"RETURNING is not supported in INSERT ... SELECT via "
"coordinator", NULL, NULL);
}
if (insertSelectQuery->onConflict)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"ON CONFLICT is not supported in INSERT ... SELECT via "
"coordinator", NULL, NULL);
return deferredError;
}
insertRte = ExtractInsertRangeTableEntry(insertSelectQuery);
@ -1295,3 +1329,145 @@ WrapSubquery(Query *subquery)
return outerQuery;
}
/*
 * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that
 * inserts into a target relation and selects from a set of co-located
 * intermediate results.
 *
 * One MODIFY_TASK is created per shard of the target relation. The task's
 * query string is the INSERT ... SELECT deparsed for that shard, with the
 * SELECT side replaced by a query that reads the intermediate result named
 * "<resultIdPrefix>_<shardId>".
 *
 * Note that the target list of the passed-in insertSelectQuery is modified in
 * place: entries whose expression type differs from the type of the
 * destination column are wrapped in a CoerceViaIO to the column type.
 */
static List *
TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
							 char *resultIdPrefix)
{
	List *taskList = NIL;

	/*
	 * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the
	 * subquery of insertResultQuery for different intermediate results and
	 * then deparse it.
	 */
	Query *insertResultQuery = copyObject(insertSelectQuery);
	RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertResultQuery);
	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);

	DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
	int shardCount = targetCacheEntry->shardIntervalArrayLength;
	int shardOffset = 0;
	uint32 taskIdIndex = 1;
	uint64 jobId = INVALID_JOB_ID;

	ListCell *targetEntryCell = NULL;
	Relation distributedRelation = NULL;
	TupleDesc destTupleDescriptor = NULL;

	distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
	destTupleDescriptor = RelationGetDescr(distributedRelation);

	/*
	 * If the type of an insert target expression differs from the type of the
	 * corresponding column in the target table, cast the expression to the
	 * column's type.
	 *
	 * The loop walks insertSelectQuery->targetList (not the copy made above),
	 * and that same list is handed to BuildSubPlanResultQuery below.
	 * NOTE(review): presumably this is intentional so that the query on the
	 * intermediate result uses the target column types -- confirm.
	 */
	foreach(targetEntryCell, insertSelectQuery->targetList)
	{
		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
		Node *insertExpr = (Node *) targetEntry->expr;
		Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
											   targetEntry->resno - 1);

		/*
		 * Use exprType() rather than reading Var->vartype so that the check
		 * is well-defined for any kind of expression node, not only Vars.
		 */
		if (exprType(insertExpr) != attr->atttypid)
		{
			CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);

			coerceExpr->arg = (Expr *) copyObject(insertExpr);
			coerceExpr->resulttype = attr->atttypid;
			coerceExpr->resultcollid = attr->attcollation;
			coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
			coerceExpr->location = -1;

			targetEntry->expr = (Expr *) coerceExpr;
		}
	}

	for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
	{
		ShardInterval *targetShardInterval =
			targetCacheEntry->sortedShardIntervalArray[shardOffset];
		uint64 shardId = targetShardInterval->shardId;
		List *columnAliasList = NIL;
		List *insertShardPlacementList = NIL;
		Query *resultSelectQuery = NULL;
		StringInfo queryString = makeStringInfo();
		RelationShard *relationShard = NULL;
		Task *modifyTask = NULL;
		StringInfo resultId = makeStringInfo();

		/* during COPY, the shard ID is appended to the result name */
		appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);

		/* generate the query on the intermediate result */
		resultSelectQuery = BuildSubPlanResultQuery(insertSelectQuery->targetList,
													columnAliasList, resultId->data);

		/* put the intermediate result query in the INSERT..SELECT */
		selectRte->subquery = resultSelectQuery;

		/* setting an alias simplifies deparsing of RETURNING */
		if (insertRte->alias == NULL)
		{
			Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
			insertRte->alias = alias;
		}

		/*
		 * Generate a query string for the query that inserts into a shard and reads
		 * from an intermediate result.
		 *
		 * Since CTEs have already been converted to intermediate results, they need
		 * to be removed from the query. Otherwise, worker queries would include both
		 * intermediate results and CTEs.
		 */
		insertResultQuery->cteList = NIL;
		deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString);
		ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));

		/* lock the shard's distribution metadata, then read its finalized placements */
		LockShardDistributionMetadata(shardId, ShareLock);
		insertShardPlacementList = FinalizedShardPlacementList(shardId);

		relationShard = CitusMakeNode(RelationShard);
		relationShard->relationId = targetShardInterval->relationId;
		relationShard->shardId = targetShardInterval->shardId;

		modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data);
		modifyTask->dependedTaskList = NULL;
		modifyTask->anchorShardId = shardId;
		modifyTask->taskPlacementList = insertShardPlacementList;
		modifyTask->upsertQuery = insertResultQuery->onConflict != NULL;
		modifyTask->relationShardList = list_make1(relationShard);
		modifyTask->replicationModel = targetCacheEntry->replicationModel;

		taskList = lappend(taskList, modifyTask);

		taskIdIndex++;
	}

	/* close the relation; NoLock means the lock taken by heap_open is not released here */
	heap_close(distributedRelation, NoLock);

	return taskList;
}
/*
 * InsertSelectResultIdPrefix builds the prefix used to name the intermediate
 * results of an INSERT ... SELECT via the coordinator that is executed in two
 * phases in order to support RETURNING or ON CONFLICT.
 *
 * The prefix has the form "insert_select_<planId>" and is returned as a
 * string allocated by the StringInfo machinery.
 */
char *
InsertSelectResultIdPrefix(uint64 planId)
{
	StringInfo prefixBuffer = makeStringInfo();

	appendStringInfo(prefixBuffer, "insert_select_" UINT64_FORMAT, planId);

	return prefixBuffer->data;
}

View File

@ -555,6 +555,7 @@ DeferredErrorMessage *
ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
DeferredErrorMessage *deferredError = NULL;
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
uint32 rangeTableId = 1;
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
@ -562,7 +563,6 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
ListCell *rangeTableCell = NULL;
uint32 queryTableCount = 0;
CmdType commandType = queryTree->commandType;
DeferredErrorMessage *deferredError = NULL;
/*
* Here, we check if a recursively planned query tries to modify

View File

@ -0,0 +1,15 @@
# INSERT ... SELECT query planning
Citus supports `INSERT ... SELECT` queries either by pushing down the whole query to the worker nodes or pulling the `SELECT` part to the coordinator.
## INSERT ... SELECT - by pushing down
If an `INSERT ... SELECT` query can be planned by pushing it down to the worker nodes, Citus chooses that approach first. The query is planned separately for each shard of the target table: Citus replaces the partitioning qual parameter with the shard's actual boundary values to create a modify task for each shard. Then, shard pruning is performed to decide which shards the query will be pushed down to. Finally, Citus checks whether the target shard interval has exactly the same placements as the select task's available anchor placements.
## INSERT...SELECT - via the coordinator
If the query cannot be pushed down to the worker nodes, two different approaches can be followed depending on whether `ON CONFLICT` or `RETURNING` clauses are used.
* If `ON CONFLICT` or `RETURNING` are not used, Citus uses the `COPY` command to handle such queries. After planning the `SELECT` part of the `INSERT ... SELECT` query, including subqueries and CTEs, it executes the plan and sends the results to the DestReceiver, which is created using the target table info.
* Since the `COPY` command supports neither `ON CONFLICT` nor `RETURNING` clauses, Citus performs `INSERT ... SELECT` queries with an `ON CONFLICT` or `RETURNING` clause in two phases. First, Citus plans the `SELECT` part of the query, executes the plan, and saves the results to an intermediate table which is colocated with the target table of the `INSERT ... SELECT` query. Then, the `INSERT ... SELECT` query is run directly on the worker nodes using the intermediate table as the source table.

View File

@ -162,8 +162,6 @@ static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *contex
static bool ContainsReferencesToOuterQuery(Query *query);
static bool ContainsReferencesToOuterQueryWalker(Node *node,
VarLevelsUpWalkerContext *context);
static Query * BuildSubPlanResultQuery(Query *subquery, List *columnAliasList,
uint64 planId, uint32 subPlanId);
/*
@ -686,6 +684,8 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
Query *subquery = (Query *) cte->ctequery;
uint64 planId = planningContext->planId;
uint32 subPlanId = 0;
char *resultId = NULL;
List *cteTargetList = NIL;
Query *resultQuery = NULL;
DistributedSubPlan *subPlan = NULL;
ListCell *rteCell = NULL;
@ -725,9 +725,23 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
subPlan = CreateDistributedSubPlan(subPlanId, subquery);
planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
/* build the result_id parameter for the call to read_intermediate_result */
resultId = GenerateResultId(planId, subPlanId);
if (subquery->returningList)
{
/* modifying CTE with returning */
cteTargetList = subquery->returningList;
}
else
{
/* regular SELECT CTE */
cteTargetList = subquery->targetList;
}
/* replace references to the CTE with a subquery that reads results */
resultQuery = BuildSubPlanResultQuery(subquery, cte->aliascolnames, planId,
subPlanId);
resultQuery = BuildSubPlanResultQuery(cteTargetList, cte->aliascolnames,
resultId);
foreach(rteCell, context.cteReferenceList)
{
@ -1080,7 +1094,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
DistributedSubPlan *subPlan = NULL;
uint64 planId = planningContext->planId;
int subPlanId = 0;
char *resultId = NULL;
Query *resultQuery = NULL;
Query *debugQuery = NULL;
@ -1109,11 +1123,14 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
subPlan = CreateDistributedSubPlan(subPlanId, subquery);
planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
/* build the result_id parameter for the call to read_intermediate_result */
resultId = GenerateResultId(planId, subPlanId);
/*
* BuildSubPlanResultQuery() can optionally use provided column aliases.
* We do not need to send additional alias list for subqueries.
*/
resultQuery = BuildSubPlanResultQuery(subquery, NIL, planId, subPlanId);
resultQuery = BuildSubPlanResultQuery(subquery->targetList, NIL, resultId);
if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1)
{
@ -1294,21 +1311,19 @@ ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *conte
* SELECT
* <target list>
* FROM
* read_intermediate_result('<planId>_<subPlanId>', '<copy format'>)
* read_intermediate_result('<resultId>', '<copy format>')
* AS res (<column definition list>);
*
* The target list and column definition list are derived from the given subquery
* and columm name alias list.
* The caller can optionally supply a columnAliasList, which is useful for
* CTEs that have column aliases.
*
* If any of the types in the target list cannot be used in the binary copy format,
* then the copy format 'text' is used, otherwise 'binary' is used.
*/
static Query *
BuildSubPlanResultQuery(Query *subquery, List *columnAliasList, uint64 planId,
uint32 subPlanId)
Query *
BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId)
{
Query *resultQuery = NULL;
char *resultIdString = NULL;
Const *resultIdConst = NULL;
Const *resultFormatConst = NULL;
FuncExpr *funcExpr = NULL;
@ -1328,16 +1343,6 @@ BuildSubPlanResultQuery(Query *subquery, List *columnAliasList, uint64 planId,
Oid copyFormatId = BinaryCopyFormatId();
int columnAliasCount = list_length(columnAliasList);
List *targetEntryList = NIL;
if (subquery->returningList)
{
targetEntryList = subquery->returningList;
}
else
{
targetEntryList = subquery->targetList;
}
/* build the target list and column definition list */
foreach(targetEntryCell, targetEntryList)
{
@ -1403,14 +1408,11 @@ BuildSubPlanResultQuery(Query *subquery, List *columnAliasList, uint64 planId,
columnNumber++;
}
/* build the result_id parameter for the call to read_intermediate_result */
resultIdString = GenerateResultId(planId, subPlanId);
resultIdConst = makeNode(Const);
resultIdConst->consttype = TEXTOID;
resultIdConst->consttypmod = -1;
resultIdConst->constlen = -1;
resultIdConst->constvalue = CStringGetTextDatum(resultIdString);
resultIdConst->constvalue = CStringGetTextDatum(resultId);
resultIdConst->constbyval = false;
resultIdConst->constisnull = false;
resultIdConst->location = -1;

View File

@ -115,6 +115,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
COPY_NODE_FIELD(insertSelectSubquery);
COPY_NODE_FIELD(insertTargetList);
COPY_SCALAR_FIELD(targetRelationId);
COPY_STRING_FIELD(intermediateResultIdPrefix);
COPY_NODE_FIELD(subPlanList);

View File

@ -188,6 +188,7 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(insertSelectSubquery);
WRITE_NODE_FIELD(insertTargetList);
WRITE_OID_FIELD(targetRelationId);
WRITE_STRING_FIELD(intermediateResultIdPrefix);
WRITE_NODE_FIELD(subPlanList);

View File

@ -214,6 +214,7 @@ ReadDistributedPlan(READFUNC_ARGS)
READ_NODE_FIELD(insertSelectSubquery);
READ_NODE_FIELD(insertTargetList);
READ_OID_FIELD(targetRelationId);
READ_STRING_FIELD(intermediateResultIdPrefix);
READ_NODE_FIELD(subPlanList);

View File

@ -108,6 +108,9 @@ typedef struct CitusCopyDestReceiver
/* useful for tracking multi shard accesses */
bool multiShardCopy;
/* copy into intermediate result */
char *intermediateResultIdPrefix;
} CitusCopyDestReceiver;
@ -116,7 +119,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
List *columnNameList,
int partitionColumnIndex,
EState *executorState,
bool stopOnFailure);
bool stopOnFailure,
char *intermediateResultPrefix);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
extern bool CanUseBinaryCopyFormatForType(Oid typeId);
@ -131,6 +135,7 @@ extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailur
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
const char *queryString);
extern void CheckCopyPermissions(CopyStmt *copyStatement);
extern bool IsCopyResultStmt(CopyStmt *copyStatement);
#endif /* MULTI_COPY_H */

View File

@ -30,9 +30,10 @@ extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *subqueryRte);
extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es);
extern DistributedPlan * CreateInsertSelectPlan(Query *originalQuery,
extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern char * InsertSelectResultIdPrefix(uint64 planId);
#endif /* INSERT_SELECT_PLANNER_H */

View File

@ -259,6 +259,16 @@ typedef struct DistributedPlan
/* target relation of an INSERT ... SELECT via the coordinator */
Oid targetRelationId;
/*
* If intermediateResultIdPrefix is non-null, an INSERT ... SELECT
* via the coordinator is written to a set of intermediate results
* named according to <intermediateResultIdPrefix>_<anchorShardId>.
* That way we can run a distributed INSERT ... SELECT with
* RETURNING or ON CONFLICT from the intermediate results to the
* target relation.
*/
char *intermediateResultIdPrefix;
/* list of subplans to execute before the distributed query */
List *subPlanList;

View File

@ -44,6 +44,9 @@ extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int efla
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
CmdType operation);

View File

@ -22,6 +22,8 @@ extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *origina
PlannerRestrictionContext *
plannerRestrictionContext);
extern char * GenerateResultId(uint64 planId, uint32 subPlanId);
extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList,
char *resultId);
#endif /* RECURSIVE_PLANNING_H */

View File

@ -0,0 +1,301 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-insert-into-select-conflict-update s2-begin s2-update s1-commit s2-commit
create_distributed_table
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1 col_2
1 1
5 5
3 3
4 4
2 2
step s2-begin:
BEGIN;
step s2-update:
UPDATE target_table SET col_2 = 5;
<waiting ...>
step s1-commit:
COMMIT;
step s2-update: <... completed>
step s2-commit:
COMMIT;
starting permutation: s1-begin s1-insert-into-select-conflict-do-nothing s2-begin s2-delete s1-commit s2-commit
create_distributed_table
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-do-nothing:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
step s2-begin:
BEGIN;
step s2-delete:
DELETE FROM target_table;
<waiting ...>
step s1-commit:
COMMIT;
step s2-delete: <... completed>
step s2-commit:
COMMIT;
starting permutation: s1-begin s1-insert-into-select-conflict-do-nothing s2-begin s2-insert-into-select-conflict-update s1-commit s2-commit
create_distributed_table
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-do-nothing:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
step s2-begin:
BEGIN;
step s2-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-into-select-conflict-update: <... completed>
col_1 col_2
1 1
5 5
3 3
4 4
2 2
step s2-commit:
COMMIT;
starting permutation: s1-begin s1-insert-into-select-conflict-update s2-begin s2-insert-into-select-conflict-update s1-commit s2-commit
create_distributed_table
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1 col_2
1 1
5 5
3 3
4 4
2 2
step s2-begin:
BEGIN;
step s2-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-into-select-conflict-update: <... completed>
col_1 col_2
1 1
5 5
3 3
4 4
2 2
step s2-commit:
COMMIT;
starting permutation: s1-begin s1-insert-into-select-conflict-update s2-begin s2-insert-into-select-conflict-do-nothing s1-commit s2-commit
create_distributed_table
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1 col_2
1 1
5 5
3 3
4 4
2 2
step s2-begin:
BEGIN;
step s2-insert-into-select-conflict-do-nothing:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-into-select-conflict-do-nothing: <... completed>
step s2-commit:
COMMIT;
starting permutation: s1-begin-replication-factor-2 s1-insert-into-select-conflict-update-replication-factor-2 s2-begin-replication-factor-2 s2-insert-into-select-conflict-update-replication-factor-2 s1-commit s2-commit
create_distributed_table
step s1-begin-replication-factor-2:
SET citus.shard_replication_factor to 2;
BEGIN;
step s1-insert-into-select-conflict-update-replication-factor-2:
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1 col_2 col_3
1 1
5 5
3 3
4 4
2 2
step s2-begin-replication-factor-2:
SET citus.shard_replication_factor to 2;
BEGIN;
step s2-insert-into-select-conflict-update-replication-factor-2:
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-into-select-conflict-update-replication-factor-2: <... completed>
col_1 col_2 col_3
1 1
5 5
3 3
4 4
2 2
step s2-commit:
COMMIT;

View File

@ -2197,20 +2197,35 @@ SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
5 | 5
(5 rows)
-- ON CONFLICT is unsupported
-- ON CONFLICT is supported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
ON CONFLICT DO NOTHING;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: ON CONFLICT is not supported in INSERT ... SELECT via coordinator
ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator
-- RETURNING is unsupported
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300000'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300001'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300002'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_206_13300003'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) ON CONFLICT DO NOTHING
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- RETURNING is supported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
RETURNING *;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: RETURNING is not supported in INSERT ... SELECT via coordinator
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300000'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300001'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300002'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id, value_1) SELECT user_id, value_1 FROM read_intermediate_result('insert_select_207_13300003'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
DEBUG: Collecting INSERT ... SELECT results on coordinator
user_id | time | value_1 | value_2 | value_3 | value_4
---------+------+---------+---------+---------+---------
1 | | 11 | | |
5 | | 15 | | |
3 | | 13 | | |
4 | | 14 | | |
2 | | 12 | | |
(5 rows)
RESET client_min_messages;
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported
TRUNCATE raw_events_first;

View File

@ -0,0 +1,538 @@
CREATE SCHEMA on_conflict;
SET search_path TO on_conflict, public;
SET citus.next_shard_id TO 1900000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE target_table(col_1 int primary key, col_2 int);
SELECT create_distributed_table('target_table','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6);
CREATE TABLE source_table_1(col_1 int primary key, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_1','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO source_table_1 VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);
CREATE TABLE source_table_2(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_2','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO source_table_2 VALUES(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
SET client_min_messages to debug1;
-- Generate series directly on the coordinator and on conflict do nothing
INSERT INTO target_table (col_1, col_2)
SELECT
s, s
FROM
generate_series(1,10) s
ON CONFLICT DO NOTHING;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Generate series directly on the coordinator and on conflict update the target table
INSERT INTO target_table (col_1, col_2)
SELECT s, s
FROM
generate_series(1,10) s
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Since partition columns do not match, pull the data to the coordinator
-- and do not change conflicted values
INSERT INTO target_table
SELECT
col_2, col_3
FROM
source_table_1
ON CONFLICT DO NOTHING;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Since partition columns do not match, pull the data to the coordinator
-- and update the non-partition column. Query is wrapped by CTE to return
-- ordered result.
WITH inserted_table AS (
INSERT INTO target_table
SELECT
col_2, col_3
FROM
source_table_1
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 8_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_2, col_3 FROM on_conflict.source_table_1 ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
DEBUG: Collecting INSERT ... SELECT results on coordinator
col_1 | col_2
-------+-------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- Subquery should be recursively planned due to the limit and do nothing on conflict
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: push down of limit count: 5
DEBUG: generating subplan 12_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
-- Subquery should be recursively planned due to the limit and update on conflict
-- Query is wrapped by CTE to return ordered result.
WITH inserted_table AS (
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 14_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: push down of limit count: 5
DEBUG: generating subplan 16_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
col_1 | col_2
-------+-------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- Test with multiple subqueries. Query is wrapped by CTE to return ordered result.
WITH inserted_table AS (
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
(SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5)
UNION
(SELECT
col_1, col_2, col_3
FROM
source_table_2
LIMIT 5)
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 18_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Plan 18 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: push down of limit count: 5
DEBUG: generating subplan 20_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
DEBUG: push down of limit count: 5
DEBUG: generating subplan 20_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5
DEBUG: generating subplan 20_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('20_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)
DEBUG: Plan 20 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('20_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
col_1 | col_2
-------+-------
1 | 0
2 | 0
3 | 0
4 | 0
5 | 0
6 | 0
7 | 0
8 | 0
9 | 0
10 | 0
(10 rows)
-- Get the select part from cte and do nothing on conflict
WITH cte AS(
SELECT col_1, col_2 FROM source_table_1
)
INSERT INTO target_table SELECT * FROM cte ON CONFLICT DO NOTHING;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 25_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1
DEBUG: Plan 25 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('25_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte) citus_insert_select_subquery
-- Get the select part from cte and update on conflict
WITH cte AS(
SELECT col_1, col_2 FROM source_table_1
)
INSERT INTO target_table SELECT * FROM cte ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 28_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1
DEBUG: Plan 28 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('28_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte) citus_insert_select_subquery
SELECT * FROM target_table ORDER BY 1;
col_1 | col_2
-------+-------
1 | 2
2 | 3
3 | 4
4 | 5
5 | 6
6 | 0
7 | 0
8 | 0
9 | 0
10 | 0
(10 rows)
-- Test with multiple CTEs
WITH cte AS(
SELECT col_1, col_2 FROM source_table_1
), cte_2 AS(
SELECT col_1, col_2 FROM source_table_2
)
INSERT INTO target_table ((SELECT * FROM cte) UNION (SELECT * FROM cte_2)) ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 32_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1
DEBUG: generating subplan 32_2 for CTE cte_2: SELECT col_1, col_2 FROM on_conflict.source_table_2
DEBUG: generating subplan 32_3 for subquery SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('32_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte UNION SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('32_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2
DEBUG: Plan 32 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('32_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) citus_insert_select_subquery
SELECT * FROM target_table ORDER BY 1;
col_1 | col_2
-------+-------
1 | 2
2 | 3
3 | 4
4 | 5
5 | 6
6 | 7
7 | 8
8 | 9
9 | 10
10 | 11
(10 rows)
WITH inserted_table AS (
WITH cte AS(
SELECT col_1, col_2, col_3 FROM source_table_1
), cte_2 AS(
SELECT col_1, col_2 FROM cte
)
INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 37_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Plan 37 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('37_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 39_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1
DEBUG: generating subplan 39_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('39_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte
DEBUG: Plan 39 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('39_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery
col_1 | col_2
-------+-------
1 | 2
2 | 3
3 | 4
4 | 5
5 | 6
(5 rows)
WITH cte AS (
WITH basic AS (
SELECT col_1, col_2 FROM source_table_1
)
INSERT INTO target_table (SELECT * FROM basic) ON CONFLICT DO NOTHING RETURNING *
)
UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 42_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Plan 42 query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('42_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte))
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 44_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1
DEBUG: Plan 44 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('44_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery
RESET client_min_messages;
-- Following query is not supported since error checks of the subquery pushdown planner
-- and insert select planner have not been unified. It should work after unifying them.
WITH cte AS (
SELECT
col_1, col_2
FROM
source_table_1
)
INSERT INTO target_table
SELECT
source_table_1.col_1,
source_table_1.col_2
FROM cte, source_table_1
WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- Tests with foreign key to reference table
CREATE TABLE test_ref_table (key int PRIMARY KEY);
SELECT create_reference_table('test_ref_table');
create_reference_table
------------------------
(1 row)
INSERT INTO test_ref_table VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
ALTER TABLE target_table ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES test_ref_table(key) ON DELETE CASCADE;
-- Since we try to apply DML command after modification on test_ref_table which
-- has foreign key from target_table, following two queries are not supported.
BEGIN;
TRUNCATE test_ref_table CASCADE;
NOTICE: truncate cascades to table "target_table"
INSERT INTO
target_table
SELECT
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ERROR: cannot execute parallel COPY on relation "target_table" after DDL command on reference relation "test_ref_table" because there is a foreign key between them and "test_ref_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
ROLLBACK;
BEGIN;
DELETE FROM test_ref_table;
INSERT INTO
target_table
SELECT
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ERROR: cannot execute parallel COPY on relation "target_table" after DML command on reference relation "test_ref_table" because there is a foreign key between them and "test_ref_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
ROLLBACK;
-- Following two queries are supported since we do not modify but only select from
-- the target_table after modification on test_ref_table.
BEGIN;
TRUNCATE test_ref_table CASCADE;
NOTICE: truncate cascades to table "target_table"
INSERT INTO
source_table_1
SELECT
col_2,
col_1
FROM target_table ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
col_1 | col_2 | col_3
-------+-------+-------
(0 rows)
ROLLBACK;
BEGIN;
DELETE FROM test_ref_table;
INSERT INTO
source_table_1
SELECT
col_2,
col_1
FROM target_table ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
col_1 | col_2 | col_3
-------+-------+-------
(0 rows)
ROLLBACK;
-- INSERT .. SELECT with different column types
CREATE TABLE source_table_3(col_1 numeric, col_2 numeric, col_3 numeric);
SELECT create_distributed_table('source_table_3','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO source_table_3 VALUES(1,11,1),(2,22,2),(3,33,3),(4,44,4),(5,55,5);
CREATE TABLE source_table_4(id int, arr_val text[]);
SELECT create_distributed_table('source_table_4','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO source_table_4 VALUES(1, '{"abc","cde","efg"}'), (2, '{"xyz","tvu"}');
CREATE TABLE target_table_2(id int primary key, arr_val char(10)[]);
SELECT create_distributed_table('target_table_2','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO target_table_2 VALUES(1, '{"abc","def","gyx"}');
SET client_min_messages to debug1;
INSERT INTO target_table
SELECT
col_1, col_2
FROM
source_table_3
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT * FROM target_table ORDER BY 1;
col_1 | col_2
-------+-------
1 | 11
2 | 22
3 | 33
4 | 44
5 | 55
6 | 7
7 | 8
8 | 9
9 | 10
10 | 11
(10 rows)
INSERT INTO target_table_2
SELECT
*
FROM
source_table_4
ON CONFLICT DO NOTHING;
SELECT * FROM target_table_2 ORDER BY 1;
id | arr_val
----+------------------------------------------
1 | {"abc ","def ","gyx "}
2 | {"xyz ","tvu "}
(2 rows)
RESET client_min_messages;
-- Test with shard_replication_factor = 2
SET citus.shard_replication_factor to 2;
DROP TABLE target_table, source_table_1, source_table_2;
CREATE TABLE target_table(col_1 int primary key, col_2 int);
SELECT create_distributed_table('target_table','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6);
CREATE TABLE source_table_1(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_1','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO source_table_1 VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);
CREATE TABLE source_table_2(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_2','col_1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO source_table_2 VALUES(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
SET client_min_messages to debug1;
-- Generate series directly on the coordinator and on conflict do nothing
INSERT INTO target_table (col_1, col_2)
SELECT
s, s
FROM
generate_series(1,10) s
ON CONFLICT DO NOTHING;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test with multiple subqueries
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
(SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5)
UNION
(SELECT
col_1, col_2, col_3
FROM
source_table_2
LIMIT 5)
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = 0;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: push down of limit count: 5
DEBUG: generating subplan 71_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5
DEBUG: push down of limit count: 5
DEBUG: generating subplan 71_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5
DEBUG: generating subplan 71_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('71_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('71_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)
DEBUG: Plan 71 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('71_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo
SELECT * FROM target_table ORDER BY 1;
col_1 | col_2
-------+-------
1 | 0
2 | 0
3 | 0
4 | 0
5 | 0
6 | 0
7 | 0
8 | 0
9 | 0
10 | 0
(10 rows)
WITH cte AS(
SELECT col_1, col_2, col_3 FROM source_table_1
), cte_2 AS(
SELECT col_1, col_2 FROM cte
)
INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 77_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1
DEBUG: generating subplan 77_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('77_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte
DEBUG: Plan 77 query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('77_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery
SELECT * FROM target_table ORDER BY 1;
col_1 | col_2
-------+-------
1 | 2
2 | 3
3 | 4
4 | 5
5 | 6
6 | 0
7 | 0
8 | 0
9 | 0
10 | 0
(10 rows)
RESET client_min_messages;
DROP SCHEMA on_conflict CASCADE;
NOTICE: drop cascades to 7 other objects
DETAIL: drop cascades to table test_ref_table
drop cascades to table source_table_3
drop cascades to table source_table_4
drop cascades to table target_table_2
drop cascades to table target_table
drop cascades to table source_table_1
drop cascades to table source_table_2

View File

@ -205,25 +205,6 @@ WITH cte AS (
SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3)
)
UPDATE modify_table SET val=-1 WHERE val IN (SELECT * FROM cte);
WITH cte AS (
WITH basic AS (
SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3)
)
INSERT INTO modify_table (SELECT * FROM basic) RETURNING *
)
UPDATE modify_table SET val=-2 WHERE id IN (SELECT id FROM cte);
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
WITH cte AS (
WITH basic AS (
SELECT * FROM events_table WHERE event_type = 5
),
basic_2 AS (
SELECT user_id FROM users_table
)
INSERT INTO modify_table (SELECT user_id FROM events_table) RETURNING *
)
SELECT * FROM cte;
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
WITH user_data AS (
SELECT user_id, value_2 FROM users_table
)

View File

@ -44,6 +44,7 @@ test: isolation_drop_vs_all
test: isolation_ddl_vs_all
test: isolation_citus_dist_activity
test: isolation_validate_vs_insert
test: isolation_insert_select_conflict
# MX tests
test: isolation_reference_on_mx

View File

@ -34,6 +34,7 @@ test: multi_create_table
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select
test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors
test: multi_insert_select_conflict
# ---------
# at the end of the regression tests regarding recursively planned modifications

View File

@ -0,0 +1,163 @@
# Shared fixture for the INSERT ... SELECT ... ON CONFLICT isolation tests.
#
# Creates two distributed tables, target_table (primary key on col_1) and
# source_table, each seeded with rows whose col_1 values are 1 through 5, so
# every row selected from source_table conflicts with an existing target row.
# After that, citus.shard_replication_factor is switched to 2 and a third
# distributed table, target_table_2, is created and left empty.
#
# NOTE(review): target_table and source_table are created under whatever
# replication factor the session starts with; that value is not set here.
setup
{
CREATE TABLE target_table(col_1 int primary key, col_2 int);
SELECT create_distributed_table('target_table','col_1');
INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6);
CREATE TABLE source_table(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table','col_1');
INSERT INTO source_table VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);
SET citus.shard_replication_factor to 2;
CREATE TABLE target_table_2(col_1 int primary key, col_2 int, col_3 int);
SELECT create_distributed_table('target_table_2', 'col_1');
}
# Drops the three tables created by the setup block.
teardown
{
DROP TABLE target_table, target_table_2, source_table;
}
# Session 1: in every permutation of this spec it opens its transaction first
# and runs one INSERT ... SELECT ... ON CONFLICT statement before session 2
# starts.
session "s1"
# Sets citus.shard_replication_factor to 1 for this session, then opens a
# transaction.
step "s1-begin"
{
SET citus.shard_replication_factor to 1;
BEGIN;
}
# Same as "s1-begin", but with citus.shard_replication_factor set to 2.
step "s1-begin-replication-factor-2"
{
SET citus.shard_replication_factor to 2;
BEGIN;
}
# Upsert into target_table: inserts (col_1, col_2) taken from a LIMIT 5
# subquery over source_table; when col_1 already exists, col_2 is overwritten
# with the incoming value. RETURNING * hands back the affected rows.
step "s1-insert-into-select-conflict-update"
{
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
}
# Same source query as the upsert step, but conflicting rows are skipped
# (ON CONFLICT DO NOTHING) and nothing is returned.
step "s1-insert-into-select-conflict-do-nothing"
{
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
}
# Commits session 1's open transaction.
step "s1-commit"
{
COMMIT;
}
# Upsert variant that targets target_table_2, the table created in setup after
# the replication factor was set to 2. Only col_1 and col_2 are supplied;
# target_table_2.col_3 is not part of the select list.
step "s1-insert-into-select-conflict-update-replication-factor-2"
{
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
}
# Session 2: the concurrent session. In every permutation of this spec its
# statement is issued while session 1's transaction is still open.
session "s2"
# Opens a transaction; unlike "s1-begin" it does not touch
# citus.shard_replication_factor.
step "s2-begin"
{
BEGIN;
}
# Sets citus.shard_replication_factor to 2 for this session, then opens a
# transaction.
step "s2-begin-replication-factor-2"
{
SET citus.shard_replication_factor to 2;
BEGIN;
}
# Upsert into target_table: inserts (col_1, col_2) taken from a LIMIT 5
# subquery over source_table; when col_1 already exists, col_2 is overwritten
# with the incoming value. RETURNING * hands back the affected rows.
step "s2-insert-into-select-conflict-update"
{
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
}
# Same upsert, but against target_table_2 (created in setup after the
# replication factor was set to 2). Only col_1 and col_2 are supplied.
step "s2-insert-into-select-conflict-update-replication-factor-2"
{
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
}
# Same source query as the upsert step, but conflicting rows are skipped
# (ON CONFLICT DO NOTHING) and nothing is returned.
step "s2-insert-into-select-conflict-do-nothing"
{
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
}
# Plain multi-row UPDATE: sets col_2 to 5 on every row of target_table.
step "s2-update"
{
UPDATE target_table SET col_2 = 5;
}
# Plain DELETE of every row in target_table.
step "s2-delete"
{
DELETE FROM target_table;
}
# Commits session 2's open transaction.
step "s2-commit"
{
COMMIT;
}
# Every permutation follows the same schedule: s1 begins and runs its
# statement, s2 begins and runs its statement while s1 is still uncommitted,
# then s1 commits, then s2 commits.
# NOTE(review): presumably s2's statement waits until "s1-commit" in the
# conflicting cases - confirm against the expected output file.

# s1 upsert vs. s2 UPDATE of all target_table rows
permutation "s1-begin" "s1-insert-into-select-conflict-update" "s2-begin" "s2-update" "s1-commit" "s2-commit"
# s1 insert with DO NOTHING vs. s2 DELETE of all target_table rows
permutation "s1-begin" "s1-insert-into-select-conflict-do-nothing" "s2-begin" "s2-delete" "s1-commit" "s2-commit"
# s1 insert with DO NOTHING vs. s2 upsert
permutation "s1-begin" "s1-insert-into-select-conflict-do-nothing" "s2-begin" "s2-insert-into-select-conflict-update" "s1-commit" "s2-commit"
# s1 upsert vs. s2 upsert
permutation "s1-begin" "s1-insert-into-select-conflict-update" "s2-begin" "s2-insert-into-select-conflict-update" "s1-commit" "s2-commit"
# s1 upsert vs. s2 insert with DO NOTHING
permutation "s1-begin" "s1-insert-into-select-conflict-update" "s2-begin" "s2-insert-into-select-conflict-do-nothing" "s1-commit" "s2-commit"
# s1 upsert vs. s2 upsert on target_table_2, both sessions with replication factor 2
permutation "s1-begin-replication-factor-2" "s1-insert-into-select-conflict-update-replication-factor-2" "s2-begin-replication-factor-2" "s2-insert-into-select-conflict-update-replication-factor-2" "s1-commit" "s2-commit"

View File

@ -1732,12 +1732,12 @@ FROM
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- ON CONFLICT is unsupported
-- ON CONFLICT is supported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
ON CONFLICT DO NOTHING;
-- RETURNING is unsupported
-- RETURNING is supported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
RETURNING *;

View File

@ -0,0 +1,302 @@
CREATE SCHEMA on_conflict;
SET search_path TO on_conflict, public;
SET citus.next_shard_id TO 1900000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE target_table(col_1 int primary key, col_2 int);
SELECT create_distributed_table('target_table','col_1');
INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6);
CREATE TABLE source_table_1(col_1 int primary key, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_1','col_1');
INSERT INTO source_table_1 VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);
CREATE TABLE source_table_2(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_2','col_1');
INSERT INTO source_table_2 VALUES(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
SET client_min_messages to debug1;
-- Generate series directly on the coordinator and on conflict do nothing
INSERT INTO target_table (col_1, col_2)
SELECT
s, s
FROM
generate_series(1,10) s
ON CONFLICT DO NOTHING;
-- Generate series directly on the coordinator and on conflict update the target table
INSERT INTO target_table (col_1, col_2)
SELECT s, s
FROM
generate_series(1,10) s
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
-- Since partition columns do not match, pull the data to the coordinator
-- and do not change conflicted values
INSERT INTO target_table
SELECT
col_2, col_3
FROM
source_table_1
ON CONFLICT DO NOTHING;
-- Since partition columns do not match, pull the data to the coordinator
-- and update the non-partition column. Query is wrapped by CTE to return
-- ordered result.
WITH inserted_table AS (
INSERT INTO target_table
SELECT
col_2, col_3
FROM
source_table_1
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
-- Subquery should be recursively planned due to the limit and do nothing on conflict
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
-- Subquery should be recursively planned due to the limit and update on conflict
-- Query is wrapped by CTE to return ordered result.
WITH inserted_table AS (
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
-- Test with multiple subqueries. Query is wrapped by CTE to return ordered result.
WITH inserted_table AS (
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
(SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5)
UNION
(SELECT
col_1, col_2, col_3
FROM
source_table_2
LIMIT 5)
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
-- Get the select part from cte and do nothing on conflict
WITH cte AS(
SELECT col_1, col_2 FROM source_table_1
)
INSERT INTO target_table SELECT * FROM cte ON CONFLICT DO NOTHING;
-- Get the select part from cte and update on conflict
WITH cte AS(
SELECT col_1, col_2 FROM source_table_1
)
INSERT INTO target_table SELECT * FROM cte ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
SELECT * FROM target_table ORDER BY 1;
-- Test with multiple CTEs
WITH cte AS(
SELECT col_1, col_2 FROM source_table_1
), cte_2 AS(
SELECT col_1, col_2 FROM source_table_2
)
INSERT INTO target_table ((SELECT * FROM cte) UNION (SELECT * FROM cte_2)) ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
SELECT * FROM target_table ORDER BY 1;
WITH inserted_table AS (
WITH cte AS(
SELECT col_1, col_2, col_3 FROM source_table_1
), cte_2 AS(
SELECT col_1, col_2 FROM cte
)
INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *
) SELECT * FROM inserted_table ORDER BY 1;
WITH cte AS (
WITH basic AS (
SELECT col_1, col_2 FROM source_table_1
)
INSERT INTO target_table (SELECT * FROM basic) ON CONFLICT DO NOTHING RETURNING *
)
UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte);
RESET client_min_messages;
-- Following query is not supported since error checks of the subquery pushdown planner
-- and insert select planner have not been unified. It should work after unifying them.
WITH cte AS (
SELECT
col_1, col_2
FROM
source_table_1
)
INSERT INTO target_table
SELECT
source_table_1.col_1,
source_table_1.col_2
FROM cte, source_table_1
WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING;
-- Tests with foreign key to reference table
CREATE TABLE test_ref_table (key int PRIMARY KEY);
SELECT create_reference_table('test_ref_table');
INSERT INTO test_ref_table VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
ALTER TABLE target_table ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES test_ref_table(key) ON DELETE CASCADE;
-- Since we try to apply DML command after modification on test_ref_table which
-- has foreign key from target_table, following two queries are not supported.
BEGIN;
TRUNCATE test_ref_table CASCADE;
INSERT INTO
target_table
SELECT
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ROLLBACK;
BEGIN;
DELETE FROM test_ref_table;
INSERT INTO
target_table
SELECT
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ROLLBACK;
-- Following two queries are supported since we do not modify but only select from
-- the target_table after modification on test_ref_table.
BEGIN;
TRUNCATE test_ref_table CASCADE;
INSERT INTO
source_table_1
SELECT
col_2,
col_1
FROM target_table ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ROLLBACK;
BEGIN;
DELETE FROM test_ref_table;
INSERT INTO
source_table_1
SELECT
col_2,
col_1
FROM target_table ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ROLLBACK;
-- INSERT .. SELECT with different column types
CREATE TABLE source_table_3(col_1 numeric, col_2 numeric, col_3 numeric);
SELECT create_distributed_table('source_table_3','col_1');
INSERT INTO source_table_3 VALUES(1,11,1),(2,22,2),(3,33,3),(4,44,4),(5,55,5);
CREATE TABLE source_table_4(id int, arr_val text[]);
SELECT create_distributed_table('source_table_4','id');
INSERT INTO source_table_4 VALUES(1, '{"abc","cde","efg"}'), (2, '{"xyz","tvu"}');
CREATE TABLE target_table_2(id int primary key, arr_val char(10)[]);
SELECT create_distributed_table('target_table_2','id');
INSERT INTO target_table_2 VALUES(1, '{"abc","def","gyx"}');
SET client_min_messages to debug1;
INSERT INTO target_table
SELECT
col_1, col_2
FROM
source_table_3
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2;
SELECT * FROM target_table ORDER BY 1;
INSERT INTO target_table_2
SELECT
*
FROM
source_table_4
ON CONFLICT DO NOTHING;
SELECT * FROM target_table_2 ORDER BY 1;
RESET client_min_messages;
-- Test with shard_replication_factor = 2
SET citus.shard_replication_factor to 2;
DROP TABLE target_table, source_table_1, source_table_2;
CREATE TABLE target_table(col_1 int primary key, col_2 int);
SELECT create_distributed_table('target_table','col_1');
INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6);
CREATE TABLE source_table_1(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_1','col_1');
INSERT INTO source_table_1 VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);
CREATE TABLE source_table_2(col_1 int, col_2 int, col_3 int);
SELECT create_distributed_table('source_table_2','col_1');
INSERT INTO source_table_2 VALUES(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
SET client_min_messages to debug1;
-- Generate series directly on the coordinator and on conflict do nothing
INSERT INTO target_table (col_1, col_2)
SELECT
s, s
FROM
generate_series(1,10) s
ON CONFLICT DO NOTHING;
-- Test with multiple subqueries
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
(SELECT
col_1, col_2, col_3
FROM
source_table_1
LIMIT 5)
UNION
(SELECT
col_1, col_2, col_3
FROM
source_table_2
LIMIT 5)
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = 0;
SELECT * FROM target_table ORDER BY 1;
WITH cte AS(
SELECT col_1, col_2, col_3 FROM source_table_1
), cte_2 AS(
SELECT col_1, col_2 FROM cte
)
INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
SELECT * FROM target_table ORDER BY 1;
RESET client_min_messages;
DROP SCHEMA on_conflict CASCADE;

View File

@ -129,26 +129,6 @@ WITH cte AS (
UPDATE modify_table SET val=-1 WHERE val IN (SELECT * FROM cte);
WITH cte AS (
WITH basic AS (
SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3)
)
INSERT INTO modify_table (SELECT * FROM basic) RETURNING *
)
UPDATE modify_table SET val=-2 WHERE id IN (SELECT id FROM cte);
WITH cte AS (
WITH basic AS (
SELECT * FROM events_table WHERE event_type = 5
),
basic_2 AS (
SELECT user_id FROM users_table
)
INSERT INTO modify_table (SELECT user_id FROM events_table) RETURNING *
)
SELECT * FROM cte;
WITH user_data AS (
SELECT user_id, value_2 FROM users_table
)