diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c631715d0..c6f01ae0b 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1927,6 +1927,8 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) MemoryContextSwitchTo(oldContext); + copyDest->tuplesSent++; + #if PG_VERSION_NUM >= 90600 return true; #endif diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c new file mode 100644 index 000000000..e6e144930 --- /dev/null +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -0,0 +1,157 @@ +/*------------------------------------------------------------------------- + * + * insert_select_executor.c + * + * Executor logic for INSERT..SELECT. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/insert_select_executor.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_copy.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" +#include "distributed/transaction_management.h" +#include "executor/executor.h" +#include "nodes/execnodes.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/plannodes.h" +#include "parser/parse_coerce.h" +#include "parser/parsetree.h" +#include "tcop/pquery.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/portal.h" +#include "utils/snapmgr.h" + + +static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, + Query *selectQuery, EState *executorState); +static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params, + DestReceiver *dest); + + +/* + * 
CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table + * SELECT .. query by setting up a DestReceiver that copies tuples into the + * distributed table and then executing the SELECT query using that DestReceiver + * as the tuple destination. + */ +TupleTableSlot * +CoordinatorInsertSelectExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + EState *executorState = scanState->customScanState.ss.ps.state; + MultiPlan *multiPlan = scanState->multiPlan; + Query *selectQuery = multiPlan->insertSelectSubquery; + List *insertTargetList = multiPlan->insertTargetList; + Oid targetRelationId = multiPlan->targetRelationId; + + ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); + + ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery, + executorState); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * ExecuteSelectIntoRelation executes given SELECT query and inserts the + * results into the target relation, which is assumed to be a distributed + * table. 
+ */ +static void +ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, + Query *selectQuery, EState *executorState) +{ + ParamListInfo paramListInfo = executorState->es_param_list_info; + + ListCell *insertTargetCell = NULL; + List *columnNameList = NIL; + bool stopOnFailure = false; + char partitionMethod = 0; + + CitusCopyDestReceiver *copyDest = NULL; + + BeginOrContinueCoordinatedTransaction(); + + partitionMethod = PartitionMethod(targetRelationId); + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + stopOnFailure = true; + } + + /* build the list of column names for the COPY statement */ + foreach(insertTargetCell, insertTargetList) + { + TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); + + columnNameList = lappend(columnNameList, insertTargetEntry->resname); + } + + /* set up a DestReceiver that copies into the distributed table */ + copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, + executorState, stopOnFailure); + + ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); + + executorState->es_processed = copyDest->tuplesSent; + + XactModificationLevel = XACT_MODIFICATION_DATA; +} + + +/* + * ExecuteIntoDestReceiver plans and executes a query and sends results to the given + * DestReceiver. 
+ */ +static void +ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest) +{ + PlannedStmt *queryPlan = NULL; + Portal portal = NULL; + int eflags = 0; + int cursorOptions = 0; + long count = FETCH_ALL; + + /* create a new portal for executing the query */ + portal = CreateNewPortal(); + + /* don't display the portal in pg_cursors, it is for internal use only */ + portal->visible = false; + +#if (PG_VERSION_NUM >= 90600) + cursorOptions = CURSOR_OPT_PARALLEL_OK; +#endif + + /* plan the subquery, this may be another distributed query */ + queryPlan = pg_plan_query(query, cursorOptions, params); + + PortalDefineQuery(portal, + NULL, + "", + "SELECT", + list_make1(queryPlan), + NULL); + + PortalStart(portal, params, eflags, GetActiveSnapshot()); + PortalRun(portal, count, false, dest, dest, NULL); + PortalDrop(portal, false); +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index dd2c26733..40abbff78 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -15,6 +15,9 @@ #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/namespace.h" +#include "distributed/insert_select_executor.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_copy.h" #include "distributed/multi_executor.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_planner.h" @@ -26,6 +29,7 @@ #include "executor/execdebug.h" #include "commands/copy.h" #include "nodes/makefuncs.h" +#include "parser/parsetree.h" #include "storage/lmgr.h" #include "tcop/utility.h" #include "utils/snapmgr.h" @@ -80,6 +84,15 @@ static CustomExecMethods RouterSelectCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; +static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { + .CustomName = "CoordinatorInsertSelectScan", + .BeginCustomScan = CitusSelectBeginScan, + 
.ExecCustomScan = CoordinatorInsertSelectExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CoordinatorInsertSelectExplainScan +}; + /* local function forward declarations */ static void PrepareMasterJobDirectory(Job *workerJob); @@ -167,6 +180,25 @@ RouterCreateScan(CustomScan *scan) } +/* + * CoordinatorInsertSelectCreateScan creates the scan state for executing + * INSERT..SELECT into a distributed table via the coordinator. + */ +Node * +CoordinatorInsertSelectCreateScan(CustomScan *scan) +{ + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + + scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->multiPlan = GetMultiPlan(scan); + + scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; + + return (Node *) scanState; +} + + /* * DelayedErrorCreateScan is only called if we could not plan for the given * query. This is the case when a plan is not ready for execution because diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 127daff20..c57be21d2 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -43,12 +43,10 @@ MultiExecutorType JobExecutorType(MultiPlan *multiPlan) { Job *job = multiPlan->workerJob; - List *workerTaskList = job->taskList; - List *workerNodeList = ActiveWorkerNodeList(); - int taskCount = list_length(workerTaskList); - int workerNodeCount = list_length(workerNodeList); - double tasksPerNode = taskCount / ((double) workerNodeCount); - int dependedJobCount = list_length(job->dependedJobList); + List *workerNodeList = NIL; + int workerNodeCount = 0; + int taskCount = 0; + double tasksPerNode = 0.; MultiExecutorType executorType = TaskExecutorType; bool routerExecutablePlan = multiPlan->routerExecutable; @@ -59,6 
+57,11 @@ JobExecutorType(MultiPlan *multiPlan) return MULTI_EXECUTOR_ROUTER; } + if (multiPlan->insertSelectSubquery != NULL) + { + return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + } + /* if it is not a router executable plan, inform user according to the log level */ if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) { @@ -68,9 +71,15 @@ JobExecutorType(MultiPlan *multiPlan) " queries on the workers."))); } + workerNodeList = ActiveWorkerNodeList(); + workerNodeCount = list_length(workerNodeList); + taskCount = list_length(job->taskList); + tasksPerNode = taskCount / ((double) workerNodeCount); + if (executorType == MULTI_EXECUTOR_REAL_TIME) { double reasonableConnectionCount = 0; + int dependedJobCount = 0; /* if we need to open too many connections per worker, warn the user */ if (tasksPerNode >= MaxConnections) @@ -98,6 +107,7 @@ JobExecutorType(MultiPlan *multiPlan) } /* if we have repartition jobs with real time executor, error out */ + dependedJobCount = list_length(job->dependedJobList); if (dependedJobCount > 0) { ereport(ERROR, (errmsg("cannot use real time executor with repartition jobs"), diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c new file mode 100644 index 000000000..14f0ed99c --- /dev/null +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -0,0 +1,254 @@ +/*------------------------------------------------------------------------- + * + * insert_select_planner.c + * + * Planning logic for INSERT..SELECT. + * + * Copyright (c) 2017, Citus Data, Inc. 
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/citus_ruleutils.h" +#include "distributed/errormessage.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" +#include "distributed/pg_dist_partition.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" +#include "optimizer/planner.h" +#include "parser/parsetree.h" +#include "parser/parse_coerce.h" +#include "parser/parse_relation.h" +#include "utils/lsyscache.h" + + +static DeferredErrorMessage * DeferErrorIfCoordinatorInsertSelectUnsupported( + Query *insertSelectQuery); +static Query * WrapSubquery(Query *subquery); + + +/* + * CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a + * distributed table. The query plan can also be executed on a worker in MX. + */ +MultiPlan * +CreateCoordinatorInsertSelectPlan(Query *parse) +{ + Query *insertSelectQuery = copyObject(parse); + Query *selectQuery = NULL; + + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + Oid targetRelationId = insertRte->relid; + + ListCell *selectTargetCell = NULL; + ListCell *insertTargetCell = NULL; + + MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + multiPlan->operation = CMD_INSERT; + + multiPlan->planningError = + DeferErrorIfCoordinatorInsertSelectUnsupported(insertSelectQuery); + + if (multiPlan->planningError != NULL) + { + return multiPlan; + } + + selectQuery = selectRte->subquery; + + /* + * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT + * has top-level set operations. 
+ * + * We could simply wrap all queries, but that might create a subquery that is + * not supported by the logical planner. Since the logical planner also does + * not support CTEs and top-level set operations, we can wrap queries containing + * those without breaking anything. + */ + if (list_length(insertSelectQuery->cteList) > 0) + { + selectQuery = WrapSubquery(selectRte->subquery); + + /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ + selectQuery->cteList = copyObject(insertSelectQuery->cteList); + } + else if (selectQuery->setOperations != NULL) + { + /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */ + selectQuery = WrapSubquery(selectRte->subquery); + } + + selectRte->subquery = selectQuery; + + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + + /* add casts when the SELECT output does not directly match the table */ + forboth(insertTargetCell, insertSelectQuery->targetList, + selectTargetCell, selectQuery->targetList) + { + TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); + TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); + + Var *columnVar = NULL; + Oid columnType = InvalidOid; + int32 columnTypeMod = 0; + Oid selectOutputType = InvalidOid; + + /* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */ + if (!IsA(insertTargetEntry->expr, Var)) + { + ereport(ERROR, (errmsg("can only handle regular columns in the target " + "list"))); + } + + columnVar = (Var *) insertTargetEntry->expr; + columnType = get_atttype(targetRelationId, columnVar->varattno); + columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno); + selectOutputType = columnVar->vartype; + + /* + * If the type in the target list does not match the type of the column, + * we need to cast to the column type. PostgreSQL would do this + * automatically during the insert, but we're passing the SELECT + * output directly to COPY. 
+ */ + if (columnType != selectOutputType) + { + Expr *selectExpression = selectTargetEntry->expr; + Expr *typeCastedSelectExpr = + (Expr *) coerce_to_target_type(NULL, (Node *) selectExpression, + selectOutputType, columnType, + columnTypeMod, COERCION_EXPLICIT, + COERCE_IMPLICIT_CAST, -1); + + selectTargetEntry->expr = typeCastedSelectExpr; + } + } + + multiPlan->insertSelectSubquery = selectQuery; + multiPlan->insertTargetList = insertSelectQuery->targetList; + multiPlan->targetRelationId = targetRelationId; + + return multiPlan; +} + + +/* + * DeferErrorIfCoordinatorInsertSelectUnsupported returns an error if executing an + * INSERT ... SELECT command by pulling results of the SELECT to the coordinator + * is unsupported because it uses RETURNING, ON CONFLICT, or an append-distributed + * table. + */ +static DeferredErrorMessage * +DeferErrorIfCoordinatorInsertSelectUnsupported(Query *insertSelectQuery) +{ + RangeTblEntry *insertRte = NULL; + RangeTblEntry *subqueryRte = NULL; + Query *subquery = NULL; + + if (list_length(insertSelectQuery->returningList) > 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "RETURNING is not supported in INSERT ... SELECT via " + "coordinator", NULL, NULL); + } + + if (insertSelectQuery->onConflict) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "ON CONFLICT is not supported in INSERT ... SELECT via " + "coordinator", NULL, NULL); + } + + insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT ... 
SELECT into an append-distributed table is " + "not supported", NULL, NULL); + } + + subqueryRte = ExtractSelectRangeTableEntry(insertSelectQuery); + subquery = (Query *) subqueryRte->subquery; + + if (NeedsDistributedPlanning(subquery) && + contain_nextval_expression_walker((Node *) insertSelectQuery->targetList, NULL)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT ... SELECT cannot generate sequence values when " + "selecting from a distributed table", + NULL, NULL); + } + + return NULL; +} + + +/* + * WrapSubquery wraps the given query as a subquery in a newly constructed + * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. + */ +static Query * +WrapSubquery(Query *subquery) +{ + Query *outerQuery = NULL; + ParseState *pstate = make_parsestate(NULL); + Alias *selectAlias = NULL; + RangeTblEntry *newRangeTableEntry = NULL; + RangeTblRef *newRangeTableRef = NULL; + ListCell *selectTargetCell = NULL; + List *newTargetList = NIL; + + outerQuery = makeNode(Query); + outerQuery->commandType = CMD_SELECT; + + /* create range table entries */ + selectAlias = makeAlias("citus_insert_select_subquery", NIL); + newRangeTableEntry = addRangeTableEntryForSubquery(pstate, subquery, + selectAlias, false, true); + outerQuery->rtable = list_make1(newRangeTableEntry); + + /* set the FROM expression to the subquery */ + newRangeTableRef = makeNode(RangeTblRef); + newRangeTableRef->rtindex = 1; + outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); + + /* create a target list that matches the SELECT */ + foreach(selectTargetCell, subquery->targetList) + { + TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); + Var *newSelectVar = NULL; + TargetEntry *newSelectTargetEntry = NULL; + + /* exactly 1 entry in FROM */ + int indexInRangeTable = 1; + + if (selectTargetEntry->resjunk) + { + continue; + } + + newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, + exprType((Node *) 
selectTargetEntry->expr), + exprTypmod((Node *) selectTargetEntry->expr), + exprCollation((Node *) selectTargetEntry->expr), 0); + + newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, + selectTargetEntry->resno, + selectTargetEntry->resname, + selectTargetEntry->resjunk); + + newTargetList = lappend(newTargetList, newSelectTargetEntry); + } + + outerQuery->targetList = newTargetList; + + return outerQuery; +} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 52d2e10dc..e6a5b3e94 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -22,6 +22,7 @@ #include "optimizer/cost.h" #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" @@ -44,7 +45,9 @@ #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" +#include "utils/builtins.h" #include "utils/json.h" +#include "utils/lsyscache.h" #include "utils/snapmgr.h" @@ -69,6 +72,8 @@ typedef struct RemoteExplainPlan /* Explain functions for distributed queries */ +static void CitusExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, + const char *queryString, ParamListInfo params); static void ExplainJob(Job *job, ExplainState *es); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); static void ExplainTaskList(List *taskList, ExplainState *es); @@ -710,3 +715,73 @@ ExplainYAMLLineStarting(ExplainState *es) appendStringInfoSpaces(es->str, es->indent * 2); } } + + +/* + * CoordinatorInsertSelectExplainScan is a custom scan explain callback function + * which is used to print explain information of a Citus plan for an INSERT INTO + * distributed_table SELECT ... query that is evaluated on the coordinator. 
+ */ +void +CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, + struct ExplainState *es) +{ + CitusScanState *scanState = (CitusScanState *) node; + MultiPlan *multiPlan = scanState->multiPlan; + Query *query = multiPlan->insertSelectSubquery; + IntoClause *into = NULL; + ParamListInfo params = NULL; + char *queryString = NULL; + + if (es->analyze) + { + /* avoiding double execution here is tricky, error out for now */ + ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for INSERT " + "... SELECT commands via the coordinator"))); + } + + ExplainOpenGroup("Select Query", "Select Query", false, es); + + /* explain the inner SELECT query */ + CitusExplainOneQuery(query, into, es, queryString, params); + + ExplainCloseGroup("Select Query", "Select Query", false, es); +} + + +/* + * CitusExplainOneQuery is simply a duplicate of ExplainOneQuery in explain.c, which + * is static. + */ +static void +CitusExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, + const char *queryString, ParamListInfo params) +{ + /* copied from ExplainOneQuery in explain.c */ + if (ExplainOneQuery_hook) + { + (*ExplainOneQuery_hook) (query, into, es, queryString, params); + } + else + { + PlannedStmt *plan; + instr_time planstart, + planduration; + int cursorOptions = 0; + + INSTR_TIME_SET_CURRENT(planstart); + + #if (PG_VERSION_NUM >= 90600) + cursorOptions = into ? 
0 : CURSOR_OPT_PARALLEL_OK; + #endif + + /* plan the query */ + plan = pg_plan_query(query, cursorOptions, params); + + INSTR_TIME_SET_CURRENT(planduration); + INSTR_TIME_SUBTRACT(planduration, planstart); + + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, &planduration); + } +} diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 45a1576dc..f26371430 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -26,6 +26,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/relation_restriction_equivalence.h" +#include "distributed/multi_router_planner.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -2662,6 +2663,15 @@ NeedsDistributedPlanning(Query *queryTree) return false; } + /* + * We can handle INSERT INTO distributed_table SELECT ... even if the SELECT + * part references local tables, so skip the remaining checks. + */ + if (InsertSelectIntoDistributedTable(queryTree)) + { + return true; + } + /* extract range table entries for simple relations only */ ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index c09bd6760..c630a984c 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -52,6 +52,11 @@ static CustomScanMethods RouterCustomScanMethods = { RouterCreateScan }; +static CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { + "Citus INSERT ... 
SELECT via coordinator", + CoordinatorInsertSelectCreateScan +}; + static CustomScanMethods DelayedErrorCustomScanMethods = { "Citus Delayed Error", DelayedErrorCreateScan @@ -274,10 +279,18 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query if (IsModifyCommand(query)) { - if (InsertSelectQuery(originalQuery)) + if (InsertSelectIntoDistributedTable(originalQuery)) { distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, plannerRestrictionContext); + + if (distributedPlan->planningError != NULL) + { + RaiseDeferredError(distributedPlan->planningError, DEBUG1); + + /* if INSERT..SELECT cannot be distributed, pull to coordinator */ + distributedPlan = CreateCoordinatorInsertSelectPlan(originalQuery); + } } else { @@ -503,6 +516,12 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) break; } + case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT: + { + customScan->methods = &CoordinatorInsertSelectCustomScanMethods; + break; + } + default: { customScan->methods = &DelayedErrorCustomScanMethods; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 044aa775c..64a13de63 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -741,9 +741,12 @@ ExtractSelectRangeTableEntry(Query *query) RangeTblRef *reference = NULL; RangeTblEntry *subqueryRte = NULL; - Assert(InsertSelectQuery(query)); + Assert(InsertSelectIntoDistributedTable(query)); - /* since we already asserted InsertSelectQuery() it is safe to access both lists */ + /* + * Since we already asserted InsertSelectIntoDistributedTable() it is safe to access + * both lists + */ fromList = query->jointree->fromlist; reference = linitial(fromList); subqueryRte = rt_fetch(reference->rtindex, query->rtable); @@ -764,8 +767,6 @@ ExtractInsertRangeTableEntry(Query *query) List *rangeTableList = query->rtable; RangeTblEntry 
*insertRTE = NULL; - AssertArg(InsertSelectQuery(query)); - insertRTE = rt_fetch(resultRelation, rangeTableList); return insertRTE; @@ -788,9 +789,25 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, DeferredErrorMessage *error = NULL; /* we only do this check for INSERT ... SELECT queries */ - AssertArg(InsertSelectQuery(queryTree)); + AssertArg(InsertSelectIntoDistributedTable(queryTree)); - EnsureCoordinator(); + subquery = subqueryRte->subquery; + + if (!NeedsDistributedPlanning(subquery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... SELECT can only select from " + "distributed tables", + NULL, NULL); + } + + if (GetLocalGroupId() != 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... SELECT can only be performed from " + "the coordinator", + NULL, NULL); + } /* we do not expect to see a view in modify target */ foreach(rangeTableCell, queryTree->rtable) @@ -805,13 +822,11 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, } } - subquery = subqueryRte->subquery; - if (contain_volatile_functions((Node *) queryTree)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "volatile functions are not allowed in INSERT ... SELECT " - "queries", + "volatile functions are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -832,7 +847,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "only reference tables may be queried when targeting " - "a reference table with INSERT ... SELECT", + "a reference table with distributed INSERT ... 
SELECT", NULL, NULL); } } @@ -857,7 +872,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "INSERT target table and the source relation of the SELECT partition " - "column value must be colocated", + "column value must be colocated in distributed INSERT ... SELECT", NULL, NULL); } } @@ -888,7 +903,7 @@ MultiTaskRouterSelectQuerySupported(Query *query) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "Subqueries without relations are not allowed in " - "INSERT ... SELECT queries", + "distributed INSERT ... SELECT queries", NULL, NULL); } @@ -896,8 +911,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->limitCount != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "LIMIT clauses are not allowed in INSERT ... SELECT " - "queries", + "LIMIT clauses are not allowed in distirbuted INSERT " + "... SELECT queries", NULL, NULL); } @@ -905,8 +920,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->limitOffset != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "OFFSET clauses are not allowed in INSERT ... SELECT " - "queries", + "OFFSET clauses are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -918,16 +933,16 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->windowClause != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "window functions are not allowed in INSERT ... SELECT " - "queries", + "window functions are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } if (subquery->setOperations != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "Set operations are not allowed in INSERT ... SELECT " - "queries", + "Set operations are not allowed in distributed " + "INSERT ... 
SELECT queries", NULL, NULL); } @@ -940,8 +955,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->groupingSets != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "grouping sets are not allowed in INSERT ... SELECT " - "queries", + "grouping sets are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -952,7 +967,7 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->hasDistinctOn) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "DISTINCT ON clauses are not allowed in " + "DISTINCT ON clauses are not allowed in distributed " "INSERT ... SELECT queries", NULL, NULL); } @@ -1134,8 +1149,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, } return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", psprintf(errorDetailTemplate, exprDescription), "Ensure the target table's partition column has a " "corresponding simple column reference to a distributed " @@ -1149,8 +1165,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, if (!IsA(targetEntry->expr, Var)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... 
SELECT " + "because the partition columns in the source table " + "and subquery do not match", "The data type of the target table's partition column " "should exactly match the data type of the " "corresponding simple column reference in the subquery.", @@ -1161,13 +1178,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, if (!IsPartitionColumn(selectTargetExpr, subquery)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", "The target table's partition column should correspond " "to a partition column in the subquery.", NULL); } + /* finally, check that the select target column is a partition column */ /* we can set the select relation id */ *selectPartitionColumnTableId = subqueryPartitionColumnRelationId; @@ -1177,8 +1196,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, if (!targetTableHasPartitionColumn) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", "the query doesn't include the target table's " "partition column", NULL); @@ -1210,8 +1230,6 @@ ModifyQuerySupported(Query *queryTree) Node *onConflictWhere = NULL; CmdType commandType = queryTree->commandType; - Assert(commandType == CMD_INSERT || commandType == CMD_UPDATE || - commandType == CMD_DELETE); /* * Reject subqueries which are in SELECT or WHERE clause.
@@ -2731,7 +2749,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, int subqueryTargetLength = 0; int targetEntryIndex = 0; - AssertArg(InsertSelectQuery(originalQuery)); + AssertArg(InsertSelectIntoDistributedTable(originalQuery)); subquery = subqueryRte->subquery; @@ -2867,8 +2885,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, /* - * InsertSelectQuery returns true when the input query - * is INSERT INTO ... SELECT kind of query. + * InsertSelectIntoDistributedTable returns true when the input query is an + * INSERT INTO ... SELECT kind of query and the target is a distributed + * table. * * Note that the input query should be the original parsetree of * the query (i.e., not passed trough the standard planner). @@ -2877,12 +2896,13 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, * rewrite/rewriteManip.c. */ bool -InsertSelectQuery(Query *query) +InsertSelectIntoDistributedTable(Query *query) { CmdType commandType = query->commandType; List *fromList = NULL; RangeTblRef *rangeTableReference = NULL; RangeTblEntry *subqueryRte = NULL; + RangeTblEntry *insertRte = NULL; if (commandType != CMD_INSERT) { @@ -2901,7 +2921,10 @@ InsertSelectQuery(Query *query) } rangeTableReference = linitial(fromList); - Assert(IsA(rangeTableReference, RangeTblRef)); + if (!IsA(rangeTableReference, RangeTblRef)) + { + return false; + } subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable); if (subqueryRte->rtekind != RTE_SUBQUERY) @@ -2912,6 +2935,12 @@ InsertSelectQuery(Query *query) /* ensure that there is a query */ Assert(IsA(subqueryRte->subquery, Query)); + insertRte = ExtractInsertRangeTableEntry(query); + if (!IsDistributedTable(insertRte->relid)) + { + return false; + } + return true; } diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c index 883f59258..fe70cb13a 100644 ---
a/src/backend/distributed/test/deparse_shard_query.c +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -60,7 +60,7 @@ deparse_shard_query_test(PG_FUNCTION_ARGS) StringInfo buffer = makeStringInfo(); /* reoreder the target list only for INSERT .. SELECT queries */ - if (InsertSelectQuery(query)) + if (InsertSelectIntoDistributedTable(query)) { RangeTblEntry *insertRte = linitial(query->rtable); RangeTblEntry *subqueryRte = lsecond(query->rtable); diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 0016499a3..d357e953c 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -104,7 +104,7 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) ListCell *rteCell = NULL; ListCell *cteCell = NULL; Node *modifiedNode = NULL; - bool insertSelectQuery = InsertSelectQuery(query); + bool insertSelectQuery = InsertSelectIntoDistributedTable(query); if (query->jointree && query->jointree->quals) { diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 460de189b..866c23ddc 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -282,6 +282,11 @@ OutMultiPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); WRITE_BOOL_FIELD(routerExecutable); + + WRITE_NODE_FIELD(insertSelectSubquery); + WRITE_NODE_FIELD(insertTargetList); + WRITE_OID_FIELD(targetRelationId); + WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 9bcf61efc..a1396e196 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -190,6 +190,11 @@ ReadMultiPlan(READFUNC_ARGS) READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); READ_BOOL_FIELD(routerExecutable); + +
READ_NODE_FIELD(insertSelectSubquery); + READ_NODE_FIELD(insertTargetList); + READ_OID_FIELD(targetRelationId); + READ_NODE_FIELD(planningError); READ_DONE(); diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index b5c8a7dc4..baf7248d8 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -59,7 +59,7 @@ static void AppendOptionListToString(StringInfo stringData, List *options); static const char * convert_aclright_to_string(int aclright); -static bool contain_nextval_expression_walker(Node *node, void *context); + /* * pg_get_extensiondef_string finds the foreign data wrapper that corresponds to @@ -989,7 +989,7 @@ convert_aclright_to_string(int aclright) * contain_nextval_expression_walker walks over expression tree and returns * true if it contains call to 'nextval' function. */ -static bool +bool contain_nextval_expression_walker(Node *node, void *context) { if (node == NULL) diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index e7eaf9204..e79224326 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -36,6 +36,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); +extern bool contain_nextval_expression_walker(Node *node, void *context); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h new file mode 100644 index 000000000..bbbac7725 --- /dev/null +++ b/src/include/distributed/insert_select_executor.h @@ -0,0 +1,23 @@
+/*------------------------------------------------------------------------- + * + * insert_select_executor.h + * + * Declarations for public functions and types related to executing + * INSERT..SELECT commands. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef INSERT_SELECT_EXECUTOR_H +#define INSERT_SELECT_EXECUTOR_H + + +#include "executor/execdesc.h" + + +extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); + + +#endif /* INSERT_SELECT_EXECUTOR_H */ diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h new file mode 100644 index 000000000..5fccdacde --- /dev/null +++ b/src/include/distributed/insert_select_planner.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * insert_select_planner.h + * + * Declarations for public functions and types related to planning + * INSERT..SELECT commands. + * + * Copyright (c) 2017, Citus Data, Inc.
+ * + *------------------------------------------------------------------------- + */ + +#ifndef INSERT_SELECT_PLANNER_H +#define INSERT_SELECT_PLANNER_H + + +#include "postgres.h" + +#include "distributed/multi_physical_planner.h" +#include "nodes/execnodes.h" +#include "nodes/parsenodes.h" +#include "nodes/plannodes.h" + + +extern MultiPlan * CreateCoordinatorInsertSelectPlan(Query *originalQuery); +extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, + struct ExplainState *es); + +#endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 1b6fcc412..07751d598 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -84,6 +84,9 @@ typedef struct CitusCopyDestReceiver /* state on how to copy out data types */ CopyOutState copyOutState; FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + int64 tuplesSent; } CitusCopyDestReceiver; diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index e3b53a327..c05a9f820 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -38,6 +38,7 @@ typedef struct CitusScanState extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan); +extern Node * CoordinatorInsertSelectCreateScan(CustomScan *scan); extern Node * DelayedErrorCreateScan(CustomScan *scan); extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 299ba46f7..44edac1f4 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -221,6 +221,11 @@ typedef struct
MultiPlan Query *masterQuery; bool routerExecutable; + /* INSERT ... SELECT via coordinator only */ + Query *insertSelectSubquery; + List *insertTargetList; + Oid targetRelationId; + /* * NULL if this a valid plan, an error description otherwise. This will * e.g. be set if SQL features are present that a planner doesn't support, diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 54a8abc7b..32f54ce3d 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -46,7 +46,7 @@ extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rte extern RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); -extern bool InsertSelectQuery(Query *query); +extern bool InsertSelectIntoDistributedTable(Query *query); extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 20421860d..b6997496a 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -94,7 +94,8 @@ typedef enum MULTI_EXECUTOR_INVALID_FIRST = 0, MULTI_EXECUTOR_REAL_TIME = 1, MULTI_EXECUTOR_TASK_TRACKER = 2, - MULTI_EXECUTOR_ROUTER = 3 + MULTI_EXECUTOR_ROUTER = 3, + MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 4 } MultiExecutorType; diff --git a/src/test/regress/expected/multi_behavioral_analytics_basics.out b/src/test/regress/expected/multi_behavioral_analytics_basics.out index 80e8b1a96..eb6da2a9c 100644 --- a/src/test/regress/expected/multi_behavioral_analytics_basics.out +++ b/src/test/regress/expected/multi_behavioral_analytics_basics.out @@ -69,9 +69,13 @@ FROM ( ) t2 ON (t1.user_id = t2.user_id) GROUP BY t1.user_id,
hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; + count | count | avg +-------+-------+--------------------- + 8 | 8 | 16.1250000000000000 +(1 row) + ------------------------------------ ------------------------------------ -- Funnel, grouped by the number of times a user has done an event @@ -143,9 +147,13 @@ GROUP BY count_pay, user_id ORDER BY count_pay; -ERROR: Set operations are not allowed in INSERT ... SELECT queries -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; + count | count | avg +-------+-------+--------------------- + 8 | 8 | 45.0000000000000000 +(1 row) + ------------------------------------ ------------------------------------ -- Most recently seen users_table events_table diff --git a/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out b/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out index a6a1b1faa..c04e894f5 100644 --- a/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out +++ b/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out @@ -101,7 +101,6 @@ FROM ( WHERE t1.user_id = 20 GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ...
SELECT queries ------------------------------------ ------------------------------------ -- Funnel grouped by whether or not a user has done an event -- two shards query @@ -145,9 +144,13 @@ FROM ( WHERE (t1.user_id = 20 OR t1.user_id = 17) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; + count | count | avg +-------+-------+--------------------- + 2 | 2 | 18.5000000000000000 +(1 row) + ------------------------------------ ------------------------------------ -- Most recently seen users_table events_table -- single shard query diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 322445b7f..126319156 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -985,3 +985,62 @@ Custom Scan (Citus Router) -> Seq Scan on explain_table_570001 explain_table Filter: (id = 1) ROLLBACK; +-- test explain with local INSERT ... SELECT +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +SELECT o_orderkey FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +SELECT true AS valid FROM explain_json($$ + INSERT INTO lineitem_hash_part (l_orderkey) + SELECT o_orderkey FROM orders_hash_part LIMIT 3; +$$); +t +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) +SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ...
SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey) +SELECT s FROM generate_series(1,5) s; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Function Scan on generate_series s +EXPLAIN (COSTS OFF) +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO lineitem_hash_part +WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) +SELECT s FROM cte1; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + CTE cte1 + -> Function Scan on generate_series s + -> CTE Scan on cte1 + CTE cte1 + -> Limit + -> CTE Scan on cte1 cte1_1 +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +( SELECT s FROM generate_series(1,5) s) UNION +( SELECT s FROM generate_series(5,10) s); +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 0f76dc9d0..b0f5db6b5 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -940,3 +940,62 @@ Custom Scan (Citus Router) -> Seq Scan on explain_table_570001 explain_table Filter: (id = 1) ROLLBACK; +-- test explain with local INSERT ... SELECT +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +SELECT o_orderkey FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ...
SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +SELECT true AS valid FROM explain_json($$ + INSERT INTO lineitem_hash_part (l_orderkey) + SELECT o_orderkey FROM orders_hash_part LIMIT 3; +$$); +t +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) +SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey) +SELECT s FROM generate_series(1,5) s; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Function Scan on generate_series s +EXPLAIN (COSTS OFF) +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO lineitem_hash_part +WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) +SELECT s FROM cte1; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + CTE cte1 + -> Function Scan on generate_series s + -> CTE Scan on cte1 + CTE cte1 + -> Limit + -> CTE Scan on cte1 cte1_1 +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +( SELECT s FROM generate_series(1,5) s) UNION +( SELECT s FROM generate_series(5,10) s); +Custom Scan (Citus INSERT ...
SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 123d833d1..ebb863fe6 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -141,20 +141,6 @@ WARNING: function public.evaluate_on_master(integer) does not exist WARNING: function public.evaluate_on_master(integer) does not exist ERROR: could not modify any active placements \set VERBOSITY default --- volatile functions should be disallowed -INSERT INTO raw_events_second (user_id, value_1) -SELECT - user_id, (random()*10)::int -FROM - raw_events_first; -ERROR: volatile functions are not allowed in INSERT ... SELECT queries -INSERT INTO raw_events_second (user_id, value_1) -WITH sub_cte AS (SELECT (random()*10)::int) -SELECT - user_id, (SELECT * FROM sub_cte) -FROM - raw_events_first; -ERROR: volatile functions are not allowed in INSERT ... SELECT queries -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -645,7 +631,10 @@ INSERT INTO agg_events (value_1_agg, user_id) DISTINCT ON (value_1) value_1, user_id FROM raw_events_first; -ERROR: DISTINCT ON clauses are not allowed in INSERT ... SELECT queries +DEBUG: DISTINCT ON clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with DISTINCT clause +HINT: Consider using an equality filter on the distributed table's partition column. -- We do not support some CTEs WITH fist_table_agg AS (SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id) @@ -655,8 +644,10 @@ INSERT INTO agg_events v1_agg, user_id FROM fist_table_agg; -ERROR: INSERT INTO ...
SELECT partition columns in the source table and subquery do not match -DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with complex table expressions +HINT: Consider using an equality filter on the distributed table's partition column. -- We don't support CTEs that consist of const values as well INSERT INTO agg_events WITH sub_cte AS (SELECT 1) @@ -664,8 +655,12 @@ INSERT INTO agg_events raw_events_first.user_id, (SELECT * FROM sub_cte) FROM raw_events_first; -ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries --- We do not support any set operations +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with common table expressions +HINT: Consider using an equality filter on the distributed table's partition column. +-- We support set operations via the coordinator +BEGIN; INSERT INTO raw_events_first(user_id) SELECT @@ -673,14 +668,19 @@ SELECT FROM ((SELECT user_id FROM raw_events_first) UNION (SELECT user_id FROM raw_events_second)) as foo; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ROLLBACK; -- We do not support any set operations INSERT INTO raw_events_first(user_id) (SELECT user_id FROM raw_events_first) INTERSECT (SELECT user_id FROM raw_events_first); -ERROR: Set operations are not allowed in INSERT ... SELECT queries --- We do not support any set operations +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ...
SELECT results on coordinator +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +-- If the query is router plannable then it is executed via the coordinator INSERT INTO raw_events_first(user_id) SELECT @@ -688,7 +688,10 @@ SELECT FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Creating router plan +DEBUG: Plan is router executable -- some supported LEFT joins INSERT INTO agg_events (user_id) SELECT @@ -1027,42 +1030,53 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, raw_events_second WHERE raw_events_first.user_id = raw_events_second.user_id GROUP BY raw_events_second.value_3) AS foo; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: cannot push down this subquery +DETAIL: Group by list without partition column is currently unsupported -- error cases -- no part column at all INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ...
SELECT because the partition columns in the source table and subquery do not match DETAIL: the query doesn't include the target table's partition column +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: the query doesn't include the target table's partition column +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an operator in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ...
SELECT results on coordinator INSERT INTO raw_events_second (user_id) SELECT user_id :: bigint FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1076,9 +1090,11 @@ SELECT SUM(value_3), Avg(value_2) FROM raw_events_first GROUP BY user_id; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an aggregation in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.agg_events should have a value INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1093,16 +1109,21 @@ SELECT SUM(value_3), FROM raw_events_first GROUP BY user_id, value_2; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ...
SELECT results on coordinator +ERROR: the partition column of table public.agg_events should have a value -- tables should be co-located INSERT INTO agg_events (user_id) SELECT user_id FROM reference_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Creating router plan +DEBUG: Plan is router executable -- unsupported joins between subqueries -- we do not return bare partition column on the inner query INSERT INTO agg_events @@ -1127,9 +1148,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, GROUP BY raw_events_second.value_1 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- the second part of the query is not routable since -- GROUP BY not on the partition column (i.e., value_1) and thus join -- on f.id = f2.id is not on the partition key (instead on the sum of partition key) @@ -1564,7 +1588,10 @@ SELECT user_id, Sum(value_2) AS sum_val2 FROM raw_events_second GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) ); -ERROR: grouping sets are not allowed in INSERT ... SELECT queries +DEBUG: grouping sets are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP +HINT: Consider using an equality filter on the distributed table's partition column. -- set back to INFO SET client_min_messages TO INFO; -- avoid constraint violations @@ -1592,7 +1619,8 @@ WHERE user_id ) as f_inner ) ) AS f2); -ERROR: LIMIT clauses are not allowed in INSERT ... SELECT queries +ERROR: cannot push down this subquery +DETAIL: Limit in subquery is currently unsupported -- Altering a table and selecting from it using a multi-shard statement -- in the same transaction is allowed because we will use the same -- connections for all co-located placements. 
@@ -1646,7 +1674,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) SELECT count(*) FROM raw_events_second; count ------- - 18 + 36 (1 row) INSERT INTO raw_events_second SELECT * FROM test_view; @@ -1656,12 +1684,9 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B SELECT count(*) FROM raw_events_second; count ------- - 20 + 38 (1 row) --- inserting into views does not -INSERT INTO test_view SELECT * FROM raw_events_second; -ERROR: cannot insert into view over distributed table -- we need this in our next test truncate raw_events_first; SET client_min_messages TO DEBUG2; @@ -1912,7 +1937,7 @@ FROM table_with_defaults GROUP BY store_id; -ERROR: volatile functions are not allowed in INSERT ... SELECT queries +ERROR: INSERT ... SELECT cannot generate sequence values when selecting from a distributed table -- do some more error/error message checks SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -1937,49 +1962,61 @@ SELECT create_distributed_table('table_with_starts_with_defaults', 'c'); (1 row) +SET client_min_messages TO DEBUG; INSERT INTO text_table (part_col) SELECT CASE WHEN part_col = 'onder' THEN 'marco' END FROM text_table ; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a case expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table; -ERROR: INSERT INTO ... 
SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table; -ERROR: INSERT INTO ... 
SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT part_col::text from char_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; -ERROR: INSERT INTO ... 
SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT val FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT val::text FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; -- Test on partition column without native hash function CREATE TABLE raw_table @@ -2012,10 +2049,380 @@ SELECT * FROM summary_table; 11-11-1980 | 1 (1 row) +-- Test INSERT ... 
SELECT via coordinator +-- Select from constants +TRUNCATE raw_events_first; +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM (VALUES (1,2), (3,4), (5,6)) AS v(int,int); +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id; + user_id | value_1 +---------+--------- + 1 | 2 + 3 | 4 + 5 | 6 +(3 rows) + +-- Select from local functions +TRUNCATE raw_events_first; +CREATE SEQUENCE insert_select_test_seq; +SET client_min_messages TO DEBUG; +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT + s, nextval('insert_select_test_seq'), (random()*10)::int +FROM + generate_series(1, 5) s; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +-- ON CONFLICT is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +ON CONFLICT DO NOTHING; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator +-- RETURNING is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +RETURNING *; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator +RESET client_min_messages; +-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported +TRUNCATE raw_events_first; +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first; +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- INSERT ... 
SELECT and single-shard SELECT in the same transaction is supported +TRUNCATE raw_events_first; +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first WHERE user_id = 1; + user_id | value_1 +---------+--------- + 1 | 1 +(1 row) + +COMMIT; +-- Select from local table +TRUNCATE raw_events_first; +CREATE TEMPORARY TABLE raw_events_first_local AS +SELECT s AS u, 2*s AS v FROM generate_series(1, 5) s; +INSERT INTO raw_events_first (user_id, value_1) +SELECT u, v FROM raw_events_first_local; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +-- Use columns in opposite order +TRUNCATE raw_events_first; +INSERT INTO raw_events_first (value_1, user_id) +SELECT u, v FROM raw_events_first_local; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 2 | 1 + 4 | 2 + 6 | 3 + 8 | 4 + 10 | 5 +(5 rows) + +-- Set operations can work with opposite column order +TRUNCATE raw_events_first; +INSERT INTO raw_events_first (value_3, user_id) +( SELECT v, u::bigint FROM raw_events_first_local ) +UNION ALL +( SELECT v, u FROM raw_events_first_local ); +SELECT user_id, value_3 FROM raw_events_first ORDER BY user_id, value_3; + user_id | value_3 +---------+--------- + 1 | 2 + 1 | 2 + 2 | 4 + 2 | 4 + 3 | 6 + 3 | 6 + 4 | 8 + 4 | 8 + 5 | 10 + 5 | 10 +(10 rows) + +-- Select from other distributed table with limit +TRUNCATE raw_events_first; +TRUNCATE raw_events_second; +INSERT INTO raw_events_second (user_id, value_4) +SELECT s, 3*s FROM generate_series (1,5) s; +INSERT INTO raw_events_first (user_id, value_1) +SELECT user_id, value_4 FROM raw_events_second LIMIT 5; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 3 + 2 | 6 + 3 | 9 + 4 | 12 + 5 | 15 +(5 rows) 
+ +-- CTEs are supported in local queries +TRUNCATE raw_events_first; +WITH removed_rows AS ( + DELETE FROM raw_events_first_local RETURNING u +) +INSERT INTO raw_events_first (user_id, value_1) +WITH value AS (SELECT 1) +SELECT * FROM removed_rows, value; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 1 + 3 | 1 + 4 | 1 + 5 | 1 +(5 rows) + +-- nested CTEs are also supported +TRUNCATE raw_events_first; +INSERT INTO raw_events_first_local SELECT s, 2*s FROM generate_series(0, 10) s; +WITH rows_to_remove AS ( + SELECT u FROM raw_events_first_local WHERE u > 0 +), +removed_rows AS ( + DELETE FROM raw_events_first_local + WHERE u IN (SELECT * FROM rows_to_remove) + RETURNING u, v +) +INSERT INTO raw_events_first (user_id, value_1) +WITH ultra_rows AS ( + WITH numbers AS ( + SELECT s FROM generate_series(1,10) s + ), + super_rows AS ( + SELECT u, v FROM removed_rows JOIN numbers ON (u = s) + ) + SELECT * FROM super_rows LIMIT 5 +) +SELECT u, v FROM ultra_rows; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +-- CTEs with duplicate names are also supported +TRUNCATE raw_events_first; +WITH super_rows AS ( + SELECT u FROM raw_events_first_local +) +INSERT INTO raw_events_first (user_id, value_1) +WITH super_rows AS ( + SELECT * FROM super_rows GROUP BY u +) +SELECT u, 5 FROM super_rows; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 0 | 5 +(1 row) + +-- CTEs are supported in router queries +TRUNCATE raw_events_first; +WITH user_two AS ( + SELECT user_id, value_4 FROM raw_events_second WHERE user_id = 2 +) +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM user_two; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 2 | 6 +(1 
row) + +-- CTEs are supported when there are name collisions +WITH numbers AS ( + SELECT s FROM generate_series(1,10) s +) +INSERT INTO raw_events_first(user_id, value_1) +WITH numbers AS ( + SELECT s, s FROM generate_series(1,5) s +) +SELECT * FROM numbers; +-- Select into distributed table with a sequence +CREATE TABLE "CaseSensitiveTable" ("UserID" int, "Value1" int); +SELECT create_distributed_table('"CaseSensitiveTable"', 'UserID'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO "CaseSensitiveTable" +SELECT s, s FROM generate_series(1,10) s; +SELECT * FROM "CaseSensitiveTable" ORDER BY "UserID"; + UserID | Value1 +--------+-------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 +(10 rows) + +DROP TABLE "CaseSensitiveTable"; +-- Select into distributed table with a sequence +CREATE TABLE dist_table_with_sequence (user_id serial, value_1 serial); +SELECT create_distributed_table('dist_table_with_sequence', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- from local query +INSERT INTO dist_table_with_sequence (value_1) +SELECT s FROM generate_series(1,5) s; +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +-- from a distributed query +INSERT INTO dist_table_with_sequence (value_1) +SELECT value_1 FROM dist_table_with_sequence; +ERROR: INSERT ... 
SELECT cannot generate sequence values when selecting from a distributed table +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +-- Select from distributed table into reference table +CREATE TABLE ref_table (user_id int, value_1 int); +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO ref_table +SELECT user_id, value_1 FROM raw_events_second; +SELECT * FROM ref_table ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | + 2 | + 3 | + 4 | + 5 | +(5 rows) + +DROP TABLE ref_table; +-- Select into an append-partitioned table is not supported +CREATE TABLE insert_append_table (user_id int, value_4 bigint); +SELECT create_distributed_table('insert_append_table', 'user_id', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO insert_append_table (user_id, value_4) +SELECT user_id, 1 FROM raw_events_second LIMIT 5; +ERROR: INSERT ... 
SELECT into an append-distributed table is not supported +DROP TABLE insert_append_table; +-- Insert from other distributed table as prepared statement +TRUNCATE raw_events_first; +PREPARE insert_prep(int) AS +INSERT INTO raw_events_first (user_id, value_1) +SELECT $1, value_4 FROM raw_events_second ORDER BY value_4 LIMIT 1; +EXECUTE insert_prep(1); +EXECUTE insert_prep(2); +EXECUTE insert_prep(3); +EXECUTE insert_prep(4); +EXECUTE insert_prep(5); +EXECUTE insert_prep(6); +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 3 + 2 | 3 + 3 | 3 + 4 | 3 + 5 | 3 + 6 | 3 +(6 rows) + +-- Inserting into views is handled via coordinator +TRUNCATE raw_events_first; +INSERT INTO test_view +SELECT * FROM raw_events_second; +SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4; + user_id | value_4 +---------+--------- + 1 | 3 + 2 | 6 + 3 | 9 + 4 | 12 + 5 | 15 +(5 rows) + +-- Drop the view now, because the column we are about to drop depends on it +DROP VIEW test_view; +-- Make sure we handle dropped columns correctly +TRUNCATE raw_events_first; +ALTER TABLE raw_events_first DROP COLUMN value_1; +INSERT INTO raw_events_first (user_id, value_4) +SELECT value_4, user_id FROM raw_events_second LIMIT 5; +SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id; + user_id | value_4 +---------+--------- + 3 | 1 + 6 | 2 + 9 | 3 + 12 | 4 + 15 | 5 +(5 rows) + +RESET client_min_messages; DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE raw_events_first CASCADE; -NOTICE: drop cascades to view test_view DROP TABLE raw_events_second; DROP TABLE reference_table; DROP TABLE agg_events; diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index ef0ec218b..d99bf6c20 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ 
b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -66,7 +66,8 @@ FROM ( ) t2 ON (t1.user_id = t2.user_id) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- not pushable since the JOIN is not an equi join right part of the UNION -- is not joined on the partition key INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg ) @@ -107,7 +108,8 @@ FROM ( ) t2 ON (t1.user_id = t2.user_id) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- the LEFT JOIN conditon is not on the partition column (i.e., is it part_key divided by 2) INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg ) SELECT user_id, sum(array_length(events_table, 1)), length(hasdone_event) @@ -147,7 +149,8 @@ FROM ( ) t2 ON (t1.user_id = (t2.user_id)/2) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. 
------------------------------------ ------------------------------------ -- Funnel, grouped by the number of times a user has done an event @@ -220,7 +223,8 @@ GROUP BY count_pay, user_id ORDER BY count_pay; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- not pushable since the JOIN condition is not equi JOIN -- (subquery_1 JOIN subquery_2) INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg) @@ -288,7 +292,8 @@ GROUP BY count_pay, user_id ORDER BY count_pay; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. ------------------------------------ ------------------------------------ -- Most recently seen users_table events_table @@ -686,4 +691,5 @@ FROM ( GROUP BY user_id ) AS shard_union ORDER BY user_lastseen DESC; -ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries +ERROR: cannot push down this subquery +DETAIL: Subqueries without relations are unsupported diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index f306fbf6b..35fa1d657 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -108,10 +108,6 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::ti INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT); ERROR: cannot perform distributed planning for the given modification DETAIL: Multi-row INSERTs to distributed tables are not supported. --- INSERT ... SELECT ... 
FROM commands are unsupported from workers -INSERT INTO limit_orders_mx SELECT * FROM limit_orders_mx; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -- connect back to the other node \c - - - :worker_1_port -- commands containing a CTE are unsupported @@ -433,6 +429,21 @@ SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data; ----------+------ (0 rows) +--- INSERT ... SELECT ... FROM commands are supported from workers +INSERT INTO multiple_hash_mx +SELECT s, s*2 FROM generate_series(1,10) s; +INSERT 0 10 +-- but are never distributed +BEGIN; +BEGIN +SET LOCAL client_min_messages TO DEBUG1; +SET +INSERT INTO multiple_hash_mx SELECT * FROM multiple_hash_mx; +DEBUG: distributed INSERT ... SELECT can only be performed from the coordinator +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT 0 10 +END; +COMMIT -- verify interaction of default values, SERIAL, and RETURNING \set QUIET on INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index dc1b94891..3a4c4a8e4 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1091,12 +1091,11 @@ LOG: join order: [ "colocated_table_test" ][ broadcast join "reference_table_te 2 (2 rows) -SET client_min_messages TO NOTICE; SET citus.log_multi_join_order TO FALSE; SET citus.shard_count TO DEFAULT; SET citus.task_executor_type to "real-time"; -- some INSERT .. 
SELECT queries that involve both hash distributed and reference tables --- should error out since we're inserting into reference table where +-- should go via coordinator since we're inserting into reference table where -- not all the participants are reference tables INSERT INTO reference_table_test (value_1) @@ -1105,9 +1104,10 @@ SELECT FROM colocated_table_test, colocated_table_test_2 WHERE - colocated_table_test.value_1 = colocated_table_test.value_1; -ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT --- should error out, same as the above + colocated_table_test.value_1 = colocated_table_test_2.value_1; +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- should go via coordinator, same as the above INSERT INTO reference_table_test (value_1) SELECT @@ -1116,7 +1116,8 @@ FROM colocated_table_test, reference_table_test WHERE colocated_table_test.value_1 = reference_table_test.value_1; -ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator -- now, insert into the hash partitioned table and use reference -- tables in the SELECT queries INSERT INTO @@ -1150,8 +1151,7 @@ RETURNING value_1, value_2; 2 | 2 (2 rows) --- partition column value comes from reference table but still first error is --- on data type mismatch +-- partition column value comes from reference table, goes via coordinator INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -1159,11 +1159,10 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; -ERROR: INSERT INTO ... 
SELECT partition columns in the source table and subquery do not match + colocated_table_test_2.value_4 = reference_table_test.value_4; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. --- partition column value comes from reference table which should error out +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -1171,10 +1170,11 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match + colocated_table_test_2.value_4 = reference_table_test.value_4; +DEBUG: cannot perform distributed INSERT INTO ... SELECT becuase the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +RESET client_min_messages; -- some tests for mark_tables_colocated -- should error out SELECT mark_tables_colocated('colocated_table_test_2', ARRAY['reference_table_test']); diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 437d1d300..a8101b0d5 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -266,14 +266,17 @@ SELECT count(*) FROM mx_table; 5 (1 row) --- INSERT / SELECT +-- INSERT / SELECT pulls results to worker +BEGIN; +SET LOCAL client_min_messages TO DEBUG; INSERT INTO mx_table_2 SELECT * FROM mx_table; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. +DEBUG: distributed INSERT ... SELECT can only be performed from the coordinator +DEBUG: Collecting INSERT ... SELECT results on coordinator +END; SELECT count(*) FROM mx_table_2; count ------- - 0 + 5 (1 row) -- mark_tables_colocated diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index eccad4b00..a1a5822dc 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -197,9 +197,6 @@ SELECT count(*) FROM temp_lineitem; 1706 (1 row) --- modifying views is disallowed -INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem; -ERROR: cannot insert into view over distributed table SET citus.task_executor_type to "task-tracker"; -- single view repartition subqueries are not supported SELECT l_suppkey, count(*) FROM diff --git a/src/test/regress/sql/multi_behavioral_analytics_basics.sql b/src/test/regress/sql/multi_behavioral_analytics_basics.sql index b8ad4baa7..720d55fe2 100644 --- a/src/test/regress/sql/multi_behavioral_analytics_basics.sql +++ b/src/test/regress/sql/multi_behavioral_analytics_basics.sql @@ -71,7 +71,7 @@ FROM ( ) t 
GROUP BY user_id, hasdone_event; -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; ------------------------------------ ------------------------------------ @@ -148,7 +148,7 @@ ORDER BY count_pay; -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; ------------------------------------ ------------------------------------ @@ -417,4 +417,4 @@ FROM -- get some statistics from the aggregated results to ensure the results are correct SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; - \ No newline at end of file + diff --git a/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql b/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql index ede431ee5..54e63b722 100644 --- a/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql +++ b/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql @@ -146,7 +146,7 @@ FROM ( ) t GROUP BY user_id, hasdone_event; -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; ------------------------------------ @@ -398,4 +398,4 @@ INSERT INTO agg_results_second(user_id, value_2_agg) -- get some statistics from the aggregated results to ensure the results are correct SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; - \ No newline at end of file + diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 
e5d32c922..b6099e389 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -465,3 +465,32 @@ ALTER TABLE explain_table ADD COLUMN value int; EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1; ROLLBACK; + +-- test explain with local INSERT ... SELECT +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +SELECT o_orderkey FROM orders_hash_part LIMIT 3; + +SELECT true AS valid FROM explain_json($$ + INSERT INTO lineitem_hash_part (l_orderkey) + SELECT o_orderkey FROM orders_hash_part LIMIT 3; +$$); + +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) +SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; + +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey) +SELECT s FROM generate_series(1,5) s; + +EXPLAIN (COSTS OFF) +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO lineitem_hash_part +WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) +SELECT s FROM cte1; + +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +( SELECT s FROM generate_series(1,5) s) UNION +( SELECT s FROM generate_series(5,10) s); diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 53c510f4e..7d49b6e2b 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -116,21 +116,6 @@ WHERE \set VERBOSITY default --- volatile functions should be disallowed -INSERT INTO raw_events_second (user_id, value_1) -SELECT - user_id, (random()*10)::int -FROM - raw_events_first; - -INSERT INTO raw_events_second (user_id, value_1) -WITH sub_cte AS (SELECT (random()*10)::int) -SELECT - user_id, (SELECT * FROM sub_cte) -FROM - raw_events_first; - - -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -517,7 +502,9 @@ INSERT INTO agg_events FROM raw_events_first; --- We do not support any set operations +-- We support set operations via the coordinator +BEGIN; + INSERT 
INTO raw_events_first(user_id) SELECT @@ -526,13 +513,15 @@ FROM ((SELECT user_id FROM raw_events_first) UNION (SELECT user_id FROM raw_events_second)) as foo; +ROLLBACK; + -- We do not support any set operations INSERT INTO raw_events_first(user_id) (SELECT user_id FROM raw_events_first) INTERSECT (SELECT user_id FROM raw_events_first); --- We do not support any set operations +-- If the query is router plannable then it is executed via the coordinator INSERT INTO raw_events_first(user_id) SELECT @@ -1389,9 +1378,6 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6; SELECT count(*) FROM raw_events_second; --- inserting into views does not -INSERT INTO test_view SELECT * FROM raw_events_second; - -- we need this in our next test truncate raw_events_first; @@ -1611,6 +1597,8 @@ SELECT create_distributed_table('text_table', 'part_col'); SELECT create_distributed_table('char_table','part_col'); SELECT create_distributed_table('table_with_starts_with_defaults', 'c'); +SET client_min_messages TO DEBUG; + INSERT INTO text_table (part_col) SELECT CASE WHEN part_col = 'onder' THEN 'marco' @@ -1628,6 +1616,9 @@ INSERT INTO text_table (part_col) SELECT part_col::text from char_table; INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; INSERT INTO text_table (part_col) SELECT val FROM text_table; INSERT INTO text_table (part_col) SELECT val::text FROM text_table; + +RESET client_min_messages; + insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; -- Test on partition column without native hash function @@ -1651,6 +1642,261 @@ INSERT INTO summary_table SELECT time, COUNT(*) FROM raw_table GROUP BY time; SELECT * FROM summary_table; +-- Test INSERT ... 
SELECT via coordinator + +-- Select from constants +TRUNCATE raw_events_first; + +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM (VALUES (1,2), (3,4), (5,6)) AS v(int,int); + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id; + +-- Select from local functions +TRUNCATE raw_events_first; + +CREATE SEQUENCE insert_select_test_seq; + +SET client_min_messages TO DEBUG; + +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT + s, nextval('insert_select_test_seq'), (random()*10)::int +FROM + generate_series(1, 5) s; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- ON CONFLICT is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +ON CONFLICT DO NOTHING; + +-- RETURNING is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +RETURNING *; + +RESET client_min_messages; + +-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported +TRUNCATE raw_events_first; + +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first; +ROLLBACK; + +-- INSERT ... 
SELECT and single-shard SELECT in the same transaction is supported +TRUNCATE raw_events_first; + +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first WHERE user_id = 1; +COMMIT; + +-- Select from local table +TRUNCATE raw_events_first; + +CREATE TEMPORARY TABLE raw_events_first_local AS +SELECT s AS u, 2*s AS v FROM generate_series(1, 5) s; + +INSERT INTO raw_events_first (user_id, value_1) +SELECT u, v FROM raw_events_first_local; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- Use columns in opposite order +TRUNCATE raw_events_first; + +INSERT INTO raw_events_first (value_1, user_id) +SELECT u, v FROM raw_events_first_local; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- Set operations can work with opposite column order +TRUNCATE raw_events_first; + +INSERT INTO raw_events_first (value_3, user_id) +( SELECT v, u::bigint FROM raw_events_first_local ) +UNION ALL +( SELECT v, u FROM raw_events_first_local ); + +SELECT user_id, value_3 FROM raw_events_first ORDER BY user_id, value_3; + +-- Select from other distributed table with limit +TRUNCATE raw_events_first; +TRUNCATE raw_events_second; + +INSERT INTO raw_events_second (user_id, value_4) +SELECT s, 3*s FROM generate_series (1,5) s; + +INSERT INTO raw_events_first (user_id, value_1) +SELECT user_id, value_4 FROM raw_events_second LIMIT 5; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs are supported in local queries +TRUNCATE raw_events_first; + +WITH removed_rows AS ( + DELETE FROM raw_events_first_local RETURNING u +) +INSERT INTO raw_events_first (user_id, value_1) +WITH value AS (SELECT 1) +SELECT * FROM removed_rows, value; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- nested CTEs are also supported +TRUNCATE raw_events_first; +INSERT INTO raw_events_first_local SELECT s, 
2*s FROM generate_series(0, 10) s; + +WITH rows_to_remove AS ( + SELECT u FROM raw_events_first_local WHERE u > 0 +), +removed_rows AS ( + DELETE FROM raw_events_first_local + WHERE u IN (SELECT * FROM rows_to_remove) + RETURNING u, v +) +INSERT INTO raw_events_first (user_id, value_1) +WITH ultra_rows AS ( + WITH numbers AS ( + SELECT s FROM generate_series(1,10) s + ), + super_rows AS ( + SELECT u, v FROM removed_rows JOIN numbers ON (u = s) + ) + SELECT * FROM super_rows LIMIT 5 +) +SELECT u, v FROM ultra_rows; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs with duplicate names are also supported +TRUNCATE raw_events_first; + +WITH super_rows AS ( + SELECT u FROM raw_events_first_local +) +INSERT INTO raw_events_first (user_id, value_1) +WITH super_rows AS ( + SELECT * FROM super_rows GROUP BY u +) +SELECT u, 5 FROM super_rows; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs are supported in router queries +TRUNCATE raw_events_first; + +WITH user_two AS ( + SELECT user_id, value_4 FROM raw_events_second WHERE user_id = 2 +) +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM user_two; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs are supported when there are name collisions +WITH numbers AS ( + SELECT s FROM generate_series(1,10) s +) +INSERT INTO raw_events_first(user_id, value_1) +WITH numbers AS ( + SELECT s, s FROM generate_series(1,5) s +) +SELECT * FROM numbers; + +-- Select into distributed table with a sequence +CREATE TABLE "CaseSensitiveTable" ("UserID" int, "Value1" int); +SELECT create_distributed_table('"CaseSensitiveTable"', 'UserID'); + +INSERT INTO "CaseSensitiveTable" +SELECT s, s FROM generate_series(1,10) s; + +SELECT * FROM "CaseSensitiveTable" ORDER BY "UserID"; + +DROP TABLE "CaseSensitiveTable"; + +-- Select into distributed table with a sequence +CREATE TABLE dist_table_with_sequence (user_id serial, value_1 
serial); +SELECT create_distributed_table('dist_table_with_sequence', 'user_id'); + +-- from local query +INSERT INTO dist_table_with_sequence (value_1) +SELECT s FROM generate_series(1,5) s; + +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + +-- from a distributed query +INSERT INTO dist_table_with_sequence (value_1) +SELECT value_1 FROM dist_table_with_sequence; + +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + +-- Select from distributed table into reference table +CREATE TABLE ref_table (user_id int, value_1 int); +SELECT create_reference_table('ref_table'); + +INSERT INTO ref_table +SELECT user_id, value_1 FROM raw_events_second; + +SELECT * FROM ref_table ORDER BY user_id, value_1; + +DROP TABLE ref_table; + +-- Select into an append-partitioned table is not supported +CREATE TABLE insert_append_table (user_id int, value_4 bigint); +SELECT create_distributed_table('insert_append_table', 'user_id', 'append'); + +INSERT INTO insert_append_table (user_id, value_4) +SELECT user_id, 1 FROM raw_events_second LIMIT 5; + +DROP TABLE insert_append_table; + +-- Insert from other distributed table as prepared statement +TRUNCATE raw_events_first; + +PREPARE insert_prep(int) AS +INSERT INTO raw_events_first (user_id, value_1) +SELECT $1, value_4 FROM raw_events_second ORDER BY value_4 LIMIT 1; + +EXECUTE insert_prep(1); +EXECUTE insert_prep(2); +EXECUTE insert_prep(3); +EXECUTE insert_prep(4); +EXECUTE insert_prep(5); +EXECUTE insert_prep(6); + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- Inserting into views is handled via coordinator +TRUNCATE raw_events_first; + +INSERT INTO test_view +SELECT * FROM raw_events_second; + +SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4; + +-- Drop the view now, because the column we are about to drop depends on it +DROP VIEW test_view; + +-- Make sure we handle dropped columns correctly +TRUNCATE raw_events_first; + +ALTER TABLE raw_events_first DROP COLUMN 
value_1; + +INSERT INTO raw_events_first (user_id, value_4) +SELECT value_4, user_id FROM raw_events_second LIMIT 5; + +SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id; + +RESET client_min_messages; + DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE raw_events_first CASCADE; diff --git a/src/test/regress/sql/multi_mx_modifications.sql b/src/test/regress/sql/multi_mx_modifications.sql index 334d174cb..2c9509407 100644 --- a/src/test/regress/sql/multi_mx_modifications.sql +++ b/src/test/regress/sql/multi_mx_modifications.sql @@ -79,9 +79,6 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::ti -- commands with multiple rows are unsupported INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT); --- INSERT ... SELECT ... FROM commands are unsupported from workers -INSERT INTO limit_orders_mx SELECT * FROM limit_orders_mx; - -- connect back to the other node \c - - - :worker_1_port @@ -273,6 +270,16 @@ DELETE FROM multiple_hash_mx WHERE category = '1' RETURNING category; SELECT * FROM multiple_hash_mx WHERE category = '1' ORDER BY category, data; SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data; +--- INSERT ... SELECT ... 
FROM commands are supported from workers +INSERT INTO multiple_hash_mx +SELECT s, s*2 FROM generate_series(1,10) s; + +-- but are never distributed +BEGIN; +SET LOCAL client_min_messages TO DEBUG1; +INSERT INTO multiple_hash_mx SELECT * FROM multiple_hash_mx; +END; + -- verify interaction of default values, SERIAL, and RETURNING \set QUIET on diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 46dcaa3bf..85f656961 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -675,7 +675,6 @@ WHERE colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1; -SET client_min_messages TO NOTICE; SET citus.log_multi_join_order TO FALSE; SET citus.shard_count TO DEFAULT; @@ -683,7 +682,7 @@ SET citus.task_executor_type to "real-time"; -- some INSERT .. SELECT queries that involve both hash distributed and reference tables --- should error out since we're inserting into reference table where +-- should go via coordinator since we're inserting into reference table where -- not all the participants are reference tables INSERT INTO reference_table_test (value_1) @@ -692,9 +691,9 @@ SELECT FROM colocated_table_test, colocated_table_test_2 WHERE - colocated_table_test.value_1 = colocated_table_test.value_1; + colocated_table_test.value_1 = colocated_table_test_2.value_1; --- should error out, same as the above +-- should go via coordinator, same as the above INSERT INTO reference_table_test (value_1) SELECT @@ -727,8 +726,7 @@ WHERE colocated_table_test_2.value_2 = reference_table_test.value_2 RETURNING value_1, value_2; --- partition column value comes from reference table but still first error is --- on data type mismatch +-- partition column value comes from reference table, goes via coordinator INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -736,10 +734,8 @@ SELECT FROM 
colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; + colocated_table_test_2.value_4 = reference_table_test.value_4; --- partition column value comes from reference table which should error out INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -747,9 +743,9 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; + colocated_table_test_2.value_4 = reference_table_test.value_4; +RESET client_min_messages; -- some tests for mark_tables_colocated -- should error out diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 89789de18..c98b233cd 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -137,8 +137,12 @@ SELECT master_remove_node('localhost', 5432); TRUNCATE mx_table; SELECT count(*) FROM mx_table; --- INSERT / SELECT +-- INSERT / SELECT pulls results to worker +BEGIN; +SET LOCAL client_min_messages TO DEBUG; INSERT INTO mx_table_2 SELECT * FROM mx_table; +END; + SELECT count(*) FROM mx_table_2; -- mark_tables_colocated diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index c80977fb2..05dab5fd8 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -93,9 +93,6 @@ SELECT count(*) FROM temp_lineitem; INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL'; SELECT count(*) FROM temp_lineitem; --- modifying views is disallowed -INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem; - SET citus.task_executor_type to "task-tracker"; -- single view repartition subqueries are not supported