Merge pull request #1402 from citusdata/local_insert_select

Perform INSERT..SELECT via coordinator if it cannot be pushed down
pull/1468/head
Marco Slot 2017-06-22 16:23:34 +02:00 committed by GitHub
commit ce28b6af0d
41 changed files with 1761 additions and 269 deletions
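As an illustration of the new behavior (a sketch reusing the lineitem_hash_part table from the regression tests further down in this diff): an INSERT ... SELECT whose SELECT part cannot be pushed down to the workers, for example because it only reads from a function scan, is now executed by running the SELECT on the coordinator and copying the result rows into the target table's shards, instead of erroring out in the distributed planner.
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1, 5) s;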

View File

@ -1676,6 +1676,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
Relation distributedRelation = NULL;
int columnIndex = 0;
List *columnNameList = copyDest->columnNameList;
List *quotedColumnNameList = NIL;
ListCell *columnNameCell = NULL;
@ -1769,6 +1770,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
foreach(columnNameCell, columnNameList)
{
char *columnName = (char *) lfirst(columnNameCell);
char *quotedColumnName = (char *) quote_identifier(columnName);
/* load the column information from pg_attribute */
AttrNumber attrNumber = get_attnum(tableId, columnName);
@ -1782,6 +1784,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}
columnIndex++;
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
}
if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1)
@ -1797,7 +1801,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyStatement = makeNode(CopyStmt);
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
copyStatement->query = NULL;
copyStatement->attlist = columnNameList;
copyStatement->attlist = quotedColumnNameList;
copyStatement->is_from = true;
copyStatement->is_program = false;
copyStatement->filename = NULL;
@ -1870,7 +1874,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
relationName);
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column of table %s should have a value",
errmsg("the partition column of table %s cannot be NULL",
qualifiedTableName)));
}
@ -1923,6 +1927,8 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
MemoryContextSwitchTo(oldContext);
copyDest->tuplesSent++;
#if PG_VERSION_NUM >= 90600
return true;
#endif

View File

@ -0,0 +1,157 @@
/*-------------------------------------------------------------------------
*
* insert_select_executor.c
*
* Executor logic for INSERT..SELECT.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/transaction_management.h"
#include "executor/executor.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "parser/parse_coerce.h"
#include "parser/parsetree.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/portal.h"
#include "utils/snapmgr.h"
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState);
static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params,
DestReceiver *dest);
/*
* CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table
* SELECT .. query by setting up a DestReceiver that copies tuples into the
* distributed table and then executing the SELECT query using that DestReceiver
* as the tuple destination.
*/
TupleTableSlot *
CoordinatorInsertSelectExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
if (!scanState->finishedRemoteScan)
{
EState *executorState = scanState->customScanState.ss.ps.state;
MultiPlan *multiPlan = scanState->multiPlan;
Query *selectQuery = multiPlan->insertSelectSubquery;
List *insertTargetList = multiPlan->insertTargetList;
Oid targetRelationId = multiPlan->targetRelationId;
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
executorState);
scanState->finishedRemoteScan = true;
}
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
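As an illustration of this execution path (assuming the lineitem_hash_part test table from the regression tests below and client_min_messages set to at least DEBUG1), the coordinator fallback announces itself with the DEBUG1 message emitted above:
SET client_min_messages TO DEBUG1;
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1, 5) s;
DEBUG: Collecting INSERT ... SELECT results on coordinator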
/*
* ExecuteSelectIntoRelation executes given SELECT query and inserts the
* results into the target relation, which is assumed to be a distributed
* table.
*/
static void
ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
ListCell *insertTargetCell = NULL;
List *columnNameList = NIL;
bool stopOnFailure = false;
char partitionMethod = 0;
CitusCopyDestReceiver *copyDest = NULL;
BeginOrContinueCoordinatedTransaction();
partitionMethod = PartitionMethod(targetRelationId);
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
stopOnFailure = true;
}
/* build the list of column names for the COPY statement */
foreach(insertTargetCell, insertTargetList)
{
TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell);
columnNameList = lappend(columnNameList, insertTargetEntry->resname);
}
/* set up a DestReceiver that copies into the distributed table */
copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList,
executorState, stopOnFailure);
ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest);
executorState->es_processed = copyDest->tuplesSent;
XactModificationLevel = XACT_MODIFICATION_DATA;
}
/*
* ExecuteIntoDestReceiver plans and executes a query and sends results to the given
* DestReceiver.
*/
static void
ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest)
{
PlannedStmt *queryPlan = NULL;
Portal portal = NULL;
int eflags = 0;
int cursorOptions = 0;
long count = FETCH_ALL;
/* create a new portal for executing the query */
portal = CreateNewPortal();
/* don't display the portal in pg_cursors, it is for internal use only */
portal->visible = false;
#if (PG_VERSION_NUM >= 90600)
cursorOptions = CURSOR_OPT_PARALLEL_OK;
#endif
/* plan the subquery, this may be another distributed query */
queryPlan = pg_plan_query(query, cursorOptions, params);
PortalDefineQuery(portal,
NULL,
"",
"SELECT",
list_make1(queryPlan),
NULL);
PortalStart(portal, params, eflags, GetActiveSnapshot());
PortalRun(portal, count, false, dest, dest, NULL);
PortalDrop(portal, false);
}
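Because the SELECT is passed back through pg_plan_query, it can itself be planned as a distributed Citus query; the EXPLAIN regression output later in this diff shows a Custom Scan (Citus Real-Time) nested under the coordinator INSERT ... SELECT node for a query of this shape (test tables assumed):
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
SELECT o_orderkey FROM orders_hash_part LIMIT 3;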

View File

@ -15,6 +15,9 @@
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h"
#include "distributed/multi_planner.h"
@ -26,6 +29,7 @@
#include "executor/execdebug.h"
#include "commands/copy.h"
#include "nodes/makefuncs.h"
#include "parser/parsetree.h"
#include "storage/lmgr.h"
#include "tcop/utility.h"
#include "utils/snapmgr.h"
@ -80,6 +84,15 @@ static CustomExecMethods RouterSelectCustomExecMethods = {
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
.CustomName = "CoordinatorInsertSelectScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = CoordinatorInsertSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CoordinatorInsertSelectExplainScan
};
/* local function forward declarations */
static void PrepareMasterJobDirectory(Job *workerJob);
@ -167,6 +180,25 @@ RouterCreateScan(CustomScan *scan)
}
/*
* CoordinatorInsertSelectCreateScan creates the scan state for executing
* INSERT..SELECT into a distributed table via the coordinator.
*/
Node *
CoordinatorInsertSelectCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan);
scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods;
return (Node *) scanState;
}
/*
* DelayedErrorCreateScan is only called if we could not plan for the given
* query. This is the case when a plan is not ready for execution because

View File

@ -43,12 +43,10 @@ MultiExecutorType
JobExecutorType(MultiPlan *multiPlan)
{
Job *job = multiPlan->workerJob;
List *workerTaskList = job->taskList;
List *workerNodeList = ActiveWorkerNodeList();
int taskCount = list_length(workerTaskList);
int workerNodeCount = list_length(workerNodeList);
double tasksPerNode = taskCount / ((double) workerNodeCount);
int dependedJobCount = list_length(job->dependedJobList);
List *workerNodeList = NIL;
int workerNodeCount = 0;
int taskCount = 0;
double tasksPerNode = 0.;
MultiExecutorType executorType = TaskExecutorType;
bool routerExecutablePlan = multiPlan->routerExecutable;
@ -59,6 +57,11 @@ JobExecutorType(MultiPlan *multiPlan)
return MULTI_EXECUTOR_ROUTER;
}
if (multiPlan->insertSelectSubquery != NULL)
{
return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
}
/* if it is not a router executable plan, inform user according to the log level */
if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF)
{
@ -68,9 +71,15 @@ JobExecutorType(MultiPlan *multiPlan)
" queries on the workers.")));
}
workerNodeList = ActiveWorkerNodeList();
workerNodeCount = list_length(workerNodeList);
taskCount = list_length(job->taskList);
tasksPerNode = taskCount / ((double) workerNodeCount);
if (executorType == MULTI_EXECUTOR_REAL_TIME)
{
double reasonableConnectionCount = 0;
int dependedJobCount = 0;
/* if we need to open too many connections per worker, warn the user */
if (tasksPerNode >= MaxConnections)
@ -98,6 +107,7 @@ JobExecutorType(MultiPlan *multiPlan)
}
/* if we have repartition jobs with real time executor, error out */
dependedJobCount = list_length(job->dependedJobList);
if (dependedJobCount > 0)
{
ereport(ERROR, (errmsg("cannot use real time executor with repartition jobs"),

View File

@ -0,0 +1,254 @@
/*-------------------------------------------------------------------------
*
* insert_select_planner.c
*
* Planning logic for INSERT..SELECT.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/errormessage.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_partition.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "optimizer/planner.h"
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "utils/lsyscache.h"
static DeferredErrorMessage * DeferErrorIfCoordinatorInsertSelectUnsupported(
Query *insertSelectQuery);
static Query * WrapSubquery(Query *subquery);
/*
* CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a
* distributed table. The query plan can also be executed on a worker in MX.
*/
MultiPlan *
CreateCoordinatorInsertSelectPlan(Query *parse)
{
Query *insertSelectQuery = copyObject(parse);
Query *selectQuery = NULL;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery);
Oid targetRelationId = insertRte->relid;
ListCell *selectTargetCell = NULL;
ListCell *insertTargetCell = NULL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = CMD_INSERT;
multiPlan->planningError =
DeferErrorIfCoordinatorInsertSelectUnsupported(insertSelectQuery);
if (multiPlan->planningError != NULL)
{
return multiPlan;
}
selectQuery = selectRte->subquery;
/*
* Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT
* has top-level set operations.
*
* We could simply wrap all queries, but that might create a subquery that is
* not supported by the logical planner. Since the logical planner also does
* not support CTEs and top-level set operations, we can wrap queries containing
* those without breaking anything.
*/
if (list_length(insertSelectQuery->cteList) > 0)
{
selectQuery = WrapSubquery(selectRte->subquery);
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
}
else if (selectQuery->setOperations != NULL)
{
/* top-level set operations confuse the ReorderInsertSelectTargetLists logic */
selectQuery = WrapSubquery(selectRte->subquery);
}
selectRte->subquery = selectQuery;
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/* add casts when the SELECT output does not directly match the table */
forboth(insertTargetCell, insertSelectQuery->targetList,
selectTargetCell, selectQuery->targetList)
{
TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell);
TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell);
Var *columnVar = NULL;
Oid columnType = InvalidOid;
int32 columnTypeMod = 0;
Oid selectOutputType = InvalidOid;
/* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */
if (!IsA(insertTargetEntry->expr, Var))
{
ereport(ERROR, (errmsg("can only handle regular columns in the target "
"list")));
}
columnVar = (Var *) insertTargetEntry->expr;
columnType = get_atttype(targetRelationId, columnVar->varattno);
columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno);
selectOutputType = columnVar->vartype;
/*
* If the type in the target list does not match the type of the column,
* we need to cast to the column type. PostgreSQL would do this
* automatically during the insert, but we're passing the SELECT
* output directly to COPY.
*/
if (columnType != selectOutputType)
{
Expr *selectExpression = selectTargetEntry->expr;
Expr *typeCastedSelectExpr =
(Expr *) coerce_to_target_type(NULL, (Node *) selectExpression,
selectOutputType, columnType,
columnTypeMod, COERCION_EXPLICIT,
COERCE_IMPLICIT_CAST, -1);
selectTargetEntry->expr = typeCastedSelectExpr;
}
}
multiPlan->insertSelectSubquery = selectQuery;
multiPlan->insertTargetList = insertSelectQuery->targetList;
multiPlan->targetRelationId = targetRelationId;
return multiPlan;
}
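As a sketch of the cast handling above, assuming l_orderkey is declared bigint as in the TPC-H style test schema: the SELECT below emits an integer column, so the planner wraps the SELECT target entry in a cast to bigint before the rows are handed to COPY, since COPY does not apply the implicit coercion that a plain INSERT would.
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1, 5) s;
-- the SELECT target entry is effectively rewritten to s::bigint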
/*
* DeferErrorIfCoordinatorInsertSelectUnsupported returns an error if executing an
* INSERT ... SELECT command by pulling results of the SELECT to the coordinator
* is unsupported because it uses RETURNING, ON CONFLICT, or an append-distributed
* table.
*/
static DeferredErrorMessage *
DeferErrorIfCoordinatorInsertSelectUnsupported(Query *insertSelectQuery)
{
RangeTblEntry *insertRte = NULL;
RangeTblEntry *subqueryRte = NULL;
Query *subquery = NULL;
if (list_length(insertSelectQuery->returningList) > 0)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"RETURNING is not supported in INSERT ... SELECT via "
"coordinator", NULL, NULL);
}
if (insertSelectQuery->onConflict)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"ON CONFLICT is not supported in INSERT ... SELECT via "
"coordinator", NULL, NULL);
}
insertRte = ExtractInsertRangeTableEntry(insertSelectQuery);
if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT ... SELECT into an append-distributed table is "
"not supported", NULL, NULL);
}
subqueryRte = ExtractSelectRangeTableEntry(insertSelectQuery);
subquery = (Query *) subqueryRte->subquery;
if (NeedsDistributedPlanning(subquery) &&
contain_nextval_expression_walker((Node *) insertSelectQuery->targetList, NULL))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT ... SELECT cannot generate sequence values when "
"selecting from a distributed table",
NULL, NULL);
}
return NULL;
}
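Queries that hit these checks still error out rather than falling back; for example (a sketch against the test tables):
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1, 5) s
RETURNING l_orderkey;
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator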
/*
* WrapSubquery wraps the given query as a subquery in a newly constructed
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
*/
static Query *
WrapSubquery(Query *subquery)
{
Query *outerQuery = NULL;
ParseState *pstate = make_parsestate(NULL);
Alias *selectAlias = NULL;
RangeTblEntry *newRangeTableEntry = NULL;
RangeTblRef *newRangeTableRef = NULL;
ListCell *selectTargetCell = NULL;
List *newTargetList = NIL;
outerQuery = makeNode(Query);
outerQuery->commandType = CMD_SELECT;
/* create range table entries */
selectAlias = makeAlias("citus_insert_select_subquery", NIL);
newRangeTableEntry = addRangeTableEntryForSubquery(pstate, subquery,
selectAlias, false, true);
outerQuery->rtable = list_make1(newRangeTableEntry);
/* set the FROM expression to the subquery */
newRangeTableRef = makeNode(RangeTblRef);
newRangeTableRef->rtindex = 1;
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
/* create a target list that matches the SELECT */
foreach(selectTargetCell, subquery->targetList)
{
TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell);
Var *newSelectVar = NULL;
TargetEntry *newSelectTargetEntry = NULL;
/* exactly 1 entry in FROM */
int indexInRangeTable = 1;
if (selectTargetEntry->resjunk)
{
continue;
}
newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
exprType((Node *) selectTargetEntry->expr),
exprTypmod((Node *) selectTargetEntry->expr),
exprCollation((Node *) selectTargetEntry->expr), 0);
newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
selectTargetEntry->resno,
selectTargetEntry->resname,
selectTargetEntry->resjunk);
newTargetList = lappend(newTargetList, newSelectTargetEntry);
}
outerQuery->targetList = newTargetList;
return outerQuery;
}
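Conceptually, WrapSubquery rewrites the SELECT of the UNION example from the EXPLAIN tests below into roughly the following form before it is handed to the rest of the INSERT ... SELECT machinery (a sketch, not SQL the planner actually generates):
SELECT s FROM (
  ( SELECT s FROM generate_series(1,5) s ) UNION
  ( SELECT s FROM generate_series(5,10) s )
) citus_insert_select_subquery;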

View File

@ -22,6 +22,7 @@
#include "optimizer/cost.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
@ -44,7 +45,9 @@
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/json.h"
#include "utils/lsyscache.h"
#include "utils/snapmgr.h"
@ -69,6 +72,8 @@ typedef struct RemoteExplainPlan
/* Explain functions for distributed queries */
static void CitusExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params);
static void ExplainJob(Job *job, ExplainState *es);
static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
static void ExplainTaskList(List *taskList, ExplainState *es);
@ -710,3 +715,73 @@ ExplainYAMLLineStarting(ExplainState *es)
appendStringInfoSpaces(es->str, es->indent * 2);
}
}
/*
* CoordinatorInsertSelectExplainScan is a custom scan explain callback function
* which is used to print explain information of a Citus plan for an INSERT INTO
* distributed_table SELECT ... query that is evaluated on the coordinator.
*/
void
CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es)
{
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
Query *query = multiPlan->insertSelectSubquery;
IntoClause *into = NULL;
ParamListInfo params = NULL;
char *queryString = NULL;
if (es->analyze)
{
/* avoiding double execution here is tricky, error out for now */
ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for INSERT "
"... SELECT commands via the coordinator")));
}
ExplainOpenGroup("Select Query", "Select Query", false, es);
/* explain the inner SELECT query */
CitusExplainOneQuery(query, into, es, queryString, params);
ExplainCloseGroup("Select Query", "Select Query", false, es);
}
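As the regression output below shows, EXPLAIN prints the plan of the inner SELECT under the custom scan node; EXPLAIN ANALYZE, on the other hand, is rejected, e.g. (a sketch against the test tables):
EXPLAIN (ANALYZE, COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1, 5) s;
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands via the coordinator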
/*
* CitusExplainOneQuery is simply a duplicate of ExplainOneQuery in explain.c, which
* is static.
*/
static void
CitusExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params)
{
/* copied from ExplainOneQuery in explain.c */
if (ExplainOneQuery_hook)
{
(*ExplainOneQuery_hook) (query, into, es, queryString, params);
}
else
{
PlannedStmt *plan;
instr_time planstart,
planduration;
int cursorOptions = 0;
INSTR_TIME_SET_CURRENT(planstart);
#if (PG_VERSION_NUM >= 90600)
cursorOptions = into ? 0 : CURSOR_OPT_PARALLEL_OK;
#endif
/* plan the query */
plan = pg_plan_query(query, cursorOptions, params);
INSTR_TIME_SET_CURRENT(planduration);
INSTR_TIME_SUBTRACT(planduration, planstart);
/* run it (if needed) and produce output */
ExplainOnePlan(plan, into, es, queryString, params, &planduration);
}
}

View File

@ -26,6 +26,7 @@
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/multi_router_planner.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@ -2662,6 +2663,15 @@ NeedsDistributedPlanning(Query *queryTree)
return false;
}
/*
* We can handle INSERT INTO distributed_table SELECT ... even if the SELECT
* part references local tables, so skip the remaining checks.
*/
if (InsertSelectIntoDistributedTable(queryTree))
{
return true;
}
/* extract range table entries for simple relations only */
ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList);

View File

@ -16,6 +16,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_planner.h"
@ -51,6 +52,11 @@ static CustomScanMethods RouterCustomScanMethods = {
RouterCreateScan
};
static CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
"Citus INSERT ... SELECT via coordinator",
CoordinatorInsertSelectCreateScan
};
static CustomScanMethods DelayedErrorCustomScanMethods = {
"Citus Delayed Error",
DelayedErrorCreateScan
@ -225,7 +231,7 @@ IsModifyCommand(Query *query)
CmdType commandType = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE || query->hasModifyingCTE)
commandType == CMD_DELETE)
{
return true;
}
@ -273,9 +279,25 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
if (IsModifyCommand(query))
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
if (InsertSelectIntoDistributedTable(originalQuery))
{
distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
plannerRestrictionContext);
if (distributedPlan->planningError != NULL)
{
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
/* if INSERT..SELECT cannot be distributed, pull to coordinator */
distributedPlan = CreateCoordinatorInsertSelectPlan(originalQuery);
}
}
else
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
}
Assert(distributedPlan);
}
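The regression output later in this diff shows this fallback in action: the distributed planning error is demoted to a DEBUG1 message and the statement proceeds through the coordinator path, e.g. (assuming DEBUG1 logging and the raw_events test tables):
INSERT INTO raw_events_first(user_id)
SELECT user_id
FROM ((SELECT user_id FROM raw_events_first) UNION
      (SELECT user_id FROM raw_events_second)) as foo;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator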
@ -494,6 +516,12 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
break;
}
case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT:
{
customScan->methods = &CoordinatorInsertSelectCustomScanMethods;
break;
}
default:
{
customScan->methods = &DelayedErrorCustomScanMethods;
@ -506,7 +534,6 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
customScan->custom_private = list_make1(multiPlanData);
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
/* check if we have a master query */
if (multiPlan->masterQuery)
{
finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan);

View File

@ -86,9 +86,6 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
Query *query,
RelationRestrictionContext *
restrictionContext);
static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
@ -170,92 +167,33 @@ MultiPlan *
CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
if (InsertSelectQuery(originalQuery))
{
return CreateInsertSelectRouterPlan(originalQuery, plannerRestrictionContext);
}
else
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
return CreateSingleTaskRouterPlan(originalQuery, query,
relationRestrictionContext);
}
}
/*
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then either NULL is
* returned, or the returned plan has planningError set to a description of the problem.
*/
static MultiPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
CmdType commandType = query->commandType;
bool modifyTask = false;
Job *job = NULL;
Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
ShardInterval *targetShardInterval = NULL;
Task *task = NULL;
Job *job = NULL;
List *placementList = NIL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
multiPlan->planningError = ModifyQuerySupported(query);
if (multiPlan->planningError != NULL)
{
modifyTask = true;
return multiPlan;
}
if (modifyTask)
targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
&multiPlan->planningError);
if (multiPlan->planningError != NULL)
{
Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
ShardInterval *targetShardInterval = NULL;
DeferredErrorMessage *planningError = NULL;
/* FIXME: this should probably rather be inlined into CreateModifyPlan */
planningError = ModifyQuerySupported(query);
if (planningError != NULL)
{
multiPlan->planningError = planningError;
return multiPlan;
}
targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
&planningError);
if (planningError != NULL)
{
multiPlan->planningError = planningError;
return multiPlan;
}
task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
Assert(task);
}
else
{
/* FIXME: this should probably rather be inlined into CreateSelectPlan */
multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query);
if (multiPlan->planningError)
{
return multiPlan;
}
task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
return multiPlan;
}
if (task == NULL)
{
return NULL;
}
task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList);
multiPlan->workerJob = job;
multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
@ -270,15 +208,59 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
}
/*
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then either NULL is
* returned, or the returned plan has planningError set to a description of the problem.
*/
static MultiPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
Job *job = NULL;
Task *task = NULL;
List *placementList = NIL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = query->commandType;
/* FIXME: this should probably rather be inlined into CreateRouterPlan */
multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query);
if (multiPlan->planningError)
{
return multiPlan;
}
task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
if (task == NULL)
{
return NULL;
}
ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList);
multiPlan->workerJob = job;
multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
multiPlan->hasReturning = false;
return multiPlan;
}
/*
* Creates a router plan for INSERT ... SELECT queries which could consist of
* multiple tasks.
*
* The function never returns NULL, it errors out if it cannot create the multi plan.
*/
static MultiPlan *
CreateInsertSelectRouterPlan(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
MultiPlan *
CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
int shardOffset = 0;
List *sqlTaskList = NIL;
@ -759,9 +741,12 @@ ExtractSelectRangeTableEntry(Query *query)
RangeTblRef *reference = NULL;
RangeTblEntry *subqueryRte = NULL;
Assert(InsertSelectQuery(query));
Assert(InsertSelectIntoDistributedTable(query));
/* since we already asserted InsertSelectQuery() it is safe to access both lists */
/*
* Since we already asserted InsertSelectIntoDistributedTable() it is safe to access
* both lists
*/
fromList = query->jointree->fromlist;
reference = linitial(fromList);
subqueryRte = rt_fetch(reference->rtindex, query->rtable);
@ -782,8 +767,6 @@ ExtractInsertRangeTableEntry(Query *query)
List *rangeTableList = query->rtable;
RangeTblEntry *insertRTE = NULL;
AssertArg(InsertSelectQuery(query));
insertRTE = rt_fetch(resultRelation, rangeTableList);
return insertRTE;
@ -806,9 +789,25 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
DeferredErrorMessage *error = NULL;
/* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree));
AssertArg(InsertSelectIntoDistributedTable(queryTree));
EnsureCoordinator();
subquery = subqueryRte->subquery;
if (!NeedsDistributedPlanning(subquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT can only select from "
"distributed tables",
NULL, NULL);
}
if (GetLocalGroupId() != 0)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT can only be performed from "
"the coordinator",
NULL, NULL);
}
/* we do not expect to see a view in modify target */
foreach(rangeTableCell, queryTree->rtable)
@ -823,13 +822,11 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
}
}
subquery = subqueryRte->subquery;
if (contain_volatile_functions((Node *) queryTree))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"volatile functions are not allowed in INSERT ... SELECT "
"queries",
"volatile functions are not allowed in distributed "
"INSERT ... SELECT queries",
NULL, NULL);
}
@ -850,7 +847,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"only reference tables may be queried when targeting "
"a reference table with INSERT ... SELECT",
"a reference table with distributed INSERT ... SELECT",
NULL, NULL);
}
}
@ -875,7 +872,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated",
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
}
}
@ -906,7 +903,7 @@ MultiTaskRouterSelectQuerySupported(Query *query)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Subqueries without relations are not allowed in "
"INSERT ... SELECT queries",
"distributed INSERT ... SELECT queries",
NULL, NULL);
}
@ -914,8 +911,8 @@ MultiTaskRouterSelectQuerySupported(Query *query)
if (subquery->limitCount != NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"LIMIT clauses are not allowed in INSERT ... SELECT "
"queries",
"LIMIT clauses are not allowed in distirbuted INSERT "
"... SELECT queries",
NULL, NULL);
}
@ -923,8 +920,8 @@ MultiTaskRouterSelectQuerySupported(Query *query)
if (subquery->limitOffset != NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"OFFSET clauses are not allowed in INSERT ... SELECT "
"queries",
"OFFSET clauses are not allowed in distributed "
"INSERT ... SELECT queries",
NULL, NULL);
}
@ -936,16 +933,16 @@ MultiTaskRouterSelectQuerySupported(Query *query)
if (subquery->windowClause != NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"window functions are not allowed in INSERT ... SELECT "
"queries",
"window functions are not allowed in distributed "
"INSERT ... SELECT queries",
NULL, NULL);
}
if (subquery->setOperations != NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Set operations are not allowed in INSERT ... SELECT "
"queries",
"Set operations are not allowed in distributed "
"INSERT ... SELECT queries",
NULL, NULL);
}
@ -958,8 +955,8 @@ MultiTaskRouterSelectQuerySupported(Query *query)
if (subquery->groupingSets != NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"grouping sets are not allowed in INSERT ... SELECT "
"queries",
"grouping sets are not allowed in distributed "
"INSERT ... SELECT queries",
NULL, NULL);
}
@ -970,7 +967,7 @@ MultiTaskRouterSelectQuerySupported(Query *query)
if (subquery->hasDistinctOn)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"DISTINCT ON clauses are not allowed in "
"DISTINCT ON clauses are not allowed in distributed "
"INSERT ... SELECT queries",
NULL, NULL);
}
@ -1152,8 +1149,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
}
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT INTO ... SELECT partition columns in the source "
"table and subquery do not match",
"cannot perform distributed INSERT INTO ... SELECT "
"because the partition columns in the source table "
"and subquery do not match",
psprintf(errorDetailTemplate, exprDescription),
"Ensure the target table's partition column has a "
"corresponding simple column reference to a distributed "
@ -1167,8 +1165,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
if (!IsA(targetEntry->expr, Var))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT INTO ... SELECT partition columns in the source "
"table and subquery do not match",
"cannot perform distributed INSERT INTO ... SELECT "
"because the partition columns in the source table "
"and subquery do not match",
"The data type of the target table's partition column "
"should exactly match the data type of the "
"corresponding simple column reference in the subquery.",
@ -1179,13 +1178,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
if (!IsPartitionColumn(selectTargetExpr, subquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT INTO ... SELECT partition columns in the source "
"table and subquery do not match",
"cannot perform distributed INSERT INTO ... SELECT "
"becuase the partition columns in the source table "
"and subquery do not match",
"The target table's partition column should correspond "
"to a partition column in the subquery.",
NULL);
}
/* finally, check that the select target column is a partition column */
/* we can set the select relation id */
*selectPartitionColumnTableId = subqueryPartitionColumnRelationId;
@ -1195,8 +1196,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
if (!targetTableHasPartitionColumn)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT INTO ... SELECT partition columns in the source "
"table and subquery do not match",
"cannot perform distributed INSERT INTO ... SELECT "
"because the partition columns in the source table "
"and subquery do not match",
"the query doesn't include the target table's "
"partition column",
NULL);
@ -1228,8 +1230,6 @@ ModifyQuerySupported(Query *queryTree)
Node *onConflictWhere = NULL;
CmdType commandType = queryTree->commandType;
Assert(commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE);
/*
* Reject subqueries which are in SELECT or WHERE clause.
@ -2749,7 +2749,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
int subqueryTargetLength = 0;
int targetEntryIndex = 0;
AssertArg(InsertSelectQuery(originalQuery));
AssertArg(InsertSelectIntoDistributedTable(originalQuery));
subquery = subqueryRte->subquery;
@ -2885,8 +2885,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
/*
* InsertSelectQuery returns true when the input query
* is INSERT INTO ... SELECT kind of query.
* InsertSelectIntoDistributedTable returns true when the input query is an
* INSERT INTO ... SELECT kind of query and the target is a distributed
* table.
*
* Note that the input query should be the original parsetree of
* the query (i.e., not passed through the standard planner).
@ -2895,12 +2896,13 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
* rewrite/rewriteManip.c.
*/
bool
InsertSelectQuery(Query *query)
InsertSelectIntoDistributedTable(Query *query)
{
CmdType commandType = query->commandType;
List *fromList = NULL;
RangeTblRef *rangeTableReference = NULL;
RangeTblEntry *subqueryRte = NULL;
RangeTblEntry *insertRte = NULL;
if (commandType != CMD_INSERT)
{
@ -2919,7 +2921,10 @@ InsertSelectQuery(Query *query)
}
rangeTableReference = linitial(fromList);
Assert(IsA(rangeTableReference, RangeTblRef));
if (!IsA(rangeTableReference, RangeTblRef))
{
return false;
}
subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable);
if (subqueryRte->rtekind != RTE_SUBQUERY)
@ -2930,6 +2935,12 @@ InsertSelectQuery(Query *query)
/* ensure that there is a query */
Assert(IsA(subqueryRte->subquery, Query));
insertRte = ExtractInsertRangeTableEntry(query);
if (!IsDistributedTable(insertRte->relid))
{
return false;
}
return true;
}
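A sketch of what the stricter predicate now distinguishes (some_local_table is a hypothetical regular PostgreSQL table):
INSERT INTO lineitem_hash_part SELECT o_orderkey FROM orders_hash_part;  -- matches: distributed target table
INSERT INTO some_local_table SELECT o_orderkey FROM orders_hash_part;    -- no longer matches: local target table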

View File

@ -60,7 +60,7 @@ deparse_shard_query_test(PG_FUNCTION_ARGS)
StringInfo buffer = makeStringInfo();
/* reorder the target list only for INSERT .. SELECT queries */
if (InsertSelectQuery(query))
if (InsertSelectIntoDistributedTable(query))
{
RangeTblEntry *insertRte = linitial(query->rtable);
RangeTblEntry *subqueryRte = lsecond(query->rtable);

View File

@ -104,7 +104,7 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
ListCell *rteCell = NULL;
ListCell *cteCell = NULL;
Node *modifiedNode = NULL;
bool insertSelectQuery = InsertSelectQuery(query);
bool insertSelectQuery = InsertSelectIntoDistributedTable(query);
if (query->jointree && query->jointree->quals)
{

View File

@ -282,6 +282,11 @@ OutMultiPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery);
WRITE_BOOL_FIELD(routerExecutable);
WRITE_NODE_FIELD(insertSelectSubquery);
WRITE_NODE_FIELD(insertTargetList);
WRITE_OID_FIELD(targetRelationId);
WRITE_NODE_FIELD(planningError);
}

View File

@ -190,6 +190,11 @@ ReadMultiPlan(READFUNC_ARGS)
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_BOOL_FIELD(routerExecutable);
READ_NODE_FIELD(insertSelectSubquery);
READ_NODE_FIELD(insertTargetList);
READ_OID_FIELD(targetRelationId);
READ_NODE_FIELD(planningError);
READ_DONE();

View File

@ -59,7 +59,7 @@
static void AppendOptionListToString(StringInfo stringData, List *options);
static const char * convert_aclright_to_string(int aclright);
static bool contain_nextval_expression_walker(Node *node, void *context);
/*
* pg_get_extensiondef_string finds the foreign data wrapper that corresponds to
@ -989,7 +989,7 @@ convert_aclright_to_string(int aclright)
* contain_nextval_expression_walker walks over expression tree and returns
* true if it contains call to 'nextval' function.
*/
static bool
bool
contain_nextval_expression_walker(Node *node, void *context)
{
if (node == NULL)

View File

@ -36,6 +36,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
int64 shardid, StringInfo buffer);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId);
extern bool contain_nextval_expression_walker(Node *node, void *context);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -0,0 +1,23 @@
/*-------------------------------------------------------------------------
*
* insert_select_executor.h
*
* Declarations for public functions and types related to executing
* INSERT..SELECT commands.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef INSERT_SELECT_EXECUTOR_H
#define INSERT_SELECT_EXECUTOR_H
#include "executor/execdesc.h"
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
#endif /* INSERT_SELECT_EXECUTOR_H */

View File

@ -0,0 +1,29 @@
/*-------------------------------------------------------------------------
*
* insert_select_planner.h
*
* Declarations for public functions and types related to planning
* INSERT..SELECT commands.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef INSERT_SELECT_PLANNER_H
#define INSERT_SELECT_PLANNER_H
#include "postgres.h"
#include "distributed/multi_physical_planner.h"
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
extern MultiPlan * CreateCoordinatorInsertSelectPlan(Query *originalQuery);
extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es);
#endif /* INSERT_SELECT_PLANNER_H */

View File

@ -84,6 +84,9 @@ typedef struct CitusCopyDestReceiver
/* state on how to copy out data types */
CopyOutState copyOutState;
FmgrInfo *columnOutputFunctions;
/* number of tuples sent */
int64 tuplesSent;
} CitusCopyDestReceiver;

View File

@ -38,6 +38,7 @@ typedef struct CitusScanState
extern Node * RealTimeCreateScan(CustomScan *scan);
extern Node * TaskTrackerCreateScan(CustomScan *scan);
extern Node * RouterCreateScan(CustomScan *scan);
extern Node * CoordinatorInsertSelectCreateScan(CustomScan *scan);
extern Node * DelayedErrorCreateScan(CustomScan *scan);
extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RealTimeExecScan(CustomScanState *node);

View File

@ -221,6 +221,11 @@ typedef struct MultiPlan
Query *masterQuery;
bool routerExecutable;
/* INSERT ... SELECT via coordinator only */
Query *insertSelectSubquery;
List *insertTargetList;
Oid targetRelationId;
/*
* NULL if this is a valid plan, an error description otherwise. This will
* e.g. be set if SQL features are present that a planner doesn't support,

View File

@ -35,6 +35,9 @@ extern bool RouterSelectQuery(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId,
List **relationShardList, bool replacePrunedQueryWithDummy);
extern MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte,
@ -43,7 +46,7 @@ extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rte
extern RelationRestrictionContext * CopyRelationRestrictionContext(
RelationRestrictionContext *oldContext);
extern bool InsertSelectQuery(Query *query);
extern bool InsertSelectIntoDistributedTable(Query *query);
extern Oid ExtractFirstDistributedTableId(Query *query);
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);

View File

@ -94,7 +94,8 @@ typedef enum
MULTI_EXECUTOR_INVALID_FIRST = 0,
MULTI_EXECUTOR_REAL_TIME = 1,
MULTI_EXECUTOR_TASK_TRACKER = 2,
MULTI_EXECUTOR_ROUTER = 3
MULTI_EXECUTOR_ROUTER = 3,
MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 4
} MultiExecutorType;

View File

@ -69,9 +69,13 @@ FROM (
) t2 ON (t1.user_id = t2.user_id)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
-- get some statistics from the aggregated results to ensure the results are correct
-- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
8 | 8 | 16.1250000000000000
(1 row)
------------------------------------
------------------------------------
-- Funnel, grouped by the number of times a user has done an event
@ -143,9 +147,13 @@ GROUP BY
count_pay, user_id
ORDER BY
count_pay;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
-- get some statistics from the aggregated results to ensure the results are correct
-- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
8 | 8 | 45.0000000000000000
(1 row)
------------------------------------
------------------------------------
-- Most recently seen users_table events_table

View File

@ -101,7 +101,6 @@ FROM (
WHERE t1.user_id = 20
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
------------------------------------
------------------------------------
-- Funnel grouped by whether or not a user has done an event -- two shards query
@ -145,9 +144,13 @@ FROM (
WHERE (t1.user_id = 20 OR t1.user_id = 17)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
-- get some statistics from the aggregated results to ensure the results are correct
-- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second;
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second;
count | count | avg
-------+-------+---------------------
2 | 2 | 18.5000000000000000
(1 row)
------------------------------------
------------------------------------
-- Most recently seen users_table events_table -- single shard query

View File

@ -985,3 +985,62 @@ Custom Scan (Citus Router)
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK;
-- test explain with local INSERT ... SELECT
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Limit
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Seq Scan on orders_hash_part_360295 orders_hash_part
SELECT true AS valid FROM explain_json($$
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
$$);
t
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey, l_quantity)
SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Limit
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Seq Scan on orders_hash_part_360295 orders_hash_part
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1,5) s;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Function Scan on generate_series s
EXPLAIN (COSTS OFF)
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO lineitem_hash_part
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
SELECT s FROM cte1;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Subquery Scan on citus_insert_select_subquery
CTE cte1
-> Function Scan on generate_series s
-> CTE Scan on cte1
CTE cte1
-> Limit
-> CTE Scan on cte1 cte1_1
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
( SELECT s FROM generate_series(1,5) s) UNION
( SELECT s FROM generate_series(5,10) s);
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Subquery Scan on citus_insert_select_subquery
-> HashAggregate
Group Key: s.s
-> Append
-> Function Scan on generate_series s
-> Function Scan on generate_series s_1

View File

@ -940,3 +940,62 @@ Custom Scan (Citus Router)
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK;
-- test explain with local INSERT ... SELECT
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Limit
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Seq Scan on orders_hash_part_360295 orders_hash_part
SELECT true AS valid FROM explain_json($$
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
$$);
t
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey, l_quantity)
SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Limit
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Seq Scan on orders_hash_part_360295 orders_hash_part
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1,5) s;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Function Scan on generate_series s
EXPLAIN (COSTS OFF)
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO lineitem_hash_part
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
SELECT s FROM cte1;
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Subquery Scan on citus_insert_select_subquery
CTE cte1
-> Function Scan on generate_series s
-> CTE Scan on cte1
CTE cte1
-> Limit
-> CTE Scan on cte1 cte1_1
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
( SELECT s FROM generate_series(1,5) s) UNION
( SELECT s FROM generate_series(5,10) s);
Custom Scan (Citus INSERT ... SELECT via coordinator)
-> Subquery Scan on citus_insert_select_subquery
-> HashAggregate
Group Key: s.s
-> Append
-> Function Scan on generate_series s
-> Function Scan on generate_series s_1

View File

@ -141,20 +141,6 @@ WARNING: function public.evaluate_on_master(integer) does not exist
WARNING: function public.evaluate_on_master(integer) does not exist
ERROR: could not modify any active placements
\set VERBOSITY default
-- volatile functions should be disallowed
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, (random()*10)::int
FROM
raw_events_first;
ERROR: volatile functions are not allowed in INSERT ... SELECT queries
INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT (random()*10)::int)
SELECT
user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
ERROR: volatile functions are not allowed in INSERT ... SELECT queries
-- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES
(7, now());
@ -645,7 +631,10 @@ INSERT INTO agg_events (value_1_agg, user_id)
DISTINCT ON (value_1) value_1, user_id
FROM
raw_events_first;
ERROR: DISTINCT ON clauses are not allowed in INSERT ... SELECT queries
DEBUG: DISTINCT ON clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: could not run distributed query with DISTINCT clause
HINT: Consider using an equality filter on the distributed table's partition column.
-- We do not support some CTEs
WITH fist_table_agg AS
(SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id)
@ -655,8 +644,10 @@ INSERT INTO agg_events
v1_agg, user_id
FROM
fist_table_agg;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: could not run distributed query with complex table expressions
HINT: Consider using an equality filter on the distributed table's partition column.
-- We don't support CTEs that consist of const values as well
INSERT INTO agg_events
WITH sub_cte AS (SELECT 1)
@ -664,8 +655,12 @@ INSERT INTO agg_events
raw_events_first.user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries
-- We do not support any set operations
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: could not run distributed query with common table expressions
HINT: Consider using an equality filter on the distributed table's partition column.
-- We support set operations via the coordinator
BEGIN;
INSERT INTO
raw_events_first(user_id)
SELECT
@ -673,14 +668,19 @@ SELECT
FROM
((SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_second)) as foo;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
ROLLBACK;
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM raw_events_first) INTERSECT
(SELECT user_id FROM raw_events_first);
ERROR: Set operations are not allowed in INSERT ... SELECT queries
-- We do not support any set operations
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- If the query is router plannable then it is executed via the coordinator
INSERT INTO
raw_events_first(user_id)
SELECT
@ -688,7 +688,10 @@ SELECT
FROM
((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT
(SELECT user_id FROM raw_events_second where user_id = 17)) as foo;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Creating router plan
DEBUG: Plan is router executable
-- some supported LEFT joins
INSERT INTO agg_events (user_id)
SELECT
@ -1027,42 +1030,53 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_3) AS foo;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot push down this subquery
DETAIL: Group by list without partition column is currently unsupported
-- error cases
-- no part column at all
INSERT INTO raw_events_second
(value_1)
SELECT value_1
FROM raw_events_first;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: the query doesn't include the target table's partition column
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: the partition column of table public.raw_events_second should have a value
INSERT INTO raw_events_second
(value_1)
SELECT user_id
FROM raw_events_first;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: the query doesn't include the target table's partition column
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: the partition column of table public.raw_events_second should have a value
INSERT INTO raw_events_second
(user_id)
SELECT value_1
FROM raw_events_first;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: the partition column of table public.raw_events_second cannot be NULL
INSERT INTO raw_events_second
(user_id)
SELECT user_id * 2
FROM raw_events_first;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an operator in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO raw_events_second
(user_id)
SELECT user_id :: bigint
FROM raw_events_first;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
@ -1076,9 +1090,11 @@ SELECT SUM(value_3),
Avg(value_2)
FROM raw_events_first
GROUP BY user_id;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an aggregation in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: the partition column of table public.agg_events cannot be NULL
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
@ -1093,16 +1109,21 @@ SELECT SUM(value_3),
FROM raw_events_first
GROUP BY user_id,
value_2;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: the partition column of table public.agg_events cannot be NULL
-- tables should be co-located
INSERT INTO agg_events (user_id)
SELECT
user_id
FROM
reference_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Creating router plan
DEBUG: Plan is router executable
-- unsupported joins between subqueries
-- we do not return bare partition column on the inner query
INSERT INTO agg_events
@ -1127,9 +1148,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- the second part of the query is not routable since
-- GROUP BY not on the partition column (i.e., value_1) and thus join
-- on f.id = f2.id is not on the partition key (instead on the sum of partition key)
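-- Illustrative sketch only (the actual test statement is not shown in this hunk;
-- table and column names are taken from the surrounding tests): a query of the
-- shape described above. The second subquery groups by value_1 rather than the
-- distribution column, so f2.id is a sum of the partition key and the join
-- f.id = f2.id is not on the partition key.
INSERT INTO agg_events (user_id)
SELECT f.id
FROM
  (SELECT user_id AS id
   FROM raw_events_second
   GROUP BY user_id) AS f
JOIN
  (SELECT sum(user_id) AS id
   FROM raw_events_second
   GROUP BY value_1) AS f2
ON (f.id = f2.id);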
@ -1564,7 +1588,10 @@ SELECT user_id,
Sum(value_2) AS sum_val2
FROM raw_events_second
GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) );
ERROR: grouping sets are not allowed in INSERT ... SELECT queries
DEBUG: grouping sets are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
HINT: Consider using an equality filter on the distributed table's partition column.
-- set back to INFO
SET client_min_messages TO INFO;
-- avoid constraint violations
@ -1592,7 +1619,8 @@ WHERE user_id
) as f_inner
)
) AS f2);
ERROR: LIMIT clauses are not allowed in INSERT ... SELECT queries
ERROR: cannot push down this subquery
DETAIL: Limit in subquery is currently unsupported
-- Altering a table and selecting from it using a multi-shard statement
-- in the same transaction is allowed because we will use the same
-- connections for all co-located placements.
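-- Illustrative sketch only (the actual test statements are not shown in this hunk;
-- the added column name is assumed): DDL followed by a multi-shard read inside one
-- transaction block, which works because the same connections are reused.
BEGIN;
ALTER TABLE raw_events_second ADD COLUMN extra_value int;
SELECT count(*) FROM raw_events_second;
ROLLBACK;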
@ -1646,7 +1674,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
SELECT count(*) FROM raw_events_second;
count
-------
18
36
(1 row)
INSERT INTO raw_events_second SELECT * FROM test_view;
@ -1656,12 +1684,9 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B
SELECT count(*) FROM raw_events_second;
count
-------
20
38
(1 row)
-- inserting into views does not work
INSERT INTO test_view SELECT * FROM raw_events_second;
ERROR: cannot insert into view over distributed table
-- we need this in our next test
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
@ -1912,7 +1937,7 @@ FROM
table_with_defaults
GROUP BY
store_id;
ERROR: volatile functions are not allowed in INSERT ... SELECT queries
ERROR: INSERT ... SELECT cannot generate sequence values when selecting from a distributed table
-- do some more error/error message checks
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
@ -1937,49 +1962,61 @@ SELECT create_distributed_table('table_with_starts_with_defaults', 'c');
(1 row)
SET client_min_messages TO DEBUG;
INSERT INTO text_table (part_col)
SELECT
CASE WHEN part_col = 'onder' THEN 'marco'
END
FROM text_table ;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains a case expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT part_col::text from char_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT val FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO text_table (part_col) SELECT val::text FROM text_table;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults;
-- Test on partition column without native hash function
CREATE TABLE raw_table
@ -2012,10 +2049,380 @@ SELECT * FROM summary_table;
11-11-1980 | 1
(1 row)
-- Test INSERT ... SELECT via coordinator
-- Select from constants
TRUNCATE raw_events_first;
INSERT INTO raw_events_first (user_id, value_1)
SELECT * FROM (VALUES (1,2), (3,4), (5,6)) AS v(int,int);
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id;
user_id | value_1
---------+---------
1 | 2
3 | 4
5 | 6
(3 rows)
-- Select from local functions
TRUNCATE raw_events_first;
CREATE SEQUENCE insert_select_test_seq;
SET client_min_messages TO DEBUG;
INSERT INTO raw_events_first (user_id, value_1, value_2)
SELECT
s, nextval('insert_select_test_seq'), (random()*10)::int
FROM
generate_series(1, 5) s;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- ON CONFLICT is unsupported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
ON CONFLICT DO NOTHING;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator
-- RETURNING is unsupported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
RETURNING *;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
RESET client_min_messages;
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
TRUNCATE raw_events_first;
BEGIN;
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, s FROM generate_series(1, 5) s;
SELECT user_id, value_1 FROM raw_events_first;
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported
TRUNCATE raw_events_first;
BEGIN;
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, s FROM generate_series(1, 5) s;
SELECT user_id, value_1 FROM raw_events_first WHERE user_id = 1;
user_id | value_1
---------+---------
1 | 1
(1 row)
COMMIT;
-- Select from local table
TRUNCATE raw_events_first;
CREATE TEMPORARY TABLE raw_events_first_local AS
SELECT s AS u, 2*s AS v FROM generate_series(1, 5) s;
INSERT INTO raw_events_first (user_id, value_1)
SELECT u, v FROM raw_events_first_local;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
-- Use columns in opposite order
TRUNCATE raw_events_first;
INSERT INTO raw_events_first (value_1, user_id)
SELECT u, v FROM raw_events_first_local;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
2 | 1
4 | 2
6 | 3
8 | 4
10 | 5
(5 rows)
-- Set operations can work with opposite column order
TRUNCATE raw_events_first;
INSERT INTO raw_events_first (value_3, user_id)
( SELECT v, u::bigint FROM raw_events_first_local )
UNION ALL
( SELECT v, u FROM raw_events_first_local );
SELECT user_id, value_3 FROM raw_events_first ORDER BY user_id, value_3;
user_id | value_3
---------+---------
1 | 2
1 | 2
2 | 4
2 | 4
3 | 6
3 | 6
4 | 8
4 | 8
5 | 10
5 | 10
(10 rows)
-- Select from other distributed table with limit
TRUNCATE raw_events_first;
TRUNCATE raw_events_second;
INSERT INTO raw_events_second (user_id, value_4)
SELECT s, 3*s FROM generate_series (1,5) s;
INSERT INTO raw_events_first (user_id, value_1)
SELECT user_id, value_4 FROM raw_events_second LIMIT 5;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 | 3
2 | 6
3 | 9
4 | 12
5 | 15
(5 rows)
-- CTEs are supported in local queries
TRUNCATE raw_events_first;
WITH removed_rows AS (
DELETE FROM raw_events_first_local RETURNING u
)
INSERT INTO raw_events_first (user_id, value_1)
WITH value AS (SELECT 1)
SELECT * FROM removed_rows, value;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 | 1
2 | 1
3 | 1
4 | 1
5 | 1
(5 rows)
-- nested CTEs are also supported
TRUNCATE raw_events_first;
INSERT INTO raw_events_first_local SELECT s, 2*s FROM generate_series(0, 10) s;
WITH rows_to_remove AS (
SELECT u FROM raw_events_first_local WHERE u > 0
),
removed_rows AS (
DELETE FROM raw_events_first_local
WHERE u IN (SELECT * FROM rows_to_remove)
RETURNING u, v
)
INSERT INTO raw_events_first (user_id, value_1)
WITH ultra_rows AS (
WITH numbers AS (
SELECT s FROM generate_series(1,10) s
),
super_rows AS (
SELECT u, v FROM removed_rows JOIN numbers ON (u = s)
)
SELECT * FROM super_rows LIMIT 5
)
SELECT u, v FROM ultra_rows;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
-- CTEs with duplicate names are also supported
TRUNCATE raw_events_first;
WITH super_rows AS (
SELECT u FROM raw_events_first_local
)
INSERT INTO raw_events_first (user_id, value_1)
WITH super_rows AS (
SELECT * FROM super_rows GROUP BY u
)
SELECT u, 5 FROM super_rows;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
0 | 5
(1 row)
-- CTEs are supported in router queries
TRUNCATE raw_events_first;
WITH user_two AS (
SELECT user_id, value_4 FROM raw_events_second WHERE user_id = 2
)
INSERT INTO raw_events_first (user_id, value_1)
SELECT * FROM user_two;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
2 | 6
(1 row)
-- CTEs are supported when there are name collisions
WITH numbers AS (
SELECT s FROM generate_series(1,10) s
)
INSERT INTO raw_events_first(user_id, value_1)
WITH numbers AS (
SELECT s, s FROM generate_series(1,5) s
)
SELECT * FROM numbers;
-- Select into a distributed table with case-sensitive identifiers
CREATE TABLE "CaseSensitiveTable" ("UserID" int, "Value1" int);
SELECT create_distributed_table('"CaseSensitiveTable"', 'UserID');
create_distributed_table
--------------------------
(1 row)
INSERT INTO "CaseSensitiveTable"
SELECT s, s FROM generate_series(1,10) s;
SELECT * FROM "CaseSensitiveTable" ORDER BY "UserID";
UserID | Value1
--------+--------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
6 | 6
7 | 7
8 | 8
9 | 9
10 | 10
(10 rows)
DROP TABLE "CaseSensitiveTable";
-- Select into distributed table with a sequence
CREATE TABLE dist_table_with_sequence (user_id serial, value_1 serial);
SELECT create_distributed_table('dist_table_with_sequence', 'user_id');
create_distributed_table
--------------------------
(1 row)
-- from local query
INSERT INTO dist_table_with_sequence (value_1)
SELECT s FROM generate_series(1,5) s;
SELECT * FROM dist_table_with_sequence ORDER BY user_id;
user_id | value_1
---------+---------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- from a distributed query
INSERT INTO dist_table_with_sequence (value_1)
SELECT value_1 FROM dist_table_with_sequence;
ERROR: INSERT ... SELECT cannot generate sequence values when selecting from a distributed table
SELECT * FROM dist_table_with_sequence ORDER BY user_id;
user_id | value_1
---------+---------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- Select from distributed table into reference table
CREATE TABLE ref_table (user_id int, value_1 int);
SELECT create_reference_table('ref_table');
create_reference_table
------------------------
(1 row)
INSERT INTO ref_table
SELECT user_id, value_1 FROM raw_events_second;
SELECT * FROM ref_table ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 |
2 |
3 |
4 |
5 |
(5 rows)
DROP TABLE ref_table;
-- Select into an append-partitioned table is not supported
CREATE TABLE insert_append_table (user_id int, value_4 bigint);
SELECT create_distributed_table('insert_append_table', 'user_id', 'append');
create_distributed_table
--------------------------
(1 row)
INSERT INTO insert_append_table (user_id, value_4)
SELECT user_id, 1 FROM raw_events_second LIMIT 5;
ERROR: INSERT ... SELECT into an append-distributed table is not supported
DROP TABLE insert_append_table;
-- Insert from other distributed table as prepared statement
TRUNCATE raw_events_first;
PREPARE insert_prep(int) AS
INSERT INTO raw_events_first (user_id, value_1)
SELECT $1, value_4 FROM raw_events_second ORDER BY value_4 LIMIT 1;
EXECUTE insert_prep(1);
EXECUTE insert_prep(2);
EXECUTE insert_prep(3);
EXECUTE insert_prep(4);
EXECUTE insert_prep(5);
EXECUTE insert_prep(6);
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
user_id | value_1
---------+---------
1 | 3
2 | 3
3 | 3
4 | 3
5 | 3
6 | 3
(6 rows)
-- Inserting into views is handled via coordinator
TRUNCATE raw_events_first;
INSERT INTO test_view
SELECT * FROM raw_events_second;
SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4;
user_id | value_4
---------+---------
1 | 3
2 | 6
3 | 9
4 | 12
5 | 15
(5 rows)
-- Drop the view now, because the column we are about to drop depends on it
DROP VIEW test_view;
-- Make sure we handle dropped columns correctly
TRUNCATE raw_events_first;
ALTER TABLE raw_events_first DROP COLUMN value_1;
INSERT INTO raw_events_first (user_id, value_4)
SELECT value_4, user_id FROM raw_events_second LIMIT 5;
SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id;
user_id | value_4
---------+---------
3 | 1
6 | 2
9 | 3
12 | 4
15 | 5
(5 rows)
RESET client_min_messages;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE raw_events_first CASCADE;
NOTICE: drop cascades to view test_view
DROP TABLE raw_events_second;
DROP TABLE reference_table;
DROP TABLE agg_events;

View File

@ -66,7 +66,8 @@ FROM (
) t2 ON (t1.user_id = t2.user_id)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- not pushable since the JOIN is not an equi join and the right part of the UNION
-- is not joined on the partition key
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg )
@ -107,7 +108,8 @@ FROM (
) t2 ON (t1.user_id = t2.user_id)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- the LEFT JOIN condition is not on the partition column (i.e., it is part_key divided by 2)
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg )
SELECT user_id, sum(array_length(events_table, 1)), length(hasdone_event)
@ -147,7 +149,8 @@ FROM (
) t2 ON (t1.user_id = (t2.user_id)/2)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
------------------------------------
------------------------------------
-- Funnel, grouped by the number of times a user has done an event
@ -220,7 +223,8 @@ GROUP BY
count_pay, user_id
ORDER BY
count_pay;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- not pushable since the JOIN condition is not an equi JOIN (see the sketch below)
-- (subquery_1 JOIN subquery_2)
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg)
@ -288,7 +292,8 @@ GROUP BY
count_pay, user_id
ORDER BY
count_pay;
ERROR: Set operations are not allowed in INSERT ... SELECT queries
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
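-- Illustrative sketch only (users_table, events_table and agg_results_third are
-- assumed from the surrounding funnel tests): two subqueries joined with a
-- non-equality predicate, the shape that triggers the error above.
INSERT INTO agg_results_third (user_id)
SELECT sub_1.user_id
FROM
  (SELECT user_id FROM users_table) sub_1
JOIN
  (SELECT user_id FROM events_table) sub_2
  ON (sub_1.user_id > sub_2.user_id);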
------------------------------------
------------------------------------
-- Most recently seen users_table events_table
@ -686,4 +691,5 @@ FROM (
GROUP BY user_id
) AS shard_union
ORDER BY user_lastseen DESC;
ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries
ERROR: cannot push down this subquery
DETAIL: Subqueries without relations are unsupported

View File

@ -108,10 +108,6 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::ti
INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- INSERT ... SELECT ... FROM commands are unsupported from workers
INSERT INTO limit_orders_mx SELECT * FROM limit_orders_mx;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
-- connect back to the other node
\c - - - :worker_1_port
-- commands containing a CTE are unsupported
@ -433,6 +429,21 @@ SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data;
----------+------
(0 rows)
-- INSERT ... SELECT ... FROM commands are supported from workers
INSERT INTO multiple_hash_mx
SELECT s, s*2 FROM generate_series(1,10) s;
INSERT 0 10
-- but are never distributed
BEGIN;
BEGIN
SET LOCAL client_min_messages TO DEBUG1;
SET
INSERT INTO multiple_hash_mx SELECT * FROM multiple_hash_mx;
DEBUG: distributed INSERT ... SELECT can only be performed from the coordinator
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT 0 10
END;
COMMIT
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;

View File

@ -1091,12 +1091,11 @@ LOG: join order: [ "colocated_table_test" ][ broadcast join "reference_table_te
2
(2 rows)
SET client_min_messages TO NOTICE;
SET citus.log_multi_join_order TO FALSE;
SET citus.shard_count TO DEFAULT;
SET citus.task_executor_type to "real-time";
-- some INSERT .. SELECT queries that involve both hash distributed and reference tables
-- should error out since we're inserting into reference table where
-- should go via coordinator since we're inserting into reference table where
-- not all the participants are reference tables
INSERT INTO
reference_table_test (value_1)
@ -1105,9 +1104,10 @@ SELECT
FROM
colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_1 = colocated_table_test.value_1;
ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT
-- should error out, same as the above
colocated_table_test.value_1 = colocated_table_test_2.value_1;
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- should go via coordinator, same as the above
INSERT INTO
reference_table_test (value_1)
SELECT
@ -1116,7 +1116,8 @@ FROM
colocated_table_test, reference_table_test
WHERE
colocated_table_test.value_1 = reference_table_test.value_1;
ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- now, insert into the hash partitioned table and use reference
-- tables in the SELECT queries
INSERT INTO
@ -1150,8 +1151,7 @@ RETURNING value_1, value_2;
2 | 2
(2 rows)
-- partition column value comes from reference table but still first error is
-- on data type mismatch
-- partition column value comes from reference table, goes via coordinator
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
@ -1159,11 +1159,10 @@ SELECT
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
colocated_table_test_2.value_4 = reference_table_test.value_4;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
-- partition column value comes from reference table which should error out
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
@ -1171,10 +1170,11 @@ SELECT
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
colocated_table_test_2.value_4 = reference_table_test.value_4;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
RESET client_min_messages;
-- some tests for mark_tables_colocated
-- should error out
SELECT mark_tables_colocated('colocated_table_test_2', ARRAY['reference_table_test']);

View File

@ -426,7 +426,9 @@ WITH new_article AS (
INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING *
)
SELECT * FROM new_article;
ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
ERROR: could not run distributed query with complex table expressions
HINT: Consider using an equality filter on the distributed table's partition column.
-- Modifying statement in nested CTE case is covered by PostgreSQL itself
WITH new_article AS (
WITH nested_cte AS (

View File

@ -266,14 +266,17 @@ SELECT count(*) FROM mx_table;
5
(1 row)
-- INSERT / SELECT
-- INSERT / SELECT pulls results to worker
BEGIN;
SET LOCAL client_min_messages TO DEBUG;
INSERT INTO mx_table_2 SELECT * FROM mx_table;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
DEBUG: distributed INSERT ... SELECT can only be performed from the coordinator
DEBUG: Collecting INSERT ... SELECT results on coordinator
END;
SELECT count(*) FROM mx_table_2;
count
-------
0
5
(1 row)
-- mark_tables_colocated

View File

@ -197,9 +197,6 @@ SELECT count(*) FROM temp_lineitem;
1706
(1 row)
-- modifying views is disallowed
INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem;
ERROR: cannot insert into view over distributed table
SET citus.task_executor_type to "task-tracker";
-- single view repartition subqueries are not supported
SELECT l_suppkey, count(*) FROM

View File

@ -71,7 +71,7 @@ FROM (
) t GROUP BY user_id, hasdone_event;
-- get some statistics from the aggregated results to ensure the results are correct
-- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
------------------------------------
------------------------------------
@ -148,7 +148,7 @@ ORDER BY
count_pay;
-- get some statistics from the aggregated results to ensure the results are correct
-- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
------------------------------------
------------------------------------
@ -417,4 +417,4 @@ FROM
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;

View File

@ -146,7 +146,7 @@ FROM (
) t GROUP BY user_id, hasdone_event;
-- get some statistics from the aggregated results to ensure the results are correct
-- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second;
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second;
------------------------------------
@ -398,4 +398,4 @@ INSERT INTO agg_results_second(user_id, value_2_agg)
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second;

View File

@ -465,3 +465,32 @@ ALTER TABLE explain_table ADD COLUMN value int;
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
ROLLBACK;
-- test explain with local INSERT ... SELECT
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
SELECT true AS valid FROM explain_json($$
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
$$);
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey, l_quantity)
SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3;
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1,5) s;
EXPLAIN (COSTS OFF)
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO lineitem_hash_part
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
SELECT s FROM cte1;
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
( SELECT s FROM generate_series(1,5) s) UNION
( SELECT s FROM generate_series(5,10) s);

View File

@ -116,21 +116,6 @@ WHERE
\set VERBOSITY default
-- volatile functions should be disallowed
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, (random()*10)::int
FROM
raw_events_first;
INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT (random()*10)::int)
SELECT
user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
-- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES
(7, now());
@ -517,7 +502,9 @@ INSERT INTO agg_events
FROM
raw_events_first;
-- We do not support any set operations
-- We support set operations via the coordinator
BEGIN;
INSERT INTO
raw_events_first(user_id)
SELECT
@ -526,13 +513,15 @@ FROM
((SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_second)) as foo;
ROLLBACK;
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM raw_events_first) INTERSECT
(SELECT user_id FROM raw_events_first);
-- We do not support any set operations
-- If the query is router plannable then it is executed via the coordinator
INSERT INTO
raw_events_first(user_id)
SELECT
@ -1389,9 +1378,6 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6;
SELECT count(*) FROM raw_events_second;
-- inserting into views does not work
INSERT INTO test_view SELECT * FROM raw_events_second;
-- we need this in our next test
truncate raw_events_first;
@ -1611,6 +1597,8 @@ SELECT create_distributed_table('text_table', 'part_col');
SELECT create_distributed_table('char_table','part_col');
SELECT create_distributed_table('table_with_starts_with_defaults', 'c');
SET client_min_messages TO DEBUG;
INSERT INTO text_table (part_col)
SELECT
CASE WHEN part_col = 'onder' THEN 'marco'
@ -1628,6 +1616,9 @@ INSERT INTO text_table (part_col) SELECT part_col::text from char_table;
INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table;
INSERT INTO text_table (part_col) SELECT val FROM text_table;
INSERT INTO text_table (part_col) SELECT val::text FROM text_table;
RESET client_min_messages;
insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults;
-- Test on partition column without native hash function
@ -1651,6 +1642,261 @@ INSERT INTO summary_table SELECT time, COUNT(*) FROM raw_table GROUP BY time;
SELECT * FROM summary_table;
-- Test INSERT ... SELECT via coordinator
-- Select from constants
TRUNCATE raw_events_first;
INSERT INTO raw_events_first (user_id, value_1)
SELECT * FROM (VALUES (1,2), (3,4), (5,6)) AS v(int,int);
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id;
-- Select from local functions
TRUNCATE raw_events_first;
CREATE SEQUENCE insert_select_test_seq;
SET client_min_messages TO DEBUG;
INSERT INTO raw_events_first (user_id, value_1, value_2)
SELECT
s, nextval('insert_select_test_seq'), (random()*10)::int
FROM
generate_series(1, 5) s;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- ON CONFLICT is unsupported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
ON CONFLICT DO NOTHING;
-- RETURNING is unsupported
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
RETURNING *;
RESET client_min_messages;
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported
TRUNCATE raw_events_first;
BEGIN;
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, s FROM generate_series(1, 5) s;
SELECT user_id, value_1 FROM raw_events_first;
ROLLBACK;
-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported
TRUNCATE raw_events_first;
BEGIN;
INSERT INTO raw_events_first (user_id, value_1)
SELECT s, s FROM generate_series(1, 5) s;
SELECT user_id, value_1 FROM raw_events_first WHERE user_id = 1;
COMMIT;
-- Select from local table
TRUNCATE raw_events_first;
CREATE TEMPORARY TABLE raw_events_first_local AS
SELECT s AS u, 2*s AS v FROM generate_series(1, 5) s;
INSERT INTO raw_events_first (user_id, value_1)
SELECT u, v FROM raw_events_first_local;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- Use columns in opposite order
TRUNCATE raw_events_first;
INSERT INTO raw_events_first (value_1, user_id)
SELECT u, v FROM raw_events_first_local;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- Set operations can work with opposite column order
TRUNCATE raw_events_first;
INSERT INTO raw_events_first (value_3, user_id)
( SELECT v, u::bigint FROM raw_events_first_local )
UNION ALL
( SELECT v, u FROM raw_events_first_local );
SELECT user_id, value_3 FROM raw_events_first ORDER BY user_id, value_3;
-- Select from other distributed table with limit
TRUNCATE raw_events_first;
TRUNCATE raw_events_second;
INSERT INTO raw_events_second (user_id, value_4)
SELECT s, 3*s FROM generate_series (1,5) s;
INSERT INTO raw_events_first (user_id, value_1)
SELECT user_id, value_4 FROM raw_events_second LIMIT 5;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- CTEs are supported in local queries
TRUNCATE raw_events_first;
WITH removed_rows AS (
DELETE FROM raw_events_first_local RETURNING u
)
INSERT INTO raw_events_first (user_id, value_1)
WITH value AS (SELECT 1)
SELECT * FROM removed_rows, value;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- nested CTEs are also supported
TRUNCATE raw_events_first;
INSERT INTO raw_events_first_local SELECT s, 2*s FROM generate_series(0, 10) s;
WITH rows_to_remove AS (
SELECT u FROM raw_events_first_local WHERE u > 0
),
removed_rows AS (
DELETE FROM raw_events_first_local
WHERE u IN (SELECT * FROM rows_to_remove)
RETURNING u, v
)
INSERT INTO raw_events_first (user_id, value_1)
WITH ultra_rows AS (
WITH numbers AS (
SELECT s FROM generate_series(1,10) s
),
super_rows AS (
SELECT u, v FROM removed_rows JOIN numbers ON (u = s)
)
SELECT * FROM super_rows LIMIT 5
)
SELECT u, v FROM ultra_rows;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- CTEs with duplicate names are also supported
TRUNCATE raw_events_first;
WITH super_rows AS (
SELECT u FROM raw_events_first_local
)
INSERT INTO raw_events_first (user_id, value_1)
WITH super_rows AS (
SELECT * FROM super_rows GROUP BY u
)
SELECT u, 5 FROM super_rows;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- CTEs are supported in router queries
TRUNCATE raw_events_first;
WITH user_two AS (
SELECT user_id, value_4 FROM raw_events_second WHERE user_id = 2
)
INSERT INTO raw_events_first (user_id, value_1)
SELECT * FROM user_two;
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- CTEs are supported when there are name collisions
WITH numbers AS (
SELECT s FROM generate_series(1,10) s
)
INSERT INTO raw_events_first(user_id, value_1)
WITH numbers AS (
SELECT s, s FROM generate_series(1,5) s
)
SELECT * FROM numbers;
-- Select into a distributed table with case-sensitive identifiers
CREATE TABLE "CaseSensitiveTable" ("UserID" int, "Value1" int);
SELECT create_distributed_table('"CaseSensitiveTable"', 'UserID');
INSERT INTO "CaseSensitiveTable"
SELECT s, s FROM generate_series(1,10) s;
SELECT * FROM "CaseSensitiveTable" ORDER BY "UserID";
DROP TABLE "CaseSensitiveTable";
-- Select into distributed table with a sequence
CREATE TABLE dist_table_with_sequence (user_id serial, value_1 serial);
SELECT create_distributed_table('dist_table_with_sequence', 'user_id');
-- from local query
INSERT INTO dist_table_with_sequence (value_1)
SELECT s FROM generate_series(1,5) s;
SELECT * FROM dist_table_with_sequence ORDER BY user_id;
-- from a distributed query
INSERT INTO dist_table_with_sequence (value_1)
SELECT value_1 FROM dist_table_with_sequence;
SELECT * FROM dist_table_with_sequence ORDER BY user_id;
-- Select from distributed table into reference table
CREATE TABLE ref_table (user_id int, value_1 int);
SELECT create_reference_table('ref_table');
INSERT INTO ref_table
SELECT user_id, value_1 FROM raw_events_second;
SELECT * FROM ref_table ORDER BY user_id, value_1;
DROP TABLE ref_table;
-- Select into an append-partitioned table is not supported
CREATE TABLE insert_append_table (user_id int, value_4 bigint);
SELECT create_distributed_table('insert_append_table', 'user_id', 'append');
INSERT INTO insert_append_table (user_id, value_4)
SELECT user_id, 1 FROM raw_events_second LIMIT 5;
DROP TABLE insert_append_table;
-- Insert from other distributed table as prepared statement
TRUNCATE raw_events_first;
PREPARE insert_prep(int) AS
INSERT INTO raw_events_first (user_id, value_1)
SELECT $1, value_4 FROM raw_events_second ORDER BY value_4 LIMIT 1;
EXECUTE insert_prep(1);
EXECUTE insert_prep(2);
EXECUTE insert_prep(3);
EXECUTE insert_prep(4);
EXECUTE insert_prep(5);
EXECUTE insert_prep(6);
SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1;
-- Inserting into views is handled via coordinator
TRUNCATE raw_events_first;
INSERT INTO test_view
SELECT * FROM raw_events_second;
SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4;
-- Drop the view now, because the column we are about to drop depends on it
DROP VIEW test_view;
-- Make sure we handle dropped columns correctly
TRUNCATE raw_events_first;
ALTER TABLE raw_events_first DROP COLUMN value_1;
INSERT INTO raw_events_first (user_id, value_4)
SELECT value_4, user_id FROM raw_events_second LIMIT 5;
SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id;
RESET client_min_messages;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE raw_events_first CASCADE;

View File

@ -79,9 +79,6 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::ti
-- commands with multiple rows are unsupported
INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT);
-- INSERT ... SELECT ... FROM commands are unsupported from workers
INSERT INTO limit_orders_mx SELECT * FROM limit_orders_mx;
-- connect back to the other node
\c - - - :worker_1_port
@ -273,6 +270,16 @@ DELETE FROM multiple_hash_mx WHERE category = '1' RETURNING category;
SELECT * FROM multiple_hash_mx WHERE category = '1' ORDER BY category, data;
SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data;
-- INSERT ... SELECT ... FROM commands are supported from workers
INSERT INTO multiple_hash_mx
SELECT s, s*2 FROM generate_series(1,10) s;
-- but are never distributed
BEGIN;
SET LOCAL client_min_messages TO DEBUG1;
INSERT INTO multiple_hash_mx SELECT * FROM multiple_hash_mx;
END;
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on

View File

@ -675,7 +675,6 @@ WHERE
colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1;
SET client_min_messages TO NOTICE;
SET citus.log_multi_join_order TO FALSE;
SET citus.shard_count TO DEFAULT;
@ -683,7 +682,7 @@ SET citus.task_executor_type to "real-time";
-- some INSERT .. SELECT queries that involve both hash distributed and reference tables
-- should error out since we're inserting into reference table where
-- should go via coordinator since we're inserting into reference table where
-- not all the participants are reference tables
INSERT INTO
reference_table_test (value_1)
@ -692,9 +691,9 @@ SELECT
FROM
colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_1 = colocated_table_test.value_1;
colocated_table_test.value_1 = colocated_table_test_2.value_1;
-- should error out, same as the above
-- should go via coordinator, same as the above
INSERT INTO
reference_table_test (value_1)
SELECT
@ -727,8 +726,7 @@ WHERE
colocated_table_test_2.value_2 = reference_table_test.value_2
RETURNING value_1, value_2;
-- partition column value comes from reference table but still first error is
-- on data type mismatch
-- partition column value comes from reference table, goes via coordinator
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
@ -736,10 +734,8 @@ SELECT
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
colocated_table_test_2.value_4 = reference_table_test.value_4;
-- partition column value comes from reference table which should error out
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
@ -747,9 +743,9 @@ SELECT
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
colocated_table_test_2.value_4 = reference_table_test.value_4;
RESET client_min_messages;
-- some tests for mark_tables_colocated
-- should error out

View File

@ -137,8 +137,12 @@ SELECT master_remove_node('localhost', 5432);
TRUNCATE mx_table;
SELECT count(*) FROM mx_table;
-- INSERT / SELECT
-- INSERT / SELECT pulls results to worker
BEGIN;
SET LOCAL client_min_messages TO DEBUG;
INSERT INTO mx_table_2 SELECT * FROM mx_table;
END;
SELECT count(*) FROM mx_table_2;
-- mark_tables_colocated

View File

@ -93,9 +93,6 @@ SELECT count(*) FROM temp_lineitem;
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL';
SELECT count(*) FROM temp_lineitem;
-- modifying views is disallowed
INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem;
SET citus.task_executor_type to "task-tracker";
-- single view repartition subqueries are not supported