diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 3a07aecb2..13359d3d5 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -57,23 +57,18 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags) executorType = JobExecutorType(multiPlan); if (executorType == MULTI_EXECUTOR_ROUTER) { - Task *task = NULL; - List *taskList = workerJob->taskList; TupleDesc tupleDescriptor = ExecCleanTypeFromTL( planStatement->planTree->targetlist, false); List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList; - /* router executor can only execute distributed plans with a single task */ - Assert(list_length(taskList) == 1); + /* router executor cannot execute task with depencencies */ Assert(dependendJobList == NIL); - task = (Task *) linitial(taskList); - /* we need to set tupleDesc in executorStart */ queryDesc->tupDesc = tupleDescriptor; /* drop into the router executor */ - RouterExecutorStart(queryDesc, eflags, task); + RouterExecutorStart(queryDesc, eflags); } else { diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 071038024..cb9171bb4 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -133,14 +133,11 @@ static void MarkRemainingInactivePlacements(void); * execution. */ void -RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) +RouterExecutorStart(QueryDesc *queryDesc, int eflags) { EState *executorState = NULL; CmdType commandType = queryDesc->operation; - /* ensure that the task is not NULL */ - Assert(task != NULL); - /* disallow triggers during distributed modify commands */ if (commandType != CMD_SELECT) { diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 703fa4449..33aa6d65f 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -127,8 +127,8 @@ RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType) List *workerDependentTaskList = NIL; bool masterQueryHasAggregates = false; - /* router executor cannot execute queries that hit more than one shard */ - if (taskCount != 1) + /* router executor cannot execute queries that hit zero shards */ + if (taskCount == 0) { return false; } @@ -141,6 +141,12 @@ RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType) return true; } + /* router executor cannot execute SELECT queries that hit more than one shard */ + if (taskCount != 1) + { + return false; + } + if (executorType == MULTI_EXECUTOR_TASK_TRACKER) { return false; diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index d4dfc9f98..51b5f14c0 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -72,8 +72,10 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * then generate a query for that individual shard. If any of the * involved tables don't prune down to a single shard, or if the * pruned shards aren't colocated, we error out. + * + * TODO: we currently not support CTEs. */ - if (InsertSelectQuery(parse)) + if (InsertSelectQuery(parse) && parse->cteList == NULL) { AddHiddenPartitionColumnParameter(parse); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 063942354..9ad4ad18e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -106,12 +106,14 @@ static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecut static RelationRestrictionContext * copyRelationRestrictionContext( RelationRestrictionContext *oldContext); static Node * ReplaceHiddenParameter(Node *node, void *context); -static Var * MakeInt4Column(); +static Var * MakeInt4Column(void); static Const * MakeInt4Constant(Datum constantValue); static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree); +static void ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query); static void ErrorIfNotAllParticipatingTablesAreColocated(Query *query); static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query); -static void AddHiddenParameterToFirstTableRecursively(Query *query); +static Query * FirstQueryReferencingDistributedTable(Query *query); +static void AddHiddenParameterToFirstTable(Query *query); /* @@ -249,6 +251,7 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, uint64 selectAnchorShardId = INVALID_SHARD_ID; List *insertShardPlacementList = NULL; List *intersectedPlacementList = NULL; + RangeTblEntry *rangeTableEntry = NULL; /* * Replace the magic value in all baserestrictinfos. Note that @@ -276,7 +279,11 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, if (routerQuery == NULL) { - elog(ERROR, "couldn't prune down sufficiently for insert pushdown"); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "Select query cannot be pushed down to the worker."))); } /* Ensure that we have INSERTed table's placement exists on the same worker */ @@ -287,21 +294,34 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, if (list_length(insertShardPlacementList) != list_length( intersectedPlacementList)) { - ereport(DEBUG2, (errmsg("insert table does not have the same placements on " - "the select placement list. Skipping this task"))); + ereport(DEBUG2, (errmsg("skipping the task"), + errdetail("Insert query hits %d placements, Select query " + "hits %d placements and only %d of those placements match.", + list_length(insertShardPlacementList), list_length( + selectPlacementList), + list_length(intersectedPlacementList)))); continue; } + /* this is required for correct deparsing of the query */ ReorderInsertSelectTargetListsIfExists(copiedOriginal); + /* setting an alias simplifies deparsing of RETURNING */ + rangeTableEntry = linitial(copiedOriginal->rtable); + if (rangeTableEntry->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + rangeTableEntry->alias = alias; + } + /* and generate the full query string */ deparse_shard_query(copiedOriginal, distributedTableId, shardInterval->shardId, queryString); ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); - sqlTask = CreateBasicTask(jobId, taskIdIndex++, SQL_TASK, queryString->data); + sqlTask = CreateBasicTask(jobId, taskIdIndex++, MODIFY_TASK, queryString->data); sqlTask->dependedTaskList = NULL; sqlTask->anchorShardId = shardId; sqlTask->taskPlacementList = insertShardPlacementList; @@ -336,18 +356,14 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query, static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree) { - RangeTblEntry *insertRte = NULL; RangeTblEntry *subqueryRte = NULL; Query *subquery = NULL; - Oid insertRelationId = InvalidOid; /* we only do this check for INSERT ... SELECT queries */ AssertArg(InsertSelectQuery(queryTree)); - insertRte = linitial(queryTree->rtable); subqueryRte = lsecond(queryTree->rtable); subquery = subqueryRte->subquery; - insertRelationId = insertRte->relid; /* we support this feature only for colocated tables */ ErrorIfNotAllParticipatingTablesAreColocated(queryTree); @@ -355,36 +371,93 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree) if (contain_mutable_functions((Node *) queryTree)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("non-IMMUTABLE functions are not allowed in INSERT ... " - "SELECT queries"))); + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "Stable and volatile functions are not allowed in INSERT ... " + "SELECT queries"))); } - if (subquery->limitCount != NULL) + if (queryTree->cteList != NULL) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("LIMIT clause are not allowed in INSERT ... SELECT " - "queries"))); + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "Common table expressions are not allowed in INSERT ... SELECT " + "queries"))); } - if (subquery->limitOffset != NULL) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("OFFSET clause are not allowed in INSERT ... SELECT " - "queries"))); - } - - /*TODO: check with Andres. Should we allow on partition column? I'm cool with not having window functions */ - if (subquery->windowClause != NULL) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Window functions are not allowed in INSERT ... SELECT " - "queries"))); - } + /* we don't support LIMIT, OFFSET and WINDOW functions */ + ErrorIfMultiTaskRouterSelectQueryUnsupported(subquery); + /* ensure that INSERT's partition column comes from SELECT's partition column */ ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree); } +/* + * ErrorUnsupportedMultiTaskSelectQuery errors out on queries that we support + * for single task router queries, but, cannot allow for multi task router + * queries. We do these checks recursively to prevent any wrong results. + */ +static void +ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) +{ + List *queryList = NIL; + ListCell *queryCell = NULL; + + ExtractQueryWalker((Node *) query, &queryList); + foreach(queryCell, queryList) + { + Query *subquery = (Query *) lfirst(queryCell); + + Assert(subquery->commandType == CMD_SELECT); + + if (subquery->limitCount != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "LIMIT clauses are not allowed in INSERT ... SELECT " + "queries"))); + } + + if (subquery->limitOffset != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "OFFSET clauses are not allowed in INSERT ... SELECT " + "queries") + + )); + } + + if (subquery->windowClause != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "Window functions are not allowed in INSERT ... SELECT " + "queries"))); + } + + if (subquery->setOperations != NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given " + "modification"), + errdetail( + "Union, Intersect and Except are currently unsupported are not allowed in INSERT ... SELECT " + "queries"))); + } + } +} + /* * ErrorIfNotAllParticipatingTablesAreColocated errors out of all tables @@ -428,8 +501,6 @@ ErrorIfNotAllParticipatingTablesAreColocated(Query *query) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("all participating tables should be colocated"))); } - - return; } @@ -497,8 +568,7 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query) /* * AddHiddenPartitionColumnParameter() can only be used with - * INSERT ... SELECT queries. We add this hidden parameter to - * recursively for subqueries. + * INSERT ... SELECT queries. * * If the input query is not INSERT .. SELECT the function errors-out. */ @@ -506,26 +576,68 @@ void AddHiddenPartitionColumnParameter(Query *originalQuery) { Query *subquery = NULL; + Query *referencedSubquery = NULL; RangeTblEntry *subqueryEntry = NULL; if (!InsertSelectQuery(originalQuery)) { elog(ERROR, "Only INSERT .. SELECT queries can be modified"); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning for the given" + " modification"))); } + /* TODO: once CTEs are present, this does not work */ subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1); subquery = subqueryEntry->subquery; - AddHiddenParameterToFirstTableRecursively(subquery); + referencedSubquery = FirstQueryReferencingDistributedTable(subquery); + + AddHiddenParameterToFirstTable(referencedSubquery); +} + + +static Query * +FirstQueryReferencingDistributedTable(Query *query) +{ + List *queryList = NIL; + ListCell *queryCell = NULL; + Query *subquery = NULL; + + ExtractQueryWalker((Node *) query, &queryList); + + /* iterate and find the query which references to distributed table */ + foreach(queryCell, queryList) + { + Query *innerSubquery = (Query *) lfirst(queryCell); + + List *rangeTableList = NIL; + ListCell *rangeTableCell = NULL; + + /* extract range table entries */ + ExtractRangeTableEntryWalker((Node *) innerSubquery, &rangeTableList); + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + + if (IsDistributedTable(rangeTableEntry->relid)) + { + subquery = innerSubquery; + break; + } + } + } + + return subquery; } /* - * AddHiddenParameterToFirstTableRecursively adds a hidden parameter + * AddHiddenParameterToFirstTable adds a hidden parameter * ($1 = partitionColumn) for the first table on the query. */ static void -AddHiddenParameterToFirstTableRecursively(Query *query) +AddHiddenParameterToFirstTable(Query *query) { Param *hiddenParam = makeNode(Param); Node *hiddenBound = NULL; @@ -537,9 +649,6 @@ AddHiddenParameterToFirstTableRecursively(Query *query) Oid greaterOperator = InvalidOid; bool hashable = false; - List *subqueryEntryList = NIL; - ListCell *rangeTableEntryCell = NULL; - AssertArg(query->commandType == CMD_SELECT); hiddenParam->paramkind = PARAM_EXTERN; @@ -574,17 +683,6 @@ AddHiddenParameterToFirstTableRecursively(Query *query) query->jointree->quals = make_and_qual(query->jointree->quals, hiddenBound); } - - /* recursively do same addition for subqueries of this query */ - subqueryEntryList = SubqueryEntryList(query); - foreach(rangeTableEntryCell, subqueryEntryList) - { - RangeTblEntry *rangeTableEntry = - (RangeTblEntry *) lfirst(rangeTableEntryCell); - - Query *innerSubquery = rangeTableEntry->subquery; - AddHiddenParameterToFirstTableRecursively(innerSubquery); - } } @@ -1205,7 +1303,7 @@ RouterModifyTask(Query *originalQuery, Query *query) rangeTableEntry = linitial(originalQuery->rtable); if (rangeTableEntry->alias == NULL) { - Alias *alias = makeAlias(UPSERT_ALIAS, NIL); + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); rangeTableEntry->alias = alias; } } @@ -1494,7 +1592,8 @@ RouterSelectTask(Query *originalQuery, Query *query, task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; - //task->requiresMasterEvaluation = false; + + /* task->requiresMasterEvaluation = false; */ return task; } @@ -1622,11 +1721,6 @@ TargetShardIntervalsForSelect(Query *query, List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true); bool whereFalseQuery = false; - /* elog(DEBUG2, "relation id: %d", relationId); */ - /* elog(DEBUG2, "restrictClauseList-: %s", pretty_format_node_dump(nodeToString(restrictClauseList))); */ - /* elog(DEBUG2, "join info-: %s", pretty_format_node_dump(nodeToString(relationRestriction->relOptInfo->joininfo))); */ - - relationRestriction->prunedShardIntervalList = NIL; /* @@ -2021,6 +2115,8 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, } +#include "nodes/print.h" + /* * ReorderInsertSelectTargetListsIfExists reorders the target lists of INSERT/SELECT * query which is required for deparsing purposes. The reordered query is returned. @@ -2288,7 +2384,7 @@ ReplaceHiddenParameter(Node *node, void *context) { param = (Param *) leftop; } - else if (IsA(rightop, Param)) //IsA(leftop, Var))/* &&) */ + else if (IsA(rightop, Param)) /* IsA(leftop, Var))/ * &&) * / */ { param = (Param *) rightop; } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 3abbef272..fd3cc1c5e 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -33,7 +33,7 @@ typedef struct XactShardConnSet extern bool AllModificationsCommutative; -extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); +extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 86e2e15a0..2f362f9e3 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -25,7 +25,7 @@ #define HIDDEN_PARAMETER_ID 0xdeadbeef /* reserved alias name for UPSERTs */ -#define UPSERT_ALIAS "citus_table_alias" +#define CITUS_TABLE_ALIAS "citus_table_alias" extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out new file mode 100644 index 000000000..91613c6df --- /dev/null +++ b/src/test/regress/expected/multi_insert_select.out @@ -0,0 +1,736 @@ +-- +-- MULTI_INSERT_SELECT +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13300000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13300000; +CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT master_create_distributed_table('raw_events_first', 'user_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('raw_events_first', 4, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT master_create_distributed_table('raw_events_second', 'user_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('raw_events_second', 4, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); +SELECT master_create_distributed_table('agg_events', 'user_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('agg_events', 4, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +-- make tables as co-located +UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events'); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (1, now(), 10, 100, 1000.1, 10000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (2, now(), 20, 200, 2000.1, 20000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (3, now(), 30, 300, 3000.1, 30000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (4, now(), 40, 400, 4000.1, 40000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (5, now(), 50, 500, 5000.1, 50000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (6, now(), 60, 600, 6000.1, 60000); +SET client_min_messages TO DEBUG4; +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +-- raw table to raw table +INSERT INTO raw_events_second SELECT * FROM raw_events_first; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300000 raw_events_first +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300001 raw_events_first +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300002 raw_events_first +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300003 raw_events_first +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300007 +DEBUG: sent COMMIT over connection 13300007 +DEBUG: sent COMMIT over connection 13300005 +DEBUG: sent COMMIT over connection 13300005 +DEBUG: sent COMMIT over connection 13300006 +DEBUG: sent COMMIT over connection 13300006 +DEBUG: sent COMMIT over connection 13300004 +DEBUG: sent COMMIT over connection 13300004 +-- see that our first multi shard INSERT...SELECT works expected +SET client_min_messages TO INFO; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: ProcessUtility +SELECT + raw_events_first.user_id +FROM + raw_events_first, raw_events_second +WHERE + raw_events_first.user_id = raw_events_second.user_id; + user_id +--------- + 1 + 5 + 3 + 4 + 6 + 2 +(6 rows) + +-- see that we get unique vialitons +INSERT INTO raw_events_second SELECT * FROM raw_events_first; +ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300004" +DETAIL: Key (user_id, value_1)=(1, 10) already exists. +CONTEXT: while executing command on localhost:57638 +-- add one more row +INSERT INTO raw_events_first (user_id, time) VALUES + (7, now()); +-- try a single shard query +SET client_min_messages TO DEBUG4; +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +INSERT INTO raw_events_second (user_id, time) SELECT user_id, time FROM raw_events_first WHERE user_id = 7; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, "time") SELECT user_id, "time" FROM public.raw_events_first_13300001 raw_events_first WHERE (user_id = 7) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +SET client_min_messages TO INFO; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: ProcessUtility +-- add one more row +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (8, now(), 80, 800, 8000, 80000); +-- reorder columns +SET client_min_messages TO DEBUG4; +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time) +SELECT + value_2, value_1, value_3, value_4, user_id, time +FROM + raw_events_first +WHERE + user_id = 8; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300000 raw_events_first WHERE (user_id = 8) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +-- add one more row +SET client_min_messages TO INFO; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: ProcessUtility +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (9, now(), 90, 900, 9000, 90000); +-- show that RETURNING also works +SET client_min_messages TO DEBUG4; +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +INSERT INTO raw_events_second (user_id, value_1, value_3) +SELECT + user_id, value_1, value_3 +FROM + raw_events_first +WHERE + value_3 = 9000 +RETURNING *; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300000 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300001 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300002 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300003 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300007 +DEBUG: sent COMMIT over connection 13300007 +DEBUG: sent COMMIT over connection 13300005 +DEBUG: sent COMMIT over connection 13300005 +DEBUG: sent COMMIT over connection 13300006 +DEBUG: sent COMMIT over connection 13300006 +DEBUG: sent COMMIT over connection 13300004 +DEBUG: sent COMMIT over connection 13300004 + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 9 | | 90 | | 9000 | +(1 row) + +-- hits two shards +INSERT INTO raw_events_second (user_id, value_1, value_3) +SELECT + user_id, value_1, value_3 +FROM + raw_events_first +WHERE + user_id = 9 OR user_id = 16 +RETURNING *; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id = 9) OR (user_id = 16)) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: skipping the task +DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match. +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id = 9) OR (user_id = 16)) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4 +DEBUG: ProcessQuery +DEBUG: Plan is router executable +ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300007" +DETAIL: Key (user_id, value_1)=(9, 90) already exists. +CONTEXT: while executing command on localhost:57637 +-- now do some aggregations +INSERT INTO agg_events +SELECT + user_id, sum(value_1), avg(value_2), sum(value_3), count(value_4) +FROM + raw_events_first +GROUP BY + user_id; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300000 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300001 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300002 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300003 raw_events_first GROUP BY user_id +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300010 +DEBUG: sent COMMIT over connection 13300010 +-- group by column not exists on the SELECT target list +-- TODO: there is a bug on RETURNING +INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id) +SELECT + sum(value_3), count(value_4), sum(value_1), user_id +FROM + raw_events_first +GROUP BY + value_2, user_id +RETURNING *; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300000 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300001 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300002 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300003 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time +DEBUG: ProcessQuery +DEBUG: Plan is router executable +ERROR: duplicate key value violates unique constraint "agg_events_user_id_value_1_agg_key_13300008" +DETAIL: Key (user_id, value_1_agg)=(8, 80) already exists. +CONTEXT: while executing command on localhost:57638 +-- some subquery tests +INSERT INTO agg_events + (value_1_agg, + user_id) +SELECT SUM(value_1), + id +FROM (SELECT raw_events_second.user_id AS id, + raw_events_second.value_1 + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id) AS foo +GROUP BY id; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300004 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300005 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id +DEBUG: ProcessQuery +DEBUG: Plan is router executable +ERROR: duplicate key value violates unique constraint "agg_events_user_id_value_1_agg_key_13300008" +DETAIL: Key (user_id, value_1_agg)=(1, 10) already exists. +CONTEXT: while executing command on localhost:57638 +-- subquery one more level depth +INSERT INTO agg_events + (value_4_agg, + value_1_agg, + user_id) +SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id) AS foo; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300004 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300005 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300007 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300004 +DEBUG: predicate pruning for shardId 13300005 +DEBUG: predicate pruning for shardId 13300006 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo +DEBUG: ProcessQuery +DEBUG: Plan is router executable +ERROR: duplicate key value violates unique constraint "agg_events_user_id_value_1_agg_key_13300008" +DETAIL: Key (user_id, value_1_agg)=(1, 10) already exists. +CONTEXT: while executing command on localhost:57638 +-- some UPSERTS +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300000 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300001 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300002 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300003 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300010 +DEBUG: sent COMMIT over connection 13300010 +-- upserts with returning +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time +RETURNING user_id, value_1_agg; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300000 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300001 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300002 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300003 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300010 +DEBUG: sent COMMIT over connection 13300010 + user_id | value_1_agg +---------+------------- + 7 | +(1 row) + +-- TODO:: add hll and date_trunc +INSERT INTO agg_events (user_id, value_1_agg) +SELECT + user_id, sum(value_1 + value_2) +FROM + raw_events_first GROUP BY user_id; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300000 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300001 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300002 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300003 raw_events_first GROUP BY user_id +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300010 +DEBUG: sent COMMIT over connection 13300010 +-- FILTER CLAUSE +INSERT INTO agg_events (user_id, value_1_agg) +SELECT + user_id, sum(value_1 + value_2) FILTER (where value_3 = 15) +FROM + raw_events_first GROUP BY user_id; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300000 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300001 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300003 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300002 raw_events_first GROUP BY user_id +DEBUG: predicate pruning for shardId 13300000 +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300003 raw_events_first GROUP BY user_id +DEBUG: ProcessQuery +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand +DEBUG: CommitTransaction +DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300008 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300011 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300009 +DEBUG: sent COMMIT over connection 13300010 +DEBUG: sent COMMIT over connection 13300010 +-- TODO: UUIDs +-- unsupported JOIN +INSERT INTO agg_events + (value_4_agg, + value_1_agg, + user_id) +SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id != raw_events_second.user_id + GROUP BY raw_events_second.user_id) AS foo; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +DEBUG: predicate pruning for shardId 13300001 +DEBUG: predicate pruning for shardId 13300002 +DEBUG: predicate pruning for shardId 13300003 +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- INSERT partition column does not match with SELECT partition column +INSERT INTO agg_events + (value_4_agg, + value_1_agg, + user_id) +SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.value_3 AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.value_3) AS foo; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +-- error cases +-- no part column at all +INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column +INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2; +DEBUG: StartTransactionCommand +DEBUG: StartTransaction +DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 9f8862be9..1f236f22e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -29,6 +29,8 @@ test: multi_create_table_constraints test: multi_master_protocol test: multi_load_data +test: multi_insert_select + # ---------- # Miscellaneous tests to check our query planning behavior # ---------- diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql new file mode 100644 index 000000000..44e5af07b --- /dev/null +++ b/src/test/regress/sql/multi_insert_select.sql @@ -0,0 +1,256 @@ +-- +-- MULTI_INSERT_SELECT +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13300000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13300000; + + +CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT master_create_distributed_table('raw_events_first', 'user_id', 'hash'); +SELECT master_create_worker_shards('raw_events_first', 4, 2); + +CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1)); +SELECT master_create_distributed_table('raw_events_second', 'user_id', 'hash'); +SELECT master_create_worker_shards('raw_events_second', 4, 2); + + +CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg)); +SELECT master_create_distributed_table('agg_events', 'user_id', 'hash'); +SELECT master_create_worker_shards('agg_events', 4, 2); + +-- make tables as co-located +UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events'); + + +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (1, now(), 10, 100, 1000.1, 10000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (2, now(), 20, 200, 2000.1, 20000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (3, now(), 30, 300, 3000.1, 30000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (4, now(), 40, 400, 4000.1, 40000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (5, now(), 50, 500, 5000.1, 50000); +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (6, now(), 60, 600, 6000.1, 60000); + +SET client_min_messages TO DEBUG4; + +-- raw table to raw table +INSERT INTO raw_events_second SELECT * FROM raw_events_first; + +-- see that our first multi shard INSERT...SELECT works expected +SET client_min_messages TO INFO; +SELECT + raw_events_first.user_id +FROM + raw_events_first, raw_events_second +WHERE + raw_events_first.user_id = raw_events_second.user_id; + +-- see that we get unique vialitons +INSERT INTO raw_events_second SELECT * FROM raw_events_first; + +-- add one more row +INSERT INTO raw_events_first (user_id, time) VALUES + (7, now()); + +-- try a single shard query +SET client_min_messages TO DEBUG4; +INSERT INTO raw_events_second (user_id, time) SELECT user_id, time FROM raw_events_first WHERE user_id = 7; + + +SET client_min_messages TO INFO; + +-- add one more row +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (8, now(), 80, 800, 8000, 80000); + + +-- reorder columns +SET client_min_messages TO DEBUG4; +INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time) +SELECT + value_2, value_1, value_3, value_4, user_id, time +FROM + raw_events_first +WHERE + user_id = 8; + + +-- add one more row +SET client_min_messages TO INFO; +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (9, now(), 90, 900, 9000, 90000); + + +-- show that RETURNING also works +SET client_min_messages TO DEBUG4; +INSERT INTO raw_events_second (user_id, value_1, value_3) +SELECT + user_id, value_1, value_3 +FROM + raw_events_first +WHERE + value_3 = 9000 +RETURNING *; + +-- hits two shards +INSERT INTO raw_events_second (user_id, value_1, value_3) +SELECT + user_id, value_1, value_3 +FROM + raw_events_first +WHERE + user_id = 9 OR user_id = 16 +RETURNING *; + + +-- now do some aggregations +INSERT INTO agg_events +SELECT + user_id, sum(value_1), avg(value_2), sum(value_3), count(value_4) +FROM + raw_events_first +GROUP BY + user_id; + +-- group by column not exists on the SELECT target list +-- TODO: there is a bug on RETURNING +INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id) +SELECT + sum(value_3), count(value_4), sum(value_1), user_id +FROM + raw_events_first +GROUP BY + value_2, user_id +RETURNING *; + + +-- some subquery tests +INSERT INTO agg_events + (value_1_agg, + user_id) +SELECT SUM(value_1), + id +FROM (SELECT raw_events_second.user_id AS id, + raw_events_second.value_1 + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id) AS foo +GROUP BY id; + + +-- subquery one more level depth +INSERT INTO agg_events + (value_4_agg, + value_1_agg, + user_id) +SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.user_id) AS foo; + + +-- some UPSERTS +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time; + +-- upserts with returning +INSERT INTO agg_events AS ae + ( + user_id, + value_1_agg, + agg_time + ) +SELECT user_id, + value_1, + time +FROM raw_events_first +ON conflict (user_id, value_1_agg) +DO UPDATE + SET agg_time = EXCLUDED.agg_time + WHERE ae.agg_time < EXCLUDED.agg_time +RETURNING user_id, value_1_agg; + + + +-- TODO:: add hll and date_trunc +INSERT INTO agg_events (user_id, value_1_agg) +SELECT + user_id, sum(value_1 + value_2) +FROM + raw_events_first GROUP BY user_id; + +-- FILTER CLAUSE +INSERT INTO agg_events (user_id, value_1_agg) +SELECT + user_id, sum(value_1 + value_2) FILTER (where value_3 = 15) +FROM + raw_events_first GROUP BY user_id; + + +-- TODO: UUIDs + +-- unsupported JOIN +INSERT INTO agg_events + (value_4_agg, + value_1_agg, + user_id) +SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.user_id AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id != raw_events_second.user_id + GROUP BY raw_events_second.user_id) AS foo; + + +-- INSERT partition column does not match with SELECT partition column +INSERT INTO agg_events + (value_4_agg, + value_1_agg, + user_id) +SELECT v4, + v1, + id +FROM (SELECT SUM(raw_events_second.value_4) AS v4, + SUM(raw_events_first.value_1) AS v1, + raw_events_second.value_3 AS id + FROM raw_events_first, + raw_events_second + WHERE raw_events_first.user_id = raw_events_second.user_id + GROUP BY raw_events_second.value_3) AS foo; + +-- error cases +-- no part column at all +INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first; +INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first; +INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first; +INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first; +INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first; +INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id; +INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2;