From 2f8ac826607842ddbe5b86f471d568da16f38f3a Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 8 May 2017 11:35:15 +0200 Subject: [PATCH] Execute INSERT..SELECT via coordinator if it cannot be pushed down Add a second implementation of INSERT INTO distributed_table SELECT ... that is used if the query cannot be pushed down. The basic idea is to execute the SELECT query separately and pass the results into the distributed table using a CopyDestReceiver, which is also used for COPY and create_distributed_table. When planning the SELECT, we go through planner hooks again, which means the SELECT can also be a distributed query. EXPLAIN is supported, but EXPLAIN ANALYZE is not because preventing double execution was a lot more complicated in this case. --- src/backend/distributed/commands/multi_copy.c | 2 + .../executor/insert_select_executor.c | 157 ++++++ .../distributed/executor/multi_executor.c | 32 ++ .../executor/multi_server_executor.c | 22 +- .../planner/insert_select_planner.c | 254 +++++++++ .../distributed/planner/multi_explain.c | 75 +++ .../planner/multi_logical_planner.c | 10 + .../distributed/planner/multi_planner.c | 21 +- .../planner/multi_router_planner.c | 107 ++-- .../distributed/test/deparse_shard_query.c | 2 +- src/backend/distributed/utils/citus_clauses.c | 2 +- .../distributed/utils/citus_outfuncs.c | 5 + .../distributed/utils/citus_readfuncs.c | 5 + .../distributed/utils/citus_ruleutils.c | 4 +- src/include/distributed/citus_ruleutils.h | 1 + .../distributed/insert_select_executor.h | 23 + .../distributed/insert_select_planner.h | 29 + src/include/distributed/multi_copy.h | 3 + src/include/distributed/multi_executor.h | 1 + .../distributed/multi_physical_planner.h | 5 + .../distributed/multi_router_planner.h | 2 +- .../distributed/multi_server_executor.h | 3 +- .../multi_behavioral_analytics_basics.out | 16 +- ...avioral_analytics_single_shard_queries.out | 9 +- src/test/regress/expected/multi_explain.out | 59 ++ src/test/regress/expected/multi_explain_0.out | 59 ++ .../regress/expected/multi_insert_select.out | 511 ++++++++++++++++-- ...lti_insert_select_non_pushable_queries.out | 18 +- .../expected/multi_mx_modifications.out | 19 +- .../expected/multi_reference_table.out | 30 +- .../multi_unsupported_worker_operations.out | 11 +- src/test/regress/expected/multi_view.out | 3 - .../sql/multi_behavioral_analytics_basics.sql | 6 +- ...avioral_analytics_single_shard_queries.sql | 4 +- src/test/regress/sql/multi_explain.sql | 29 + src/test/regress/sql/multi_insert_select.sql | 286 +++++++++- .../regress/sql/multi_mx_modifications.sql | 13 +- .../regress/sql/multi_reference_table.sql | 18 +- .../multi_unsupported_worker_operations.sql | 6 +- src/test/regress/sql/multi_view.sql | 3 - 40 files changed, 1679 insertions(+), 186 deletions(-) create mode 100644 src/backend/distributed/executor/insert_select_executor.c create mode 100644 src/backend/distributed/planner/insert_select_planner.c create mode 100644 src/include/distributed/insert_select_executor.h create mode 100644 src/include/distributed/insert_select_planner.h diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c631715d0..c6f01ae0b 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1927,6 +1927,8 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) MemoryContextSwitchTo(oldContext); + copyDest->tuplesSent++; + #if PG_VERSION_NUM >= 90600 return true; #endif diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c new file mode 100644 index 000000000..e6e144930 --- /dev/null +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -0,0 +1,157 @@ +/*------------------------------------------------------------------------- + * + * insert_select_executor.c + * + * Executor logic for INSERT..SELECT. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/insert_select_executor.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_copy.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" +#include "distributed/transaction_management.h" +#include "executor/executor.h" +#include "nodes/execnodes.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/plannodes.h" +#include "parser/parse_coerce.h" +#include "parser/parsetree.h" +#include "tcop/pquery.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/portal.h" +#include "utils/snapmgr.h" + + +static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, + Query *selectQuery, EState *executorState); +static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params, + DestReceiver *dest); + + +/* + * CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table + * SELECT .. query by setting up a DestReceiver that copies tuples into the + * distributed table and then executing the SELECT query using that DestReceiver + * as the tuple destination. + */ +TupleTableSlot * +CoordinatorInsertSelectExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + EState *executorState = scanState->customScanState.ss.ps.state; + MultiPlan *multiPlan = scanState->multiPlan; + Query *selectQuery = multiPlan->insertSelectSubquery; + List *insertTargetList = multiPlan->insertTargetList; + Oid targetRelationId = multiPlan->targetRelationId; + + ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); + + ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery, + executorState); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * ExecuteSelectIntoRelation executes given SELECT query and inserts the + * results into the target relation, which is assumed to be a distributed + * table. + */ +static void +ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, + Query *selectQuery, EState *executorState) +{ + ParamListInfo paramListInfo = executorState->es_param_list_info; + + ListCell *insertTargetCell = NULL; + List *columnNameList = NIL; + bool stopOnFailure = false; + char partitionMethod = 0; + + CitusCopyDestReceiver *copyDest = NULL; + + BeginOrContinueCoordinatedTransaction(); + + partitionMethod = PartitionMethod(targetRelationId); + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + stopOnFailure = true; + } + + /* build the list of column names for the COPY statement */ + foreach(insertTargetCell, insertTargetList) + { + TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); + + columnNameList = lappend(columnNameList, insertTargetEntry->resname); + } + + /* set up a DestReceiver that copies into the distributed table */ + copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, + executorState, stopOnFailure); + + ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); + + executorState->es_processed = copyDest->tuplesSent; + + XactModificationLevel = XACT_MODIFICATION_DATA; +} + + +/* + * ExecuteIntoDestReceiver plans and executes a query and sends results to the given + * DestReceiver. + */ +static void +ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest) +{ + PlannedStmt *queryPlan = NULL; + Portal portal = NULL; + int eflags = 0; + int cursorOptions = 0; + long count = FETCH_ALL; + + /* create a new portal for executing the query */ + portal = CreateNewPortal(); + + /* don't display the portal in pg_cursors, it is for internal use only */ + portal->visible = false; + +#if (PG_VERSION_NUM >= 90600) + cursorOptions = CURSOR_OPT_PARALLEL_OK; +#endif + + /* plan the subquery, this may be another distributed query */ + queryPlan = pg_plan_query(query, cursorOptions, params); + + PortalDefineQuery(portal, + NULL, + "", + "SELECT", + list_make1(queryPlan), + NULL); + + PortalStart(portal, params, eflags, GetActiveSnapshot()); + PortalRun(portal, count, false, dest, dest, NULL); + PortalDrop(portal, false); +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index dd2c26733..40abbff78 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -15,6 +15,9 @@ #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/namespace.h" +#include "distributed/insert_select_executor.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_copy.h" #include "distributed/multi_executor.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_planner.h" @@ -26,6 +29,7 @@ #include "executor/execdebug.h" #include "commands/copy.h" #include "nodes/makefuncs.h" +#include "parser/parsetree.h" #include "storage/lmgr.h" #include "tcop/utility.h" #include "utils/snapmgr.h" @@ -80,6 +84,15 @@ static CustomExecMethods RouterSelectCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; +static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { + .CustomName = "CoordinatorInsertSelectScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = CoordinatorInsertSelectExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CoordinatorInsertSelectExplainScan +}; + /* local function forward declarations */ static void PrepareMasterJobDirectory(Job *workerJob); @@ -167,6 +180,25 @@ RouterCreateScan(CustomScan *scan) } +/* + * CoordinatorInsertSelectCrateScan creates the scan state for executing + * INSERT..SELECT into a distributed table via the coordinator. + */ +Node * +CoordinatorInsertSelectCreateScan(CustomScan *scan) +{ + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + + scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->multiPlan = GetMultiPlan(scan); + + scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; + + return (Node *) scanState; +} + + /* * DelayedErrorCreateScan is only called if we could not plan for the given * query. This is the case when a plan is not ready for execution because diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 127daff20..c57be21d2 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -43,12 +43,10 @@ MultiExecutorType JobExecutorType(MultiPlan *multiPlan) { Job *job = multiPlan->workerJob; - List *workerTaskList = job->taskList; - List *workerNodeList = ActiveWorkerNodeList(); - int taskCount = list_length(workerTaskList); - int workerNodeCount = list_length(workerNodeList); - double tasksPerNode = taskCount / ((double) workerNodeCount); - int dependedJobCount = list_length(job->dependedJobList); + List *workerNodeList = NIL; + int workerNodeCount = 0; + int taskCount = 0; + double tasksPerNode = 0.; MultiExecutorType executorType = TaskExecutorType; bool routerExecutablePlan = multiPlan->routerExecutable; @@ -59,6 +57,11 @@ JobExecutorType(MultiPlan *multiPlan) return MULTI_EXECUTOR_ROUTER; } + if (multiPlan->insertSelectSubquery != NULL) + { + return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + } + /* if it is not a router executable plan, inform user according to the log level */ if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) { @@ -68,9 +71,15 @@ JobExecutorType(MultiPlan *multiPlan) " queries on the workers."))); } + workerNodeList = ActiveWorkerNodeList(); + workerNodeCount = list_length(workerNodeList); + taskCount = list_length(job->taskList); + tasksPerNode = taskCount / ((double) workerNodeCount); + if (executorType == MULTI_EXECUTOR_REAL_TIME) { double reasonableConnectionCount = 0; + int dependedJobCount = 0; /* if we need to open too many connections per worker, warn the user */ if (tasksPerNode >= MaxConnections) @@ -98,6 +107,7 @@ JobExecutorType(MultiPlan *multiPlan) } /* if we have repartition jobs with real time executor, error out */ + dependedJobCount = list_length(job->dependedJobList); if (dependedJobCount > 0) { ereport(ERROR, (errmsg("cannot use real time executor with repartition jobs"), diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c new file mode 100644 index 000000000..14f0ed99c --- /dev/null +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -0,0 +1,254 @@ +/*------------------------------------------------------------------------- + * + * insert_select_planner.c + * + * Planning logic for INSERT..SELECT. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/citus_ruleutils.h" +#include "distributed/errormessage.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" +#include "distributed/pg_dist_partition.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" +#include "optimizer/planner.h" +#include "parser/parsetree.h" +#include "parser/parse_coerce.h" +#include "parser/parse_relation.h" +#include "utils/lsyscache.h" + + +static DeferredErrorMessage * DeferErrorIfCoordinatorInsertSelectUnsupported( + Query *insertSelectQuery); +static Query * WrapSubquery(Query *subquery); + + +/* + * CreatteCoordinatorInsertSelectPlan creates a query plan for a SELECT into a + * distributed table. The query plan can also be executed on a worker in MX. + */ +MultiPlan * +CreateCoordinatorInsertSelectPlan(Query *parse) +{ + Query *insertSelectQuery = copyObject(parse); + Query *selectQuery = NULL; + + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + Oid targetRelationId = insertRte->relid; + + ListCell *selectTargetCell = NULL; + ListCell *insertTargetCell = NULL; + + MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + multiPlan->operation = CMD_INSERT; + + multiPlan->planningError = + DeferErrorIfCoordinatorInsertSelectUnsupported(insertSelectQuery); + + if (multiPlan->planningError != NULL) + { + return multiPlan; + } + + selectQuery = selectRte->subquery; + + /* + * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT + * has top-level set operations. + * + * We could simply wrap all queries, but that might create a subquery that is + * not supported by the logical planner. Since the logical planner also does + * not support CTEs and top-level set operations, we can wrap queries containing + * those without breaking anything. + */ + if (list_length(insertSelectQuery->cteList) > 0) + { + selectQuery = WrapSubquery(selectRte->subquery); + + /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ + selectQuery->cteList = copyObject(insertSelectQuery->cteList); + } + else if (selectQuery->setOperations != NULL) + { + /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */ + selectQuery = WrapSubquery(selectRte->subquery); + } + + selectRte->subquery = selectQuery; + + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + + /* add casts when the SELECT output does not directly match the table */ + forboth(insertTargetCell, insertSelectQuery->targetList, + selectTargetCell, selectQuery->targetList) + { + TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); + TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); + + Var *columnVar = NULL; + Oid columnType = InvalidOid; + int32 columnTypeMod = 0; + Oid selectOutputType = InvalidOid; + + /* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */ + if (!IsA(insertTargetEntry->expr, Var)) + { + ereport(ERROR, (errmsg("can only handle regular columns in the target " + "list"))); + } + + columnVar = (Var *) insertTargetEntry->expr; + columnType = get_atttype(targetRelationId, columnVar->varattno); + columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno); + selectOutputType = columnVar->vartype; + + /* + * If the type in the target list does not match the type of the column, + * we need to cast to the column type. PostgreSQL would do this + * automatically during the insert, but we're passing the SELECT + * output directly to COPY. + */ + if (columnType != selectOutputType) + { + Expr *selectExpression = selectTargetEntry->expr; + Expr *typeCastedSelectExpr = + (Expr *) coerce_to_target_type(NULL, (Node *) selectExpression, + selectOutputType, columnType, + columnTypeMod, COERCION_EXPLICIT, + COERCE_IMPLICIT_CAST, -1); + + selectTargetEntry->expr = typeCastedSelectExpr; + } + } + + multiPlan->insertSelectSubquery = selectQuery; + multiPlan->insertTargetList = insertSelectQuery->targetList; + multiPlan->targetRelationId = targetRelationId; + + return multiPlan; +} + + +/* + * DeferErrorIfCoordinatorInsertSelectUnsupported returns an error if executing an + * INSERT ... SELECT command by pulling results of the SELECT to the coordinator + * is unsupported because it uses RETURNING, ON CONFLICT, or an append-distributed + * table. + */ +static DeferredErrorMessage * +DeferErrorIfCoordinatorInsertSelectUnsupported(Query *insertSelectQuery) +{ + RangeTblEntry *insertRte = NULL; + RangeTblEntry *subqueryRte = NULL; + Query *subquery = NULL; + + if (list_length(insertSelectQuery->returningList) > 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "RETURNING is not supported in INSERT ... SELECT via " + "coordinator", NULL, NULL); + } + + if (insertSelectQuery->onConflict) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "ON CONFLICT is not supported in INSERT ... SELECT via " + "coordinator", NULL, NULL); + } + + insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT ... SELECT into an append-distributed table is " + "not supported", NULL, NULL); + } + + subqueryRte = ExtractSelectRangeTableEntry(insertSelectQuery); + subquery = (Query *) subqueryRte->subquery; + + if (NeedsDistributedPlanning(subquery) && + contain_nextval_expression_walker((Node *) insertSelectQuery->targetList, NULL)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT ... SELECT cannot generate sequence values when " + "selecting from a distributed table", + NULL, NULL); + } + + return NULL; +} + + +/* + * WrapSubquery wraps the given query as a subquery in a newly constructed + * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. + */ +static Query * +WrapSubquery(Query *subquery) +{ + Query *outerQuery = NULL; + ParseState *pstate = make_parsestate(NULL); + Alias *selectAlias = NULL; + RangeTblEntry *newRangeTableEntry = NULL; + RangeTblRef *newRangeTableRef = NULL; + ListCell *selectTargetCell = NULL; + List *newTargetList = NIL; + + outerQuery = makeNode(Query); + outerQuery->commandType = CMD_SELECT; + + /* create range table entries */ + selectAlias = makeAlias("citus_insert_select_subquery", NIL); + newRangeTableEntry = addRangeTableEntryForSubquery(pstate, subquery, + selectAlias, false, true); + outerQuery->rtable = list_make1(newRangeTableEntry); + + /* set the FROM expression to the subquery */ + newRangeTableRef = makeNode(RangeTblRef); + newRangeTableRef->rtindex = 1; + outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); + + /* create a target list that matches the SELECT */ + foreach(selectTargetCell, subquery->targetList) + { + TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); + Var *newSelectVar = NULL; + TargetEntry *newSelectTargetEntry = NULL; + + /* exactly 1 entry in FROM */ + int indexInRangeTable = 1; + + if (selectTargetEntry->resjunk) + { + continue; + } + + newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, + exprType((Node *) selectTargetEntry->expr), + exprTypmod((Node *) selectTargetEntry->expr), + exprCollation((Node *) selectTargetEntry->expr), 0); + + newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, + selectTargetEntry->resno, + selectTargetEntry->resname, + selectTargetEntry->resjunk); + + newTargetList = lappend(newTargetList, newSelectTargetEntry); + } + + outerQuery->targetList = newTargetList; + + return outerQuery; +} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 52d2e10dc..e6a5b3e94 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -22,6 +22,7 @@ #include "optimizer/cost.h" #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" +#include "distributed/insert_select_planner.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" @@ -44,7 +45,9 @@ #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" +#include "utils/builtins.h" #include "utils/json.h" +#include "utils/lsyscache.h" #include "utils/snapmgr.h" @@ -69,6 +72,8 @@ typedef struct RemoteExplainPlan /* Explain functions for distributed queries */ +static void CitusExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, + const char *queryString, ParamListInfo params); static void ExplainJob(Job *job, ExplainState *es); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); static void ExplainTaskList(List *taskList, ExplainState *es); @@ -710,3 +715,73 @@ ExplainYAMLLineStarting(ExplainState *es) appendStringInfoSpaces(es->str, es->indent * 2); } } + + +/* + * CoordinatorInsertSelectExplainScan is a custom scan explain callback function + * which is used to print explain information of a Citus plan for an INSERT INTO + * distributed_table SELECT ... query that is evaluated on the coordinator. + */ +void +CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, + struct ExplainState *es) +{ + CitusScanState *scanState = (CitusScanState *) node; + MultiPlan *multiPlan = scanState->multiPlan; + Query *query = multiPlan->insertSelectSubquery; + IntoClause *into = NULL; + ParamListInfo params = NULL; + char *queryString = NULL; + + if (es->analyze) + { + /* avoiding double execution here is tricky, error out for now */ + ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for INSERT " + "... SELECT commands via the coordinator"))); + } + + ExplainOpenGroup("Select Query", "Select Query", false, es); + + /* explain the inner SELECT query */ + CitusExplainOneQuery(query, into, es, queryString, params); + + ExplainCloseGroup("Select Query", "Select Query", false, es); +} + + +/* + * CitusExplainOneQuery is simply a duplicate of ExplainOneQuery in explain.c, which + * is static. + */ +static void +CitusExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, + const char *queryString, ParamListInfo params) +{ + /* copied from ExplainOneQuery in explain.c */ + if (ExplainOneQuery_hook) + { + (*ExplainOneQuery_hook) (query, into, es, queryString, params); + } + else + { + PlannedStmt *plan; + instr_time planstart, + planduration; + int cursorOptions = 0; + + INSTR_TIME_SET_CURRENT(planstart); + + #if (PG_VERSION_NUM >= 90600) + cursorOptions = into ? 0 : CURSOR_OPT_PARALLEL_OK; + #endif + + /* plan the query */ + plan = pg_plan_query(query, cursorOptions, params); + + INSTR_TIME_SET_CURRENT(planduration); + INSTR_TIME_SUBTRACT(planduration, planstart); + + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, &planduration); + } +} diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 45a1576dc..f26371430 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -26,6 +26,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/relation_restriction_equivalence.h" +#include "distributed/multi_router_planner.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -2662,6 +2663,15 @@ NeedsDistributedPlanning(Query *queryTree) return false; } + /* + * We can handle INSERT INTO distributed_table SELECT ... even if the SELECT + * part references local tables, so skip the remaining checks. + */ + if (InsertSelectIntoDistributedTable(queryTree)) + { + return true; + } + /* extract range table entries for simple relations only */ ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index c09bd6760..c630a984c 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -52,6 +52,11 @@ static CustomScanMethods RouterCustomScanMethods = { RouterCreateScan }; +static CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { + "Citus INSERT ... SELECT via coordinator", + CoordinatorInsertSelectCreateScan +}; + static CustomScanMethods DelayedErrorCustomScanMethods = { "Citus Delayed Error", DelayedErrorCreateScan @@ -274,10 +279,18 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query if (IsModifyCommand(query)) { - if (InsertSelectQuery(originalQuery)) + if (InsertSelectIntoDistributedTable(originalQuery)) { distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, plannerRestrictionContext); + + if (distributedPlan->planningError != NULL) + { + RaiseDeferredError(distributedPlan->planningError, DEBUG1); + + /* if INSERT..SELECT cannot be distributed, pull to coordinator */ + distributedPlan = CreateCoordinatorInsertSelectPlan(originalQuery); + } } else { @@ -503,6 +516,12 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) break; } + case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT: + { + customScan->methods = &CoordinatorInsertSelectCustomScanMethods; + break; + } + default: { customScan->methods = &DelayedErrorCustomScanMethods; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 044aa775c..64a13de63 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -741,9 +741,12 @@ ExtractSelectRangeTableEntry(Query *query) RangeTblRef *reference = NULL; RangeTblEntry *subqueryRte = NULL; - Assert(InsertSelectQuery(query)); + Assert(InsertSelectIntoDistributedTable(query)); - /* since we already asserted InsertSelectQuery() it is safe to access both lists */ + /* + * Since we already asserted InsertSelectIntoDistributedTable() it is safe to access + * both lists + */ fromList = query->jointree->fromlist; reference = linitial(fromList); subqueryRte = rt_fetch(reference->rtindex, query->rtable); @@ -764,8 +767,6 @@ ExtractInsertRangeTableEntry(Query *query) List *rangeTableList = query->rtable; RangeTblEntry *insertRTE = NULL; - AssertArg(InsertSelectQuery(query)); - insertRTE = rt_fetch(resultRelation, rangeTableList); return insertRTE; @@ -788,9 +789,25 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, DeferredErrorMessage *error = NULL; /* we only do this check for INSERT ... SELECT queries */ - AssertArg(InsertSelectQuery(queryTree)); + AssertArg(InsertSelectIntoDistributedTable(queryTree)); - EnsureCoordinator(); + subquery = subqueryRte->subquery; + + if (!NeedsDistributedPlanning(subquery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... SELECT can only select from " + "distributed tables", + NULL, NULL); + } + + if (GetLocalGroupId() != 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "distributed INSERT ... SELECT can only be performed from " + "the coordinator", + NULL, NULL); + } /* we do not expect to see a view in modify target */ foreach(rangeTableCell, queryTree->rtable) @@ -805,13 +822,11 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, } } - subquery = subqueryRte->subquery; - if (contain_volatile_functions((Node *) queryTree)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "volatile functions are not allowed in INSERT ... SELECT " - "queries", + "volatile functions are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -832,7 +847,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "only reference tables may be queried when targeting " - "a reference table with INSERT ... SELECT", + "a reference table with distributed INSERT ... SELECT", NULL, NULL); } } @@ -857,7 +872,7 @@ InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "INSERT target table and the source relation of the SELECT partition " - "column value must be colocated", + "column value must be colocated in distributed INSERT ... SELECT", NULL, NULL); } } @@ -888,7 +903,7 @@ MultiTaskRouterSelectQuerySupported(Query *query) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "Subqueries without relations are not allowed in " - "INSERT ... SELECT queries", + "distributed INSERT ... SELECT queries", NULL, NULL); } @@ -896,8 +911,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->limitCount != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "LIMIT clauses are not allowed in INSERT ... SELECT " - "queries", + "LIMIT clauses are not allowed in distirbuted INSERT " + "... SELECT queries", NULL, NULL); } @@ -905,8 +920,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->limitOffset != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "OFFSET clauses are not allowed in INSERT ... SELECT " - "queries", + "OFFSET clauses are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -918,16 +933,16 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->windowClause != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "window functions are not allowed in INSERT ... SELECT " - "queries", + "window functions are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } if (subquery->setOperations != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "Set operations are not allowed in INSERT ... SELECT " - "queries", + "Set operations are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -940,8 +955,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->groupingSets != NULL) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "grouping sets are not allowed in INSERT ... SELECT " - "queries", + "grouping sets are not allowed in distributed " + "INSERT ... SELECT queries", NULL, NULL); } @@ -952,7 +967,7 @@ MultiTaskRouterSelectQuerySupported(Query *query) if (subquery->hasDistinctOn) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "DISTINCT ON clauses are not allowed in " + "DISTINCT ON clauses are not allowed in distributed " "INSERT ... SELECT queries", NULL, NULL); } @@ -1134,8 +1149,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, } return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", psprintf(errorDetailTemplate, exprDescription), "Ensure the target table's partition column has a " "corresponding simple column reference to a distributed " @@ -1149,8 +1165,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, if (!IsA(targetEntry->expr, Var)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", "The data type of the target table's partition column " "should exactly match the data type of the " "corresponding simple column reference in the subquery.", @@ -1161,13 +1178,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, if (!IsPartitionColumn(selectTargetExpr, subquery)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "becuase the partition columns in the source table " + "and subquery do not match", "The target table's partition column should correspond " "to a partition column in the subquery.", NULL); } + /* finally, check that the select target column is a partition column */ /* we can set the select relation id */ *selectPartitionColumnTableId = subqueryPartitionColumnRelationId; @@ -1177,8 +1196,9 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, if (!targetTableHasPartitionColumn) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "INSERT INTO ... SELECT partition columns in the source " - "table and subquery do not match", + "cannot perform distributed INSERT INTO ... SELECT " + "because the partition columns in the source table " + "and subquery do not match", "the query doesn't include the target table's " "partition column", NULL); @@ -1210,8 +1230,6 @@ ModifyQuerySupported(Query *queryTree) Node *onConflictWhere = NULL; CmdType commandType = queryTree->commandType; - Assert(commandType == CMD_INSERT || commandType == CMD_UPDATE || - commandType == CMD_DELETE); /* * Reject subqueries which are in SELECT or WHERE clause. @@ -2731,7 +2749,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, int subqueryTargetLength = 0; int targetEntryIndex = 0; - AssertArg(InsertSelectQuery(originalQuery)); + AssertArg(InsertSelectIntoDistributedTable(originalQuery)); subquery = subqueryRte->subquery; @@ -2867,8 +2885,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, /* - * InsertSelectQuery returns true when the input query - * is INSERT INTO ... SELECT kind of query. + * InsertSelectIntoDistributedTable returns true when the input query is an + * INSERT INTO ... SELECT kind of query and the target is a distributed + * table. * * Note that the input query should be the original parsetree of * the query (i.e., not passed trough the standard planner). @@ -2877,12 +2896,13 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, * rewrite/rewriteManip.c. */ bool -InsertSelectQuery(Query *query) +InsertSelectIntoDistributedTable(Query *query) { CmdType commandType = query->commandType; List *fromList = NULL; RangeTblRef *rangeTableReference = NULL; RangeTblEntry *subqueryRte = NULL; + RangeTblEntry *insertRte = NULL; if (commandType != CMD_INSERT) { @@ -2901,7 +2921,10 @@ InsertSelectQuery(Query *query) } rangeTableReference = linitial(fromList); - Assert(IsA(rangeTableReference, RangeTblRef)); + if (!IsA(rangeTableReference, RangeTblRef)) + { + return false; + } subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable); if (subqueryRte->rtekind != RTE_SUBQUERY) @@ -2912,6 +2935,12 @@ InsertSelectQuery(Query *query) /* ensure that there is a query */ Assert(IsA(subqueryRte->subquery, Query)); + insertRte = ExtractInsertRangeTableEntry(query); + if (!IsDistributedTable(insertRte->relid)) + { + return false; + } + return true; } diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c index 883f59258..fe70cb13a 100644 --- a/src/backend/distributed/test/deparse_shard_query.c +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -60,7 +60,7 @@ deparse_shard_query_test(PG_FUNCTION_ARGS) StringInfo buffer = makeStringInfo(); /* reoreder the target list only for INSERT .. SELECT queries */ - if (InsertSelectQuery(query)) + if (InsertSelectIntoDistributedTable(query)) { RangeTblEntry *insertRte = linitial(query->rtable); RangeTblEntry *subqueryRte = lsecond(query->rtable); diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 0016499a3..d357e953c 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -104,7 +104,7 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) ListCell *rteCell = NULL; ListCell *cteCell = NULL; Node *modifiedNode = NULL; - bool insertSelectQuery = InsertSelectQuery(query); + bool insertSelectQuery = InsertSelectIntoDistributedTable(query); if (query->jointree && query->jointree->quals) { diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 460de189b..866c23ddc 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -282,6 +282,11 @@ OutMultiPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); WRITE_BOOL_FIELD(routerExecutable); + + WRITE_NODE_FIELD(insertSelectSubquery); + WRITE_NODE_FIELD(insertTargetList); + WRITE_OID_FIELD(targetRelationId); + WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 9bcf61efc..a1396e196 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -190,6 +190,11 @@ ReadMultiPlan(READFUNC_ARGS) READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); READ_BOOL_FIELD(routerExecutable); + + READ_NODE_FIELD(insertSelectSubquery); + READ_NODE_FIELD(insertTargetList); + READ_OID_FIELD(targetRelationId); + READ_NODE_FIELD(planningError); READ_DONE(); diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index b5c8a7dc4..baf7248d8 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -59,7 +59,7 @@ static void AppendOptionListToString(StringInfo stringData, List *options); static const char * convert_aclright_to_string(int aclright); -static bool contain_nextval_expression_walker(Node *node, void *context); + /* * pg_get_extensiondef_string finds the foreign data wrapper that corresponds to @@ -989,7 +989,7 @@ convert_aclright_to_string(int aclright) * contain_nextval_expression_walker walks over expression tree and returns * true if it contains call to 'nextval' function. */ -static bool +bool contain_nextval_expression_walker(Node *node, void *context) { if (node == NULL) diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index e7eaf9204..e79224326 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -36,6 +36,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); +extern bool contain_nextval_expression_walker(Node *node, void *context); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h new file mode 100644 index 000000000..bbbac7725 --- /dev/null +++ b/src/include/distributed/insert_select_executor.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * insert_select_executor.h + * + * Declarations for public functions and types related to executing + * INSERT..SELECT commands. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef INSERT_SELECT_EXECUTOR_H +#define INSERT_SELECT_EXECUTOR_H + + +#include "executor/execdesc.h" + + +extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); + + +#endif /* INSERT_SELECT_EXECUTOR_H */ diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h new file mode 100644 index 000000000..5fccdacde --- /dev/null +++ b/src/include/distributed/insert_select_planner.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * insert_select_planner.h + * + * Declarations for public functions and types related to planning + * INSERT..SELECT commands. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef INSERT_SELECT_PLANNER_H +#define INSERT_SELECT_PLANNER_H + + +#include "postgres.h" + +#include "distributed/multi_physical_planner.h" +#include "nodes/execnodes.h" +#include "nodes/parsenodes.h" +#include "nodes/plannodes.h" + + +extern MultiPlan * CreateCoordinatorInsertSelectPlan(Query *originalQuery); +extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, + struct ExplainState *es); + +#endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 1b6fcc412..07751d598 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -84,6 +84,9 @@ typedef struct CitusCopyDestReceiver /* state on how to copy out data types */ CopyOutState copyOutState; FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + int64 tuplesSent; } CitusCopyDestReceiver; diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index e3b53a327..c05a9f820 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -38,6 +38,7 @@ typedef struct CitusScanState extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan); +extern Node * CoordinatorInsertSelectCreateScan(CustomScan *scan); extern Node * DelayedErrorCreateScan(CustomScan *scan); extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 299ba46f7..44edac1f4 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -221,6 +221,11 @@ typedef struct MultiPlan Query *masterQuery; bool routerExecutable; + /* INSERT ... SELECT via coordinator only */ + Query *insertSelectSubquery; + List *insertTargetList; + Oid targetRelationId; + /* * NULL if this a valid plan, an error description otherwise. This will * e.g. be set if SQL features are present that a planner doesn't support, diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 54a8abc7b..32f54ce3d 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -46,7 +46,7 @@ extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rte extern RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); -extern bool InsertSelectQuery(Query *query); +extern bool InsertSelectIntoDistributedTable(Query *query); extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 20421860d..b6997496a 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -94,7 +94,8 @@ typedef enum MULTI_EXECUTOR_INVALID_FIRST = 0, MULTI_EXECUTOR_REAL_TIME = 1, MULTI_EXECUTOR_TASK_TRACKER = 2, - MULTI_EXECUTOR_ROUTER = 3 + MULTI_EXECUTOR_ROUTER = 3, + MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 4 } MultiExecutorType; diff --git a/src/test/regress/expected/multi_behavioral_analytics_basics.out b/src/test/regress/expected/multi_behavioral_analytics_basics.out index 80e8b1a96..eb6da2a9c 100644 --- a/src/test/regress/expected/multi_behavioral_analytics_basics.out +++ b/src/test/regress/expected/multi_behavioral_analytics_basics.out @@ -69,9 +69,13 @@ FROM ( ) t2 ON (t1.user_id = t2.user_id) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; + count | count | avg +-------+-------+--------------------- + 8 | 8 | 16.1250000000000000 +(1 row) + ------------------------------------ ------------------------------------ -- Funnel, grouped by the number of times a user has done an event @@ -143,9 +147,13 @@ GROUP BY count_pay, user_id ORDER BY count_pay; -ERROR: Set operations are not allowed in INSERT ... SELECT queries -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; + count | count | avg +-------+-------+--------------------- + 8 | 8 | 45.0000000000000000 +(1 row) + ------------------------------------ ------------------------------------ -- Most recently seen users_table events_table diff --git a/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out b/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out index a6a1b1faa..c04e894f5 100644 --- a/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out +++ b/src/test/regress/expected/multi_behavioral_analytics_single_shard_queries.out @@ -101,7 +101,6 @@ FROM ( WHERE t1.user_id = 20 GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries ------------------------------------ ------------------------------------ -- Funnel grouped by whether or not a user has done an event -- two shards query @@ -145,9 +144,13 @@ FROM ( WHERE (t1.user_id = 20 OR t1.user_id = 17) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; + count | count | avg +-------+-------+--------------------- + 2 | 2 | 18.5000000000000000 +(1 row) + ------------------------------------ ------------------------------------ -- Most recently seen users_table events_table -- single shard query diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 322445b7f..126319156 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -985,3 +985,62 @@ Custom Scan (Citus Router) -> Seq Scan on explain_table_570001 explain_table Filter: (id = 1) ROLLBACK; +-- test explain with local INSERT ... SELECT +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +SELECT o_orderkey FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +SELECT true AS valid FROM explain_json($$ + INSERT INTO lineitem_hash_part (l_orderkey) + SELECT o_orderkey FROM orders_hash_part LIMIT 3; +$$); +t +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) +SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey) +SELECT s FROM generate_series(1,5) s; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Function Scan on generate_series s +EXPLAIN (COSTS OFF) +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO lineitem_hash_part +WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) +SELECT s FROM cte1; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + CTE cte1 + -> Function Scan on generate_series s + -> CTE Scan on cte1 + CTE cte1 + -> Limit + -> CTE Scan on cte1 cte1_1 +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +( SELECT s FROM generate_series(1,5) s) UNION +( SELECT s FROM generate_series(5,10) s); +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 0f76dc9d0..b0f5db6b5 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -940,3 +940,62 @@ Custom Scan (Citus Router) -> Seq Scan on explain_table_570001 explain_table Filter: (id = 1) ROLLBACK; +-- test explain with local INSERT ... SELECT +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +SELECT o_orderkey FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +SELECT true AS valid FROM explain_json($$ + INSERT INTO lineitem_hash_part (l_orderkey) + SELECT o_orderkey FROM orders_hash_part LIMIT 3; +$$); +t +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) +SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Limit + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Seq Scan on orders_hash_part_360295 orders_hash_part +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey) +SELECT s FROM generate_series(1,5) s; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Function Scan on generate_series s +EXPLAIN (COSTS OFF) +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO lineitem_hash_part +WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) +SELECT s FROM cte1; +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + CTE cte1 + -> Function Scan on generate_series s + -> CTE Scan on cte1 + CTE cte1 + -> Limit + -> CTE Scan on cte1 cte1_1 +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +( SELECT s FROM generate_series(1,5) s) UNION +( SELECT s FROM generate_series(5,10) s); +Custom Scan (Citus INSERT ... SELECT via coordinator) + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 123d833d1..ebb863fe6 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -141,20 +141,6 @@ WARNING: function public.evaluate_on_master(integer) does not exist WARNING: function public.evaluate_on_master(integer) does not exist ERROR: could not modify any active placements \set VERBOSITY default --- volatile functions should be disallowed -INSERT INTO raw_events_second (user_id, value_1) -SELECT - user_id, (random()*10)::int -FROM - raw_events_first; -ERROR: volatile functions are not allowed in INSERT ... SELECT queries -INSERT INTO raw_events_second (user_id, value_1) -WITH sub_cte AS (SELECT (random()*10)::int) -SELECT - user_id, (SELECT * FROM sub_cte) -FROM - raw_events_first; -ERROR: volatile functions are not allowed in INSERT ... SELECT queries -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -645,7 +631,10 @@ INSERT INTO agg_events (value_1_agg, user_id) DISTINCT ON (value_1) value_1, user_id FROM raw_events_first; -ERROR: DISTINCT ON clauses are not allowed in INSERT ... SELECT queries +DEBUG: DISTINCT ON clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with DISTINCT clause +HINT: Consider using an equality filter on the distributed table's partition column. -- We do not support some CTEs WITH fist_table_agg AS (SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id) @@ -655,8 +644,10 @@ INSERT INTO agg_events v1_agg, user_id FROM fist_table_agg; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match -DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with complex table expressions +HINT: Consider using an equality filter on the distributed table's partition column. -- We don't support CTEs that consist of const values as well INSERT INTO agg_events WITH sub_cte AS (SELECT 1) @@ -664,8 +655,12 @@ INSERT INTO agg_events raw_events_first.user_id, (SELECT * FROM sub_cte) FROM raw_events_first; -ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries --- We do not support any set operations +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with common table expressions +HINT: Consider using an equality filter on the distributed table's partition column. +-- We support set operations via the coordinator +BEGIN; INSERT INTO raw_events_first(user_id) SELECT @@ -673,14 +668,19 @@ SELECT FROM ((SELECT user_id FROM raw_events_first) UNION (SELECT user_id FROM raw_events_second)) as foo; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ROLLBACK; -- We do not support any set operations INSERT INTO raw_events_first(user_id) (SELECT user_id FROM raw_events_first) INTERSECT (SELECT user_id FROM raw_events_first); -ERROR: Set operations are not allowed in INSERT ... SELECT queries --- We do not support any set operations +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +-- If the query is router plannable then it is executed via the coordinator INSERT INTO raw_events_first(user_id) SELECT @@ -688,7 +688,10 @@ SELECT FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Creating router plan +DEBUG: Plan is router executable -- some supported LEFT joins INSERT INTO agg_events (user_id) SELECT @@ -1027,42 +1030,53 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, raw_events_second WHERE raw_events_first.user_id = raw_events_second.user_id GROUP BY raw_events_second.value_3) AS foo; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: cannot push down this subquery +DETAIL: Group by list without partition column is currently unsupported -- error cases -- no part column at all INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: the query doesn't include the target table's partition column +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: the query doesn't include the target table's partition column +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT becuase the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an operator in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO raw_events_second (user_id) SELECT user_id :: bigint FROM raw_events_first; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1076,9 +1090,11 @@ SELECT SUM(value_3), Avg(value_2) FROM raw_events_first GROUP BY user_id; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an aggregation in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.agg_events should have a value INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1093,16 +1109,21 @@ SELECT SUM(value_3), FROM raw_events_first GROUP BY user_id, value_2; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT becuase the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: the partition column of table public.agg_events should have a value -- tables should be co-located INSERT INTO agg_events (user_id) SELECT user_id FROM reference_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT becuase the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Creating router plan +DEBUG: Plan is router executable -- unsupported joins between subqueries -- we do not return bare partition column on the inner query INSERT INTO agg_events @@ -1127,9 +1148,12 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, GROUP BY raw_events_second.value_1 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- the second part of the query is not routable since -- GROUP BY not on the partition column (i.e., value_1) and thus join -- on f.id = f2.id is not on the partition key (instead on the sum of partition key) @@ -1564,7 +1588,10 @@ SELECT user_id, Sum(value_2) AS sum_val2 FROM raw_events_second GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) ); -ERROR: grouping sets are not allowed in INSERT ... SELECT queries +DEBUG: grouping sets are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP +HINT: Consider using an equality filter on the distributed table's partition column. -- set back to INFO SET client_min_messages TO INFO; -- avoid constraint violations @@ -1592,7 +1619,8 @@ WHERE user_id ) as f_inner ) ) AS f2); -ERROR: LIMIT clauses are not allowed in INSERT ... SELECT queries +ERROR: cannot push down this subquery +DETAIL: Limit in subquery is currently unsupported -- Altering a table and selecting from it using a multi-shard statement -- in the same transaction is allowed because we will use the same -- connections for all co-located placements. @@ -1646,7 +1674,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) SELECT count(*) FROM raw_events_second; count ------- - 18 + 36 (1 row) INSERT INTO raw_events_second SELECT * FROM test_view; @@ -1656,12 +1684,9 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B SELECT count(*) FROM raw_events_second; count ------- - 20 + 38 (1 row) --- inserting into views does not -INSERT INTO test_view SELECT * FROM raw_events_second; -ERROR: cannot insert into view over distributed table -- we need this in our next test truncate raw_events_first; SET client_min_messages TO DEBUG2; @@ -1912,7 +1937,7 @@ FROM table_with_defaults GROUP BY store_id; -ERROR: volatile functions are not allowed in INSERT ... SELECT queries +ERROR: INSERT ... SELECT cannot generate sequence values when selecting from a distributed table -- do some more error/error message checks SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -1937,49 +1962,61 @@ SELECT create_distributed_table('table_with_starts_with_defaults', 'c'); (1 row) +SET client_min_messages TO DEBUG; INSERT INTO text_table (part_col) SELECT CASE WHEN part_col = 'onder' THEN 'marco' END FROM text_table ; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a case expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT part_col::text from char_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT val FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO text_table (part_col) SELECT val::text FROM text_table; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; -- Test on partition column without native hash function CREATE TABLE raw_table @@ -2012,10 +2049,380 @@ SELECT * FROM summary_table; 11-11-1980 | 1 (1 row) +-- Test INSERT ... SELECT via coordinator +-- Select from constants +TRUNCATE raw_events_first; +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM (VALUES (1,2), (3,4), (5,6)) AS v(int,int); +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id; + user_id | value_1 +---------+--------- + 1 | 2 + 3 | 4 + 5 | 6 +(3 rows) + +-- Select from local functions +TRUNCATE raw_events_first; +CREATE SEQUENCE insert_select_test_seq; +SET client_min_messages TO DEBUG; +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT + s, nextval('insert_select_test_seq'), (random()*10)::int +FROM + generate_series(1, 5) s; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +-- ON CONFLICT is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +ON CONFLICT DO NOTHING; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator +-- RETURNING is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +RETURNING *; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator +RESET client_min_messages; +-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported +TRUNCATE raw_events_first; +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first; +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported +TRUNCATE raw_events_first; +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first WHERE user_id = 1; + user_id | value_1 +---------+--------- + 1 | 1 +(1 row) + +COMMIT; +-- Select from local table +TRUNCATE raw_events_first; +CREATE TEMPORARY TABLE raw_events_first_local AS +SELECT s AS u, 2*s AS v FROM generate_series(1, 5) s; +INSERT INTO raw_events_first (user_id, value_1) +SELECT u, v FROM raw_events_first_local; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +-- Use columns in opposite order +TRUNCATE raw_events_first; +INSERT INTO raw_events_first (value_1, user_id) +SELECT u, v FROM raw_events_first_local; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 2 | 1 + 4 | 2 + 6 | 3 + 8 | 4 + 10 | 5 +(5 rows) + +-- Set operations can work with opposite column order +TRUNCATE raw_events_first; +INSERT INTO raw_events_first (value_3, user_id) +( SELECT v, u::bigint FROM raw_events_first_local ) +UNION ALL +( SELECT v, u FROM raw_events_first_local ); +SELECT user_id, value_3 FROM raw_events_first ORDER BY user_id, value_3; + user_id | value_3 +---------+--------- + 1 | 2 + 1 | 2 + 2 | 4 + 2 | 4 + 3 | 6 + 3 | 6 + 4 | 8 + 4 | 8 + 5 | 10 + 5 | 10 +(10 rows) + +-- Select from other distributed table with limit +TRUNCATE raw_events_first; +TRUNCATE raw_events_second; +INSERT INTO raw_events_second (user_id, value_4) +SELECT s, 3*s FROM generate_series (1,5) s; +INSERT INTO raw_events_first (user_id, value_1) +SELECT user_id, value_4 FROM raw_events_second LIMIT 5; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 3 + 2 | 6 + 3 | 9 + 4 | 12 + 5 | 15 +(5 rows) + +-- CTEs are supported in local queries +TRUNCATE raw_events_first; +WITH removed_rows AS ( + DELETE FROM raw_events_first_local RETURNING u +) +INSERT INTO raw_events_first (user_id, value_1) +WITH value AS (SELECT 1) +SELECT * FROM removed_rows, value; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 1 + 3 | 1 + 4 | 1 + 5 | 1 +(5 rows) + +-- nested CTEs are also supported +TRUNCATE raw_events_first; +INSERT INTO raw_events_first_local SELECT s, 2*s FROM generate_series(0, 10) s; +WITH rows_to_remove AS ( + SELECT u FROM raw_events_first_local WHERE u > 0 +), +removed_rows AS ( + DELETE FROM raw_events_first_local + WHERE u IN (SELECT * FROM rows_to_remove) + RETURNING u, v +) +INSERT INTO raw_events_first (user_id, value_1) +WITH ultra_rows AS ( + WITH numbers AS ( + SELECT s FROM generate_series(1,10) s + ), + super_rows AS ( + SELECT u, v FROM removed_rows JOIN numbers ON (u = s) + ) + SELECT * FROM super_rows LIMIT 5 +) +SELECT u, v FROM ultra_rows; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +-- CTEs with duplicate names are also supported +TRUNCATE raw_events_first; +WITH super_rows AS ( + SELECT u FROM raw_events_first_local +) +INSERT INTO raw_events_first (user_id, value_1) +WITH super_rows AS ( + SELECT * FROM super_rows GROUP BY u +) +SELECT u, 5 FROM super_rows; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 0 | 5 +(1 row) + +-- CTEs are supported in router queries +TRUNCATE raw_events_first; +WITH user_two AS ( + SELECT user_id, value_4 FROM raw_events_second WHERE user_id = 2 +) +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM user_two; +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 2 | 6 +(1 row) + +-- CTEs are supported when there are name collisions +WITH numbers AS ( + SELECT s FROM generate_series(1,10) s +) +INSERT INTO raw_events_first(user_id, value_1) +WITH numbers AS ( + SELECT s, s FROM generate_series(1,5) s +) +SELECT * FROM numbers; +-- Select into distributed table with a sequence +CREATE TABLE "CaseSensitiveTable" ("UserID" int, "Value1" int); +SELECT create_distributed_table('"CaseSensitiveTable"', 'UserID'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO "CaseSensitiveTable" +SELECT s, s FROM generate_series(1,10) s; +SELECT * FROM "CaseSensitiveTable" ORDER BY "UserID"; + UserID | Value1 +--------+-------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 +(10 rows) + +DROP TABLE "CaseSensitiveTable"; +-- Select into distributed table with a sequence +CREATE TABLE dist_table_with_sequence (user_id serial, value_1 serial); +SELECT create_distributed_table('dist_table_with_sequence', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- from local query +INSERT INTO dist_table_with_sequence (value_1) +SELECT s FROM generate_series(1,5) s; +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +-- from a distributed query +INSERT INTO dist_table_with_sequence (value_1) +SELECT value_1 FROM dist_table_with_sequence; +ERROR: INSERT ... SELECT cannot generate sequence values when selecting from a distributed table +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + user_id | value_1 +---------+--------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +-- Select from distributed table into reference table +CREATE TABLE ref_table (user_id int, value_1 int); +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO ref_table +SELECT user_id, value_1 FROM raw_events_second; +SELECT * FROM ref_table ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | + 2 | + 3 | + 4 | + 5 | +(5 rows) + +DROP TABLE ref_table; +-- Select into an append-partitioned table is not supported +CREATE TABLE insert_append_table (user_id int, value_4 bigint); +SELECT create_distributed_table('insert_append_table', 'user_id', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO insert_append_table (user_id, value_4) +SELECT user_id, 1 FROM raw_events_second LIMIT 5; +ERROR: INSERT ... SELECT into an append-distributed table is not supported +DROP TABLE insert_append_table; +-- Insert from other distributed table as prepared statement +TRUNCATE raw_events_first; +PREPARE insert_prep(int) AS +INSERT INTO raw_events_first (user_id, value_1) +SELECT $1, value_4 FROM raw_events_second ORDER BY value_4 LIMIT 1; +EXECUTE insert_prep(1); +EXECUTE insert_prep(2); +EXECUTE insert_prep(3); +EXECUTE insert_prep(4); +EXECUTE insert_prep(5); +EXECUTE insert_prep(6); +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + user_id | value_1 +---------+--------- + 1 | 3 + 2 | 3 + 3 | 3 + 4 | 3 + 5 | 3 + 6 | 3 +(6 rows) + +-- Inserting into views is handled via coordinator +TRUNCATE raw_events_first; +INSERT INTO test_view +SELECT * FROM raw_events_second; +SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4; + user_id | value_4 +---------+--------- + 1 | 3 + 2 | 6 + 3 | 9 + 4 | 12 + 5 | 15 +(5 rows) + +-- Drop the view now, because the column we are about to drop depends on it +DROP VIEW test_view; +-- Make sure we handle dropped columns correctly +TRUNCATE raw_events_first; +ALTER TABLE raw_events_first DROP COLUMN value_1; +INSERT INTO raw_events_first (user_id, value_4) +SELECT value_4, user_id FROM raw_events_second LIMIT 5; +SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id; + user_id | value_4 +---------+--------- + 3 | 1 + 6 | 2 + 9 | 3 + 12 | 4 + 15 | 5 +(5 rows) + +RESET client_min_messages; DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE raw_events_first CASCADE; -NOTICE: drop cascades to view test_view DROP TABLE raw_events_second; DROP TABLE reference_table; DROP TABLE agg_events; diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index ef0ec218b..d99bf6c20 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -66,7 +66,8 @@ FROM ( ) t2 ON (t1.user_id = t2.user_id) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- not pushable since the JOIN is not an equi join right part of the UNION -- is not joined on the partition key INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg ) @@ -107,7 +108,8 @@ FROM ( ) t2 ON (t1.user_id = t2.user_id) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- the LEFT JOIN conditon is not on the partition column (i.e., is it part_key divided by 2) INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg ) SELECT user_id, sum(array_length(events_table, 1)), length(hasdone_event) @@ -147,7 +149,8 @@ FROM ( ) t2 ON (t1.user_id = (t2.user_id)/2) GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. ------------------------------------ ------------------------------------ -- Funnel, grouped by the number of times a user has done an event @@ -220,7 +223,8 @@ GROUP BY count_pay, user_id ORDER BY count_pay; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- not pushable since the JOIN condition is not equi JOIN -- (subquery_1 JOIN subquery_2) INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg) @@ -288,7 +292,8 @@ GROUP BY count_pay, user_id ORDER BY count_pay; -ERROR: Set operations are not allowed in INSERT ... SELECT queries +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. ------------------------------------ ------------------------------------ -- Most recently seen users_table events_table @@ -686,4 +691,5 @@ FROM ( GROUP BY user_id ) AS shard_union ORDER BY user_lastseen DESC; -ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries +ERROR: cannot push down this subquery +DETAIL: Subqueries without relations are unsupported diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index f306fbf6b..35fa1d657 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -108,10 +108,6 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::ti INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT); ERROR: cannot perform distributed planning for the given modification DETAIL: Multi-row INSERTs to distributed tables are not supported. --- INSERT ... SELECT ... FROM commands are unsupported from workers -INSERT INTO limit_orders_mx SELECT * FROM limit_orders_mx; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. -- connect back to the other node \c - - - :worker_1_port -- commands containing a CTE are unsupported @@ -433,6 +429,21 @@ SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data; ----------+------ (0 rows) +--- INSERT ... SELECT ... FROM commands are supported from workers +INSERT INTO multiple_hash_mx +SELECT s, s*2 FROM generate_series(1,10) s; +INSERT 0 10 +-- but are never distributed +BEGIN; +BEGIN +SET LOCAL client_min_messages TO DEBUG1; +SET +INSERT INTO multiple_hash_mx SELECT * FROM multiple_hash_mx; +DEBUG: distributed INSERT ... SELECT can only be performed from the coordinator +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT 0 10 +END; +COMMIT -- verify interaction of default values, SERIAL, and RETURNING \set QUIET on INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index dc1b94891..3a4c4a8e4 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1091,12 +1091,11 @@ LOG: join order: [ "colocated_table_test" ][ broadcast join "reference_table_te 2 (2 rows) -SET client_min_messages TO NOTICE; SET citus.log_multi_join_order TO FALSE; SET citus.shard_count TO DEFAULT; SET citus.task_executor_type to "real-time"; -- some INSERT .. SELECT queries that involve both hash distributed and reference tables --- should error out since we're inserting into reference table where +-- should go via coordinator since we're inserting into reference table where -- not all the participants are reference tables INSERT INTO reference_table_test (value_1) @@ -1105,9 +1104,10 @@ SELECT FROM colocated_table_test, colocated_table_test_2 WHERE - colocated_table_test.value_1 = colocated_table_test.value_1; -ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT --- should error out, same as the above + colocated_table_test.value_1 = colocated_table_test_2.value_1; +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- should go via coordinator, same as the above INSERT INTO reference_table_test (value_1) SELECT @@ -1116,7 +1116,8 @@ FROM colocated_table_test, reference_table_test WHERE colocated_table_test.value_1 = reference_table_test.value_1; -ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT +DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator -- now, insert into the hash partitioned table and use reference -- tables in the SELECT queries INSERT INTO @@ -1150,8 +1151,7 @@ RETURNING value_1, value_2; 2 | 2 (2 rows) --- partition column value comes from reference table but still first error is --- on data type mismatch +-- partition column value comes from reference table, goes via coordinator INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -1159,11 +1159,10 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match + colocated_table_test_2.value_4 = reference_table_test.value_4; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. --- partition column value comes from reference table which should error out +DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -1171,10 +1170,11 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; -ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match + colocated_table_test_2.value_4 = reference_table_test.value_4; +DEBUG: cannot perform distributed INSERT INTO ... SELECT becuase the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; -- some tests for mark_tables_colocated -- should error out SELECT mark_tables_colocated('colocated_table_test_2', ARRAY['reference_table_test']); diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 437d1d300..a8101b0d5 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -266,14 +266,17 @@ SELECT count(*) FROM mx_table; 5 (1 row) --- INSERT / SELECT +-- INSERT / SELECT pulls results to worker +BEGIN; +SET LOCAL client_min_messages TO DEBUG; INSERT INTO mx_table_2 SELECT * FROM mx_table; -ERROR: operation is not allowed on this node -HINT: Connect to the coordinator and run it again. +DEBUG: distributed INSERT ... SELECT can only be performed from the coordinator +DEBUG: Collecting INSERT ... SELECT results on coordinator +END; SELECT count(*) FROM mx_table_2; count ------- - 0 + 5 (1 row) -- mark_tables_colocated diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index eccad4b00..a1a5822dc 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -197,9 +197,6 @@ SELECT count(*) FROM temp_lineitem; 1706 (1 row) --- modifying views is disallowed -INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem; -ERROR: cannot insert into view over distributed table SET citus.task_executor_type to "task-tracker"; -- single view repartition subqueries are not supported SELECT l_suppkey, count(*) FROM diff --git a/src/test/regress/sql/multi_behavioral_analytics_basics.sql b/src/test/regress/sql/multi_behavioral_analytics_basics.sql index b8ad4baa7..720d55fe2 100644 --- a/src/test/regress/sql/multi_behavioral_analytics_basics.sql +++ b/src/test/regress/sql/multi_behavioral_analytics_basics.sql @@ -71,7 +71,7 @@ FROM ( ) t GROUP BY user_id, hasdone_event; -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; ------------------------------------ ------------------------------------ @@ -148,7 +148,7 @@ ORDER BY count_pay; -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; ------------------------------------ ------------------------------------ @@ -417,4 +417,4 @@ FROM -- get some statistics from the aggregated results to ensure the results are correct SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results; - \ No newline at end of file + diff --git a/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql b/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql index ede431ee5..54e63b722 100644 --- a/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql +++ b/src/test/regress/sql/multi_behavioral_analytics_single_shard_queries.sql @@ -146,7 +146,7 @@ FROM ( ) t GROUP BY user_id, hasdone_event; -- get some statistics from the aggregated results to ensure the results are correct --- SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; +SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; ------------------------------------ @@ -398,4 +398,4 @@ INSERT INTO agg_results_second(user_id, value_2_agg) -- get some statistics from the aggregated results to ensure the results are correct SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_second; - \ No newline at end of file + diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index e5d32c922..b6099e389 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -465,3 +465,32 @@ ALTER TABLE explain_table ADD COLUMN value int; EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1; ROLLBACK; + +-- test explain with local INSERT ... SELECT +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +SELECT o_orderkey FROM orders_hash_part LIMIT 3; + +SELECT true AS valid FROM explain_json($$ + INSERT INTO lineitem_hash_part (l_orderkey) + SELECT o_orderkey FROM orders_hash_part LIMIT 3; +$$); + +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) +SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; + +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part (l_orderkey) +SELECT s FROM generate_series(1,5) s; + +EXPLAIN (COSTS OFF) +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO lineitem_hash_part +WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) +SELECT s FROM cte1; + +EXPLAIN (COSTS OFF) +INSERT INTO lineitem_hash_part +( SELECT s FROM generate_series(1,5) s) UNION +( SELECT s FROM generate_series(5,10) s); diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 53c510f4e..7d49b6e2b 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -116,21 +116,6 @@ WHERE \set VERBOSITY default --- volatile functions should be disallowed -INSERT INTO raw_events_second (user_id, value_1) -SELECT - user_id, (random()*10)::int -FROM - raw_events_first; - -INSERT INTO raw_events_second (user_id, value_1) -WITH sub_cte AS (SELECT (random()*10)::int) -SELECT - user_id, (SELECT * FROM sub_cte) -FROM - raw_events_first; - - -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -517,7 +502,9 @@ INSERT INTO agg_events FROM raw_events_first; --- We do not support any set operations +-- We support set operations via the coordinator +BEGIN; + INSERT INTO raw_events_first(user_id) SELECT @@ -526,13 +513,15 @@ FROM ((SELECT user_id FROM raw_events_first) UNION (SELECT user_id FROM raw_events_second)) as foo; +ROLLBACK; + -- We do not support any set operations INSERT INTO raw_events_first(user_id) (SELECT user_id FROM raw_events_first) INTERSECT (SELECT user_id FROM raw_events_first); --- We do not support any set operations +-- If the query is router plannable then it is executed via the coordinator INSERT INTO raw_events_first(user_id) SELECT @@ -1389,9 +1378,6 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6; SELECT count(*) FROM raw_events_second; --- inserting into views does not -INSERT INTO test_view SELECT * FROM raw_events_second; - -- we need this in our next test truncate raw_events_first; @@ -1611,6 +1597,8 @@ SELECT create_distributed_table('text_table', 'part_col'); SELECT create_distributed_table('char_table','part_col'); SELECT create_distributed_table('table_with_starts_with_defaults', 'c'); +SET client_min_messages TO DEBUG; + INSERT INTO text_table (part_col) SELECT CASE WHEN part_col = 'onder' THEN 'marco' @@ -1628,6 +1616,9 @@ INSERT INTO text_table (part_col) SELECT part_col::text from char_table; INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; INSERT INTO text_table (part_col) SELECT val FROM text_table; INSERT INTO text_table (part_col) SELECT val::text FROM text_table; + +RESET client_min_messages; + insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; -- Test on partition column without native hash function @@ -1651,6 +1642,261 @@ INSERT INTO summary_table SELECT time, COUNT(*) FROM raw_table GROUP BY time; SELECT * FROM summary_table; +-- Test INSERT ... SELECT via coordinator + +-- Select from constants +TRUNCATE raw_events_first; + +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM (VALUES (1,2), (3,4), (5,6)) AS v(int,int); + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id; + +-- Select from local functions +TRUNCATE raw_events_first; + +CREATE SEQUENCE insert_select_test_seq; + +SET client_min_messages TO DEBUG; + +INSERT INTO raw_events_first (user_id, value_1, value_2) +SELECT + s, nextval('insert_select_test_seq'), (random()*10)::int +FROM + generate_series(1, 5) s; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- ON CONFLICT is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +ON CONFLICT DO NOTHING; + +-- RETURNING is unsupported +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s +RETURNING *; + +RESET client_min_messages; + +-- INSERT ... SELECT and multi-shard SELECT in the same transaction is unsupported +TRUNCATE raw_events_first; + +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first; +ROLLBACK; + +-- INSERT ... SELECT and single-shard SELECT in the same transaction is supported +TRUNCATE raw_events_first; + +BEGIN; +INSERT INTO raw_events_first (user_id, value_1) +SELECT s, s FROM generate_series(1, 5) s; +SELECT user_id, value_1 FROM raw_events_first WHERE user_id = 1; +COMMIT; + +-- Select from local table +TRUNCATE raw_events_first; + +CREATE TEMPORARY TABLE raw_events_first_local AS +SELECT s AS u, 2*s AS v FROM generate_series(1, 5) s; + +INSERT INTO raw_events_first (user_id, value_1) +SELECT u, v FROM raw_events_first_local; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- Use columns in opposite order +TRUNCATE raw_events_first; + +INSERT INTO raw_events_first (value_1, user_id) +SELECT u, v FROM raw_events_first_local; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- Set operations can work with opposite column order +TRUNCATE raw_events_first; + +INSERT INTO raw_events_first (value_3, user_id) +( SELECT v, u::bigint FROM raw_events_first_local ) +UNION ALL +( SELECT v, u FROM raw_events_first_local ); + +SELECT user_id, value_3 FROM raw_events_first ORDER BY user_id, value_3; + +-- Select from other distributed table with limit +TRUNCATE raw_events_first; +TRUNCATE raw_events_second; + +INSERT INTO raw_events_second (user_id, value_4) +SELECT s, 3*s FROM generate_series (1,5) s; + +INSERT INTO raw_events_first (user_id, value_1) +SELECT user_id, value_4 FROM raw_events_second LIMIT 5; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs are supported in local queries +TRUNCATE raw_events_first; + +WITH removed_rows AS ( + DELETE FROM raw_events_first_local RETURNING u +) +INSERT INTO raw_events_first (user_id, value_1) +WITH value AS (SELECT 1) +SELECT * FROM removed_rows, value; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- nested CTEs are also supported +TRUNCATE raw_events_first; +INSERT INTO raw_events_first_local SELECT s, 2*s FROM generate_series(0, 10) s; + +WITH rows_to_remove AS ( + SELECT u FROM raw_events_first_local WHERE u > 0 +), +removed_rows AS ( + DELETE FROM raw_events_first_local + WHERE u IN (SELECT * FROM rows_to_remove) + RETURNING u, v +) +INSERT INTO raw_events_first (user_id, value_1) +WITH ultra_rows AS ( + WITH numbers AS ( + SELECT s FROM generate_series(1,10) s + ), + super_rows AS ( + SELECT u, v FROM removed_rows JOIN numbers ON (u = s) + ) + SELECT * FROM super_rows LIMIT 5 +) +SELECT u, v FROM ultra_rows; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs with duplicate names are also supported +TRUNCATE raw_events_first; + +WITH super_rows AS ( + SELECT u FROM raw_events_first_local +) +INSERT INTO raw_events_first (user_id, value_1) +WITH super_rows AS ( + SELECT * FROM super_rows GROUP BY u +) +SELECT u, 5 FROM super_rows; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs are supported in router queries +TRUNCATE raw_events_first; + +WITH user_two AS ( + SELECT user_id, value_4 FROM raw_events_second WHERE user_id = 2 +) +INSERT INTO raw_events_first (user_id, value_1) +SELECT * FROM user_two; + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- CTEs are supported when there are name collisions +WITH numbers AS ( + SELECT s FROM generate_series(1,10) s +) +INSERT INTO raw_events_first(user_id, value_1) +WITH numbers AS ( + SELECT s, s FROM generate_series(1,5) s +) +SELECT * FROM numbers; + +-- Select into distributed table with a sequence +CREATE TABLE "CaseSensitiveTable" ("UserID" int, "Value1" int); +SELECT create_distributed_table('"CaseSensitiveTable"', 'UserID'); + +INSERT INTO "CaseSensitiveTable" +SELECT s, s FROM generate_series(1,10) s; + +SELECT * FROM "CaseSensitiveTable" ORDER BY "UserID"; + +DROP TABLE "CaseSensitiveTable"; + +-- Select into distributed table with a sequence +CREATE TABLE dist_table_with_sequence (user_id serial, value_1 serial); +SELECT create_distributed_table('dist_table_with_sequence', 'user_id'); + +-- from local query +INSERT INTO dist_table_with_sequence (value_1) +SELECT s FROM generate_series(1,5) s; + +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + +-- from a distributed query +INSERT INTO dist_table_with_sequence (value_1) +SELECT value_1 FROM dist_table_with_sequence; + +SELECT * FROM dist_table_with_sequence ORDER BY user_id; + +-- Select from distributed table into reference table +CREATE TABLE ref_table (user_id int, value_1 int); +SELECT create_reference_table('ref_table'); + +INSERT INTO ref_table +SELECT user_id, value_1 FROM raw_events_second; + +SELECT * FROM ref_table ORDER BY user_id, value_1; + +DROP TABLE ref_table; + +-- Select into an append-partitioned table is not supported +CREATE TABLE insert_append_table (user_id int, value_4 bigint); +SELECT create_distributed_table('insert_append_table', 'user_id', 'append'); + +INSERT INTO insert_append_table (user_id, value_4) +SELECT user_id, 1 FROM raw_events_second LIMIT 5; + +DROP TABLE insert_append_table; + +-- Insert from other distributed table as prepared statement +TRUNCATE raw_events_first; + +PREPARE insert_prep(int) AS +INSERT INTO raw_events_first (user_id, value_1) +SELECT $1, value_4 FROM raw_events_second ORDER BY value_4 LIMIT 1; + +EXECUTE insert_prep(1); +EXECUTE insert_prep(2); +EXECUTE insert_prep(3); +EXECUTE insert_prep(4); +EXECUTE insert_prep(5); +EXECUTE insert_prep(6); + +SELECT user_id, value_1 FROM raw_events_first ORDER BY user_id, value_1; + +-- Inserting into views is handled via coordinator +TRUNCATE raw_events_first; + +INSERT INTO test_view +SELECT * FROM raw_events_second; + +SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4; + +-- Drop the view now, because the column we are about to drop depends on it +DROP VIEW test_view; + +-- Make sure we handle dropped columns correctly +TRUNCATE raw_events_first; + +ALTER TABLE raw_events_first DROP COLUMN value_1; + +INSERT INTO raw_events_first (user_id, value_4) +SELECT value_4, user_id FROM raw_events_second LIMIT 5; + +SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id; + +RESET client_min_messages; + DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE raw_events_first CASCADE; diff --git a/src/test/regress/sql/multi_mx_modifications.sql b/src/test/regress/sql/multi_mx_modifications.sql index 334d174cb..2c9509407 100644 --- a/src/test/regress/sql/multi_mx_modifications.sql +++ b/src/test/regress/sql/multi_mx_modifications.sql @@ -79,9 +79,6 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::ti -- commands with multiple rows are unsupported INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT); --- INSERT ... SELECT ... FROM commands are unsupported from workers -INSERT INTO limit_orders_mx SELECT * FROM limit_orders_mx; - -- connect back to the other node \c - - - :worker_1_port @@ -273,6 +270,16 @@ DELETE FROM multiple_hash_mx WHERE category = '1' RETURNING category; SELECT * FROM multiple_hash_mx WHERE category = '1' ORDER BY category, data; SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data; +--- INSERT ... SELECT ... FROM commands are supported from workers +INSERT INTO multiple_hash_mx +SELECT s, s*2 FROM generate_series(1,10) s; + +-- but are never distributed +BEGIN; +SET LOCAL client_min_messages TO DEBUG1; +INSERT INTO multiple_hash_mx SELECT * FROM multiple_hash_mx; +END; + -- verify interaction of default values, SERIAL, and RETURNING \set QUIET on diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 46dcaa3bf..85f656961 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -675,7 +675,6 @@ WHERE colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1; -SET client_min_messages TO NOTICE; SET citus.log_multi_join_order TO FALSE; SET citus.shard_count TO DEFAULT; @@ -683,7 +682,7 @@ SET citus.task_executor_type to "real-time"; -- some INSERT .. SELECT queries that involve both hash distributed and reference tables --- should error out since we're inserting into reference table where +-- should go via coordinator since we're inserting into reference table where -- not all the participants are reference tables INSERT INTO reference_table_test (value_1) @@ -692,9 +691,9 @@ SELECT FROM colocated_table_test, colocated_table_test_2 WHERE - colocated_table_test.value_1 = colocated_table_test.value_1; + colocated_table_test.value_1 = colocated_table_test_2.value_1; --- should error out, same as the above +-- should go via coordinator, same as the above INSERT INTO reference_table_test (value_1) SELECT @@ -727,8 +726,7 @@ WHERE colocated_table_test_2.value_2 = reference_table_test.value_2 RETURNING value_1, value_2; --- partition column value comes from reference table but still first error is --- on data type mismatch +-- partition column value comes from reference table, goes via coordinator INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -736,10 +734,8 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; + colocated_table_test_2.value_4 = reference_table_test.value_4; --- partition column value comes from reference table which should error out INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -747,9 +743,9 @@ SELECT FROM colocated_table_test_2, reference_table_test WHERE - colocated_table_test_2.value_4 = reference_table_test.value_4 -RETURNING value_1, value_2; + colocated_table_test_2.value_4 = reference_table_test.value_4; +RESET client_min_messages; -- some tests for mark_tables_colocated -- should error out diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 89789de18..c98b233cd 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -137,8 +137,12 @@ SELECT master_remove_node('localhost', 5432); TRUNCATE mx_table; SELECT count(*) FROM mx_table; --- INSERT / SELECT +-- INSERT / SELECT pulls results to worker +BEGIN; +SET LOCAL client_min_messages TO DEBUG; INSERT INTO mx_table_2 SELECT * FROM mx_table; +END; + SELECT count(*) FROM mx_table_2; -- mark_tables_colocated diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index c80977fb2..05dab5fd8 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -93,9 +93,6 @@ SELECT count(*) FROM temp_lineitem; INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL'; SELECT count(*) FROM temp_lineitem; --- modifying views is disallowed -INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem; - SET citus.task_executor_type to "task-tracker"; -- single view repartition subqueries are not supported