diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 675ac9d0d..46acf055e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -44,6 +44,7 @@ #include "optimizer/clauses.h" #include "optimizer/predtest.h" #include "optimizer/restrictinfo.h" +#include "optimizer/var.h" #include "parser/parsetree.h" #include "storage/lock.h" #include "utils/elog.h" @@ -90,7 +91,7 @@ static bool UpdateRelationNames(Node *node, static Job * RouterQueryJob(Query *query, Task *task, List *placementList); static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, RelationRestrictionContext *restrictionContext); - +static bool InsertSelectQuery(Query *query); /* * MultiRouterPlanCreate creates a physical plan for given query. The created plan is @@ -1558,3 +1559,190 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, return true; } + + +/* + * ReorderInsertSelectTargetListsIfExists reorders the target lists of INSERT/SELECT + * query which is required for deparsing purposes. The reordered query is returned. + * + * The necessity for this function comes from the fact that ruleutils.c is not supposed to be + * used on "rewritten" queries (i.e. ones that have been passed through QueryRewrite()). + * Query rewriting is the process in which views and such are expanded, + * and, INSERT/UPDATE targetlists are reordered to match the physical order, + * defaults etc. For the details of reordeing, see transformInsertRow(). 
+ */ +Query * +ReorderInsertSelectTargetListsIfExists(Query *originalQuery) +{ + RangeTblEntry *insertRte = NULL; + RangeTblEntry *subqueryRte = NULL; + Query *subquery = NULL; + ListCell *insertTargetEntryCell; + List *newSubqueryTargetlist = NIL; + List *newInsertTargetlist = NIL; + int resno = 1; + Index insertTableId = 1; + int updatedSubqueryEntryCount = 0; + Oid insertRelationId = InvalidOid; + int subqueryTargetLength = 0; + + /* we only apply the reording for INSERT ... SELECT queries */ + if (!InsertSelectQuery(originalQuery)) + { + return originalQuery; + } + + insertRte = linitial(originalQuery->rtable); + subqueryRte = lsecond(originalQuery->rtable); + subquery = subqueryRte->subquery; + + insertRelationId = insertRte->relid; + + /* + * We implement the following algorithm for the reoderding: + * - Iterate over the INSERT target list entries + * - If the target entry includes a Var, find the corresponding + * SELECT target entry on the original query and update resno + * - If the target entry does not include a Var (i.e., defaults), + * create new target entry and add that to SELECT target list + * - Create a new INSERT target entry with respect to the new + * SELECT target entry created. + */ + foreach(insertTargetEntryCell, originalQuery->targetList) + { + TargetEntry *oldInsertTargetEntry = lfirst(insertTargetEntryCell); + TargetEntry *newInsertTargetEntry = NULL; + Var *newInsertVar = NULL; + TargetEntry *newSubqueryTargetEntry = NULL; + List *targetVarList = NULL; + int targetVarCount = 0; + AttrNumber originalAttrNo = get_attnum(insertRelationId, + oldInsertTargetEntry->resname); + + + /* see transformInsertRow() for the details */ + if (IsA(oldInsertTargetEntry->expr, ArrayRef) || + IsA(oldInsertTargetEntry->expr, FieldStore)) + { + ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot plan distributed INSERT INTO .. 
SELECT query"), + errhint("Do not use array references and field stores " + "on the INSERT target list."))); + } + + /* + * It is safe to pull Var clause and ignore the coercions since that + * are already going to be added on the workers implicitly. + */ + targetVarList = pull_var_clause((Node *) oldInsertTargetEntry->expr, + PVC_RECURSE_AGGREGATES, + PVC_RECURSE_PLACEHOLDERS); + targetVarCount = list_length(targetVarList); + + /* a single INSERT target entry cannot have more than one Var */ + Assert(targetVarCount <= 1); + + if (targetVarCount == 1) + { + Var *oldInsertVar = (Var *) linitial(targetVarList); + TargetEntry *oldSubqueryTle = list_nth(subquery->targetList, + oldInsertVar->varattno - 1); + + newSubqueryTargetEntry = copyObject(oldSubqueryTle); + + newSubqueryTargetEntry->resno = resno; + newSubqueryTargetlist = lappend(newSubqueryTargetlist, + newSubqueryTargetEntry); + + updatedSubqueryEntryCount++; + } + else + { + newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr, + resno, + oldInsertTargetEntry->resname, + oldInsertTargetEntry->resjunk); + newSubqueryTargetlist = lappend(newSubqueryTargetlist, + newSubqueryTargetEntry); + } + + newInsertVar = makeVar(insertTableId, originalAttrNo, + exprType( + (Node *) newSubqueryTargetEntry->expr), + exprTypmod( + (Node *) newSubqueryTargetEntry->expr), + exprCollation( + (Node *) newSubqueryTargetEntry->expr), + 0); + newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar, originalAttrNo, + oldInsertTargetEntry->resname, + oldInsertTargetEntry->resjunk); + + newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry); + resno++; + } + + /* + * if there are any remaining target list entries (i.e., GROUP BY column not on the + * target list of subquery), update the remaining resnos. 
+ */ + subqueryTargetLength = list_length(subquery->targetList); + if (subqueryTargetLength != updatedSubqueryEntryCount) + { + int targetEntryIndex = updatedSubqueryEntryCount; + + for (; targetEntryIndex < subqueryTargetLength; ++targetEntryIndex) + { + TargetEntry *oldSubqueryTle = list_nth(subquery->targetList, + targetEntryIndex); + TargetEntry *newSubqueryTargetEntry = copyObject(oldSubqueryTle); + + Assert(newSubqueryTargetEntry->resjunk == true); + + newSubqueryTargetEntry->resno = resno; + newSubqueryTargetlist = lappend(newSubqueryTargetlist, + newSubqueryTargetEntry); + + resno++; + } + } + + originalQuery->targetList = newInsertTargetlist; + subquery->targetList = newSubqueryTargetlist; + + return NULL; +} + + +/* + * InsertSelectQuery returns true when the input query + * is INSERT INTO ... SELECT kind of query. + */ +static bool +InsertSelectQuery(Query *query) +{ + CmdType commandType = query->commandType; + List *rangeTableList = query->rtable; + RangeTblEntry *subqueryRte = NULL; + Query *subquery = NULL; + + if (commandType != CMD_INSERT) + { + return false; + } + + rangeTableList = query->rtable; + if (list_length(rangeTableList) < 2) + { + return false; + } + + subqueryRte = lsecond(query->rtable); + subquery = subqueryRte->subquery; + if (subquery == NULL) + { + return false; + } + + return true; +} diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c new file mode 100644 index 000000000..4d586d3a0 --- /dev/null +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -0,0 +1,70 @@ +/*------------------------------------------------------------------------- + * + * test/src/depase_shard_query.c + * + * This file contains functions to exercise deparsing of INSERT .. SELECT queries + * for distributed tables. + * + * Copyright (c) 2014-2016, Citus Data, Inc. 
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "c.h"
+#include "fmgr.h"
+
+#include <stddef.h>	/* NOTE(review): patch had a bare "#include"; stddef.h matches sibling test files — confirm */
+
+#include "catalog/pg_type.h"
+#include "distributed/master_protocol.h"
+#include "distributed/citus_ruleutils.h"
+#include "distributed/multi_router_planner.h"
+#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
+#include "lib/stringinfo.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/parsenodes.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
+#include "tcop/tcopprot.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/palloc.h"
+
+
+/* declarations for dynamic loading */
+PG_FUNCTION_INFO_V1(deparse_shard_query_test);
+
+
+Datum
+deparse_shard_query_test(PG_FUNCTION_ARGS)
+{
+	text *queryString = PG_GETARG_TEXT_P(0);
+
+	char *queryStringChar = text_to_cstring(queryString);
+	List *parseTreeList = pg_parse_query(queryStringChar);
+	ListCell *parseTreeCell = NULL;
+
+	foreach(parseTreeCell, parseTreeList)
+	{
+		Node *parsetree = (Node *) lfirst(parseTreeCell);
+		ListCell *queryTreeCell = NULL;
+
+		List *queryTreeList = pg_analyze_and_rewrite(parsetree, queryStringChar,
+													 NULL, 0);
+
+		foreach(queryTreeCell, queryTreeList)
+		{
+			Query *query = lfirst(queryTreeCell);
+			StringInfo buffer = makeStringInfo();
+
+			ReorderInsertSelectTargetListsIfExists(query);
+			deparse_shard_query(query, InvalidOid, 0, buffer);
+
+			elog(INFO, "query: %s", buffer->data);
+		}
+	}
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/distributed/test/test_helper_functions.c b/src/backend/distributed/test/test_helper_functions.c
index 43f1a8fcb..e98a1f7f3 100644
--- a/src/backend/distributed/test/test_helper_functions.c
+++ b/src/backend/distributed/test/test_helper_functions.c
@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * test/src/test_helper_functions.c
+ * test/src/test_helper_functions.c
  *
  * This
file contains helper functions used in many Citus tests. * diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index cb558acd8..5d380575a 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -29,5 +29,6 @@ extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, MultiExecutorType taskExecutorType, RelationRestrictionContext *restrictionContext); extern void ErrorIfModifyQueryNotSupported(Query *queryTree); +extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_deparse_shard_query.out b/src/test/regress/expected/multi_deparse_shard_query.out new file mode 100644 index 000000000..051d2dbd0 --- /dev/null +++ b/src/test/regress/expected/multi_deparse_shard_query.out @@ -0,0 +1,428 @@ +-- +-- MULTI_DEPARSE_SHARD_QUERY +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13100000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13100000; +CREATE FUNCTION deparse_shard_query_test(text) + RETURNS VOID + AS 'citus' + LANGUAGE C STRICT; +-- create the first table +CREATE TABLE raw_events_1 + (tenant_id bigint, + value_1 int, + value_2 int, + value_3 float, + value_4 bigint, + value_5 text, + value_6 int DEfAULT 10, + value_7 int, + event_at date DEfAULT now() + ); +SELECT master_create_distributed_table('raw_events_1', 'tenant_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('raw_events_1', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- create the first table +CREATE TABLE raw_events_2 + (tenant_id bigint, + value_1 int, + value_2 int, + value_3 float, + value_4 bigint, + value_5 text, + value_6 float DEfAULT (random()*100)::float, + value_7 int, + event_at date DEfAULT now() + ); +SELECT 
master_create_distributed_table('raw_events_2', 'tenant_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('raw_events_2', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE aggregated_events + (tenant_id bigint, + sum_value_1 bigint, + average_value_2 float, + average_value_3 float, + sum_value_4 bigint, + sum_value_5 float, + average_value_6 int, + rollup_hour date); +SELECT master_create_distributed_table('aggregated_events', 'tenant_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('aggregated_events', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- start with very simple examples on a single table +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1 +SELECT * FROM raw_events_1; +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_1, value_2, value_3, value_4, value_5, value_6, value_7, event_at) SELECT tenant_id, value_1, value_2, value_3, value_4, value_5, value_6, value_7, event_at FROM public.raw_events_1 + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(tenant_id, value_4) +SELECT + tenant_id, value_4 +FROM + raw_events_1; +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, event_at) SELECT tenant_id, value_4, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1 + deparse_shard_query_test +-------------------------- + +(1 row) + +-- now that shuffle columns a bit on a single table +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4) +SELECT + value_2::text, value_5::int, tenant_id, value_4 +FROM + raw_events_1; +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_2, value_4, value_5, value_6, event_at) SELECT 
tenant_id, (value_5)::integer AS value_5, value_4, (value_2)::text AS value_2, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1 + deparse_shard_query_test +-------------------------- + +(1 row) + +-- same test on two different tables +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4) +SELECT + value_2::text, value_5::int, tenant_id, value_4 +FROM + raw_events_2; +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_2, value_4, value_5, value_6, event_at) SELECT tenant_id, (value_5)::integer AS value_5, value_4, (value_2)::text AS value_2, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_2 + deparse_shard_query_test +-------------------------- + +(1 row) + +-- lets do some simple aggregations +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (tenant_id, rollup_hour, sum_value_1, average_value_3, average_value_6, sum_value_4) +SELECT + tenant_id, date_trunc(\'hour\', event_at) , sum(value_1), avg(value_3), avg(value_6), sum(value_4) +FROM + raw_events_1 +GROUP BY + tenant_id, date_trunc(\'hour\', event_at) +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, average_value_3, sum_value_4, average_value_6, rollup_hour) SELECT tenant_id, sum(value_1) AS sum, avg(value_3) AS avg, sum(value_4) AS sum, avg(value_6) AS avg, date_trunc('hour'::text, (event_at)::timestamp with time zone) AS date_trunc FROM public.raw_events_1 GROUP BY tenant_id, (date_trunc('hour'::text, (event_at)::timestamp with time zone)) + deparse_shard_query_test +-------------------------- + +(1 row) + +-- also some subqueries, JOINS with a complicated target lists +-- a simple JOIN +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1 (value_3, tenant_id) +SELECT + raw_events_2.value_3, raw_events_1.tenant_id +FROM + raw_events_1, raw_events_2 +WHERE + raw_events_1.tenant_id = raw_events_2.tenant_id; +'); +INFO: query: INSERT INTO public.raw_events_1 
(tenant_id, value_3, value_6, event_at) SELECT raw_events_1.tenant_id, raw_events_2.value_3, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1, public.raw_events_2 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id) + deparse_shard_query_test +-------------------------- + +(1 row) + +-- join with group by +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1 (value_3, tenant_id) +SELECT + max(raw_events_2.value_3), avg(raw_events_1.value_3) +FROM + raw_events_1, raw_events_2 +WHERE + raw_events_1.tenant_id = raw_events_2.tenant_id GROUP BY raw_events_1.event_at +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_3, value_6, event_at) SELECT avg(raw_events_1.value_3) AS avg, max(raw_events_2.value_3) AS max, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1, public.raw_events_2 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id) GROUP BY raw_events_1.event_at + deparse_shard_query_test +-------------------------- + +(1 row) + +-- a more complicated JOIN +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_4, tenant_id) +SELECT + max(r1.value_4), r3.tenant_id +FROM + raw_events_1 r1, raw_events_2 r2, raw_events_1 r3 +WHERE + r1.tenant_id = r2.tenant_id AND r2.tenant_id = r3.tenant_id +GROUP BY + r1.value_1, r3.tenant_id, r2.event_at +ORDER BY + r2.event_at DESC; +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_4) SELECT r3.tenant_id, max(r1.value_4) AS max FROM public.raw_events_1 r1, public.raw_events_2 r2, public.raw_events_1 r3 WHERE ((r1.tenant_id = r2.tenant_id) AND (r2.tenant_id = r3.tenant_id)) GROUP BY r1.value_1, r3.tenant_id, r2.event_at ORDER BY r2.event_at DESC + deparse_shard_query_test +-------------------------- + +(1 row) + +-- queries with CTEs are supported +SELECT deparse_shard_query_test(' +WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1) +INSERT INTO aggregated_events (rollup_hour, sum_value_5, 
tenant_id) +SELECT + event_at, sum(value_5::int), tenant_id +FROM + raw_events_1 +GROUP BY + event_at, tenant_id; +'); +INFO: query: WITH first_tenant AS (SELECT raw_events_1.event_at, raw_events_1.value_5, raw_events_1.tenant_id FROM public.raw_events_1) INSERT INTO public.aggregated_events (tenant_id, sum_value_5, rollup_hour) SELECT tenant_id, sum((value_5)::integer) AS sum, event_at FROM public.raw_events_1 GROUP BY event_at, tenant_id + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(' +WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1) +INSERT INTO aggregated_events (sum_value_5, tenant_id) +SELECT + sum(value_5::int), tenant_id +FROM + raw_events_1 +GROUP BY + event_at, tenant_id; +'); +INFO: query: WITH first_tenant AS (SELECT raw_events_1.event_at, raw_events_1.value_5, raw_events_1.tenant_id FROM public.raw_events_1) INSERT INTO public.aggregated_events (tenant_id, sum_value_5) SELECT tenant_id, sum((value_5)::integer) AS sum FROM public.raw_events_1 GROUP BY event_at, tenant_id + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_1, sum_value_5, tenant_id) +WITH RECURSIVE hierarchy as ( + SELECT value_1, 1 AS LEVEL, tenant_id + FROM raw_events_1 + WHERE tenant_id = 1 + UNION + SELECT re.value_2, (h.level+1), re.tenant_id + FROM hierarchy h JOIN raw_events_1 re + ON (h.tenant_id = re.tenant_id AND + h.value_1 = re.value_6)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) WITH RECURSIVE hierarchy AS (SELECT raw_events_1.value_1, 1 AS level, raw_events_1.tenant_id FROM public.raw_events_1 WHERE (raw_events_1.tenant_id = 1) UNION SELECT re.value_2, (h.level + 1), re.tenant_id FROM (hierarchy h JOIN public.raw_events_1 re ON (((h.tenant_id = re.tenant_id) AND (h.value_1 = re.value_6))))) SELECT tenant_id, 
value_1, level FROM hierarchy WHERE (level <= 2) + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_1) +SELECT + DISTINCT value_1 +FROM + raw_events_1; +'); +INFO: query: INSERT INTO public.aggregated_events (sum_value_1) SELECT DISTINCT value_1 FROM public.raw_events_1 + deparse_shard_query_test +-------------------------- + +(1 row) + +-- many filters suffled +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (sum_value_5, sum_value_1, tenant_id) +SELECT value_3, value_2, tenant_id + FROM raw_events_1 + WHERE (value_5 like \'%s\' or value_5 like \'%a\') and (tenant_id = 1) and (value_6 < 3000 or value_3 > 8000); +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) SELECT tenant_id, value_2, value_3 FROM public.raw_events_1 WHERE (((value_5 ~~ '%s'::text) OR (value_5 ~~ '%a'::text)) AND (tenant_id = 1) AND ((value_6 < 3000) OR (value_3 > (8000)::double precision))) + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (sum_value_5, tenant_id) +SELECT rank() OVER (PARTITION BY tenant_id ORDER BY value_6), tenant_id + FROM raw_events_1 + WHERE event_at = now(); +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_5) SELECT tenant_id, rank() OVER (PARTITION BY tenant_id ORDER BY value_6) AS rank FROM public.raw_events_1 WHERE (event_at = now()) + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (sum_value_5, tenant_id, sum_value_4) +SELECT random(), int4eq(1, max(value_1))::int, value_6 + FROM raw_events_1 + WHERE event_at = now() + GROUP BY event_at, value_7, value_6; +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_4, sum_value_5) SELECT (int4eq(1, max(value_1)))::integer AS int4eq, value_6, random() 
AS random FROM public.raw_events_1 WHERE (event_at = now()) GROUP BY event_at, value_7, value_6 + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_1, tenant_id) +SELECT + count(DISTINCT CASE + WHEN + value_1 > 100 + THEN + tenant_id + ELSE + value_6 + END) as c, + max(tenant_id) + FROM + raw_events_1; +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1) SELECT max(tenant_id) AS max, count(DISTINCT CASE WHEN (value_1 > 100) THEN tenant_id ELSE (value_6)::bigint END) AS c FROM public.raw_events_1 + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(value_7, value_1, tenant_id) +SELECT + value_7, value_1, tenant_id +FROM + (SELECT + tenant_id, value_2 as value_7, value_1 + FROM + raw_events_2 + ) as foo +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_1, value_6, value_7, event_at) SELECT tenant_id, value_1, 10 AS value_6, value_7, (now())::date AS event_at FROM (SELECT raw_events_2.tenant_id, raw_events_2.value_2 AS value_7, raw_events_2.value_1 FROM public.raw_events_2) foo + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events(sum_value_1, tenant_id, sum_value_5) +SELECT + sum(value_1), tenant_id, sum(value_5::bigint) +FROM + (SELECT + raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1 + FROM + raw_events_2, raw_events_1 + WHERE + raw_events_1.tenant_id = raw_events_2.tenant_id + ) as foo +GROUP BY + tenant_id, date_trunc(\'hour\', event_at) +'); +INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) SELECT tenant_id, sum(value_1) AS sum, sum((value_5)::bigint) AS sum FROM (SELECT raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1 FROM public.raw_events_2, 
public.raw_events_1 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id)) foo GROUP BY tenant_id, (date_trunc('hour'::text, (event_at)::timestamp with time zone)) + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(E' +INSERT INTO raw_events_2(tenant_id, value_1, value_2, value_3, value_4) +SELECT + tenant_id, value_1, value_2, value_3, value_4 +FROM + (SELECT + value_2, value_4, tenant_id, value_1, value_3 + FROM + raw_events_1 + ) as foo +'); +INFO: query: INSERT INTO public.raw_events_2 (tenant_id, value_1, value_2, value_3, value_4, value_6, event_at) SELECT tenant_id, value_1, value_2, value_3, value_4, (random() * (100)::double precision) AS value_6, (now())::date AS event_at FROM (SELECT raw_events_1.value_2, raw_events_1.value_4, raw_events_1.tenant_id, raw_events_1.value_1, raw_events_1.value_3 FROM public.raw_events_1) foo + deparse_shard_query_test +-------------------------- + +(1 row) + +SELECT deparse_shard_query_test(E' +INSERT INTO raw_events_2(tenant_id, value_1, value_4, value_2, value_3) +SELECT + * +FROM + (SELECT + value_2, value_4, tenant_id, value_1, value_3 + FROM + raw_events_1 + ) as foo +'); +INFO: query: INSERT INTO public.raw_events_2 (tenant_id, value_1, value_2, value_3, value_4, value_6, event_at) SELECT value_2, value_4, value_1, value_3, tenant_id, (random() * (100)::double precision) AS value_6, (now())::date AS event_at FROM (SELECT raw_events_1.value_2, raw_events_1.value_4, raw_events_1.tenant_id, raw_events_1.value_1, raw_events_1.value_3 FROM public.raw_events_1) foo + deparse_shard_query_test +-------------------------- + +(1 row) + +-- use a column multiple times +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(tenant_id, value_7, value_4) +SELECT + tenant_id, value_7, value_7 +FROM + raw_events_1 +ORDER BY + value_2, value_1; +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, value_7, event_at) SELECT tenant_id, value_7, 10 AS 
value_6, value_7, (now())::date AS event_at FROM public.raw_events_1 ORDER BY value_2, value_1 + deparse_shard_query_test +-------------------------- + +(1 row) + +-- test dropped table as well +ALTER TABLE raw_events_1 DROP COLUMN value_5; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(tenant_id, value_7, value_4) +SELECT + tenant_id, value_7, value_4 +FROM + raw_events_1; +'); +INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, value_7, event_at) SELECT tenant_id, value_4, 10 AS value_6, value_7, (now())::date AS event_at FROM public.raw_events_1 + deparse_shard_query_test +-------------------------- + +(1 row) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8c2b2acd7..9f8862be9 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -32,6 +32,7 @@ test: multi_load_data # ---------- # Miscellaneous tests to check our query planning behavior # ---------- +test: multi_deparse_shard_query test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery test: multi_explain test: multi_subquery diff --git a/src/test/regress/sql/multi_deparse_shard_query.sql b/src/test/regress/sql/multi_deparse_shard_query.sql new file mode 100644 index 000000000..e52242238 --- /dev/null +++ b/src/test/regress/sql/multi_deparse_shard_query.sql @@ -0,0 +1,304 @@ +-- +-- MULTI_DEPARSE_SHARD_QUERY +-- + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13100000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13100000; + +CREATE FUNCTION deparse_shard_query_test(text) + RETURNS VOID + AS 'citus' + LANGUAGE C STRICT; + +-- create the first table +CREATE TABLE raw_events_1 + (tenant_id bigint, + value_1 int, + value_2 int, + value_3 float, + value_4 bigint, + value_5 text, + value_6 int DEfAULT 
10, + value_7 int, + event_at date DEfAULT now() + ); + +SELECT master_create_distributed_table('raw_events_1', 'tenant_id', 'hash'); +SELECT master_create_worker_shards('raw_events_1', 4, 1); + +-- create the first table +CREATE TABLE raw_events_2 + (tenant_id bigint, + value_1 int, + value_2 int, + value_3 float, + value_4 bigint, + value_5 text, + value_6 float DEfAULT (random()*100)::float, + value_7 int, + event_at date DEfAULT now() + ); + +SELECT master_create_distributed_table('raw_events_2', 'tenant_id', 'hash'); +SELECT master_create_worker_shards('raw_events_2', 4, 1); + +CREATE TABLE aggregated_events + (tenant_id bigint, + sum_value_1 bigint, + average_value_2 float, + average_value_3 float, + sum_value_4 bigint, + sum_value_5 float, + average_value_6 int, + rollup_hour date); + +SELECT master_create_distributed_table('aggregated_events', 'tenant_id', 'hash'); +SELECT master_create_worker_shards('aggregated_events', 4, 1); + + +-- start with very simple examples on a single table +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1 +SELECT * FROM raw_events_1; +'); + +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(tenant_id, value_4) +SELECT + tenant_id, value_4 +FROM + raw_events_1; +'); + +-- now that shuffle columns a bit on a single table +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4) +SELECT + value_2::text, value_5::int, tenant_id, value_4 +FROM + raw_events_1; +'); + +-- same test on two different tables +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4) +SELECT + value_2::text, value_5::int, tenant_id, value_4 +FROM + raw_events_2; +'); + +-- lets do some simple aggregations +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (tenant_id, rollup_hour, sum_value_1, average_value_3, average_value_6, sum_value_4) +SELECT + tenant_id, date_trunc(\'hour\', event_at) , sum(value_1), avg(value_3), avg(value_6), 
sum(value_4) +FROM + raw_events_1 +GROUP BY + tenant_id, date_trunc(\'hour\', event_at) +'); + + +-- also some subqueries, JOINS with a complicated target lists +-- a simple JOIN +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1 (value_3, tenant_id) +SELECT + raw_events_2.value_3, raw_events_1.tenant_id +FROM + raw_events_1, raw_events_2 +WHERE + raw_events_1.tenant_id = raw_events_2.tenant_id; +'); + +-- join with group by +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1 (value_3, tenant_id) +SELECT + max(raw_events_2.value_3), avg(raw_events_1.value_3) +FROM + raw_events_1, raw_events_2 +WHERE + raw_events_1.tenant_id = raw_events_2.tenant_id GROUP BY raw_events_1.event_at +'); + +-- a more complicated JOIN +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_4, tenant_id) +SELECT + max(r1.value_4), r3.tenant_id +FROM + raw_events_1 r1, raw_events_2 r2, raw_events_1 r3 +WHERE + r1.tenant_id = r2.tenant_id AND r2.tenant_id = r3.tenant_id +GROUP BY + r1.value_1, r3.tenant_id, r2.event_at +ORDER BY + r2.event_at DESC; +'); + + +-- queries with CTEs are supported +SELECT deparse_shard_query_test(' +WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1) +INSERT INTO aggregated_events (rollup_hour, sum_value_5, tenant_id) +SELECT + event_at, sum(value_5::int), tenant_id +FROM + raw_events_1 +GROUP BY + event_at, tenant_id; +'); + +SELECT deparse_shard_query_test(' +WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1) +INSERT INTO aggregated_events (sum_value_5, tenant_id) +SELECT + sum(value_5::int), tenant_id +FROM + raw_events_1 +GROUP BY + event_at, tenant_id; +'); + +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_1, sum_value_5, tenant_id) +WITH RECURSIVE hierarchy as ( + SELECT value_1, 1 AS LEVEL, tenant_id + FROM raw_events_1 + WHERE tenant_id = 1 + UNION + SELECT re.value_2, (h.level+1), re.tenant_id + FROM hierarchy h JOIN raw_events_1 re + 
ON (h.tenant_id = re.tenant_id AND + h.value_1 = re.value_6)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; +'); + + +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_1) +SELECT + DISTINCT value_1 +FROM + raw_events_1; +'); + + +-- many filters suffled +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (sum_value_5, sum_value_1, tenant_id) +SELECT value_3, value_2, tenant_id + FROM raw_events_1 + WHERE (value_5 like \'%s\' or value_5 like \'%a\') and (tenant_id = 1) and (value_6 < 3000 or value_3 > 8000); +'); + +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (sum_value_5, tenant_id) +SELECT rank() OVER (PARTITION BY tenant_id ORDER BY value_6), tenant_id + FROM raw_events_1 + WHERE event_at = now(); +'); + +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events (sum_value_5, tenant_id, sum_value_4) +SELECT random(), int4eq(1, max(value_1))::int, value_6 + FROM raw_events_1 + WHERE event_at = now() + GROUP BY event_at, value_7, value_6; +'); + +SELECT deparse_shard_query_test(' +INSERT INTO aggregated_events (sum_value_1, tenant_id) +SELECT + count(DISTINCT CASE + WHEN + value_1 > 100 + THEN + tenant_id + ELSE + value_6 + END) as c, + max(tenant_id) + FROM + raw_events_1; +'); + +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(value_7, value_1, tenant_id) +SELECT + value_7, value_1, tenant_id +FROM + (SELECT + tenant_id, value_2 as value_7, value_1 + FROM + raw_events_2 + ) as foo +'); + +SELECT deparse_shard_query_test(E' +INSERT INTO aggregated_events(sum_value_1, tenant_id, sum_value_5) +SELECT + sum(value_1), tenant_id, sum(value_5::bigint) +FROM + (SELECT + raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1 + FROM + raw_events_2, raw_events_1 + WHERE + raw_events_1.tenant_id = raw_events_2.tenant_id + ) as foo +GROUP BY + tenant_id, date_trunc(\'hour\', event_at) +'); + + +SELECT deparse_shard_query_test(E' +INSERT INTO 
raw_events_2(tenant_id, value_1, value_2, value_3, value_4) +SELECT + tenant_id, value_1, value_2, value_3, value_4 +FROM + (SELECT + value_2, value_4, tenant_id, value_1, value_3 + FROM + raw_events_1 + ) as foo +'); + + +SELECT deparse_shard_query_test(E' +INSERT INTO raw_events_2(tenant_id, value_1, value_4, value_2, value_3) +SELECT + * +FROM + (SELECT + value_2, value_4, tenant_id, value_1, value_3 + FROM + raw_events_1 + ) as foo +'); + + +-- use a column multiple times +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(tenant_id, value_7, value_4) +SELECT + tenant_id, value_7, value_7 +FROM + raw_events_1 +ORDER BY + value_2, value_1; +'); + +-- test dropped table as well +ALTER TABLE raw_events_1 DROP COLUMN value_5; + +SELECT deparse_shard_query_test(' +INSERT INTO raw_events_1(tenant_id, value_7, value_4) +SELECT + tenant_id, value_7, value_4 +FROM + raw_events_1; +');