mirror of https://github.com/citusdata/citus.git
Merge 086f43d55d
into 770d09b48e
commit
542fd1ee8e
|
@ -44,6 +44,7 @@
|
|||
#include "optimizer/clauses.h"
|
||||
#include "optimizer/predtest.h"
|
||||
#include "optimizer/restrictinfo.h"
|
||||
#include "optimizer/var.h"
|
||||
#include "parser/parsetree.h"
|
||||
#include "storage/lock.h"
|
||||
#include "utils/elog.h"
|
||||
|
@ -90,7 +91,7 @@ static bool UpdateRelationNames(Node *node,
|
|||
static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
|
||||
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
|
||||
static bool InsertSelectQuery(Query *query);
|
||||
|
||||
/*
|
||||
* MultiRouterPlanCreate creates a physical plan for given query. The created plan is
|
||||
|
@ -1547,3 +1548,190 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReorderInsertSelectTargetListsIfExists reorders the target lists of INSERT/SELECT
|
||||
* query which is required for deparsing purposes. The reordered query is returned.
|
||||
*
|
||||
* The necessity for this function comes from the fact that ruleutils.c is not supposed to be
|
||||
* used on "rewritten" queries (i.e. ones that have been passed through QueryRewrite()).
|
||||
* Query rewriting is the process in which views and such are expanded,
|
||||
* and, INSERT/UPDATE targetlists are reordered to match the physical order,
|
||||
* defaults etc. For the details of reordeing, see transformInsertRow().
|
||||
*/
|
||||
Query *
|
||||
ReorderInsertSelectTargetListsIfExists(Query *originalQuery)
|
||||
{
|
||||
RangeTblEntry *insertRte = NULL;
|
||||
RangeTblEntry *subqueryRte = NULL;
|
||||
Query *subquery = NULL;
|
||||
ListCell *insertTargetEntryCell;
|
||||
List *newSubqueryTargetlist = NIL;
|
||||
List *newInsertTargetlist = NIL;
|
||||
int resno = 1;
|
||||
Index insertTableId = 1;
|
||||
int updatedSubqueryEntryCount = 0;
|
||||
Oid insertRelationId = InvalidOid;
|
||||
int subqueryTargetLength = 0;
|
||||
|
||||
/* we only apply the reording for INSERT ... SELECT queries */
|
||||
if (!InsertSelectQuery(originalQuery))
|
||||
{
|
||||
return originalQuery;
|
||||
}
|
||||
|
||||
insertRte = linitial(originalQuery->rtable);
|
||||
subqueryRte = lsecond(originalQuery->rtable);
|
||||
subquery = subqueryRte->subquery;
|
||||
|
||||
insertRelationId = insertRte->relid;
|
||||
|
||||
/*
|
||||
* We implement the following algorithm for the reoderding:
|
||||
* - Iterate over the INSERT target list entries
|
||||
* - If the target entry includes a Var, find the corresponding
|
||||
* SELECT target entry on the original query and update resno
|
||||
* - If the target entry does not include a Var (i.e., defaults),
|
||||
* create new target entry and add that to SELECT target list
|
||||
* - Create a new INSERT target entry with respect to the new
|
||||
* SELECT target entry created.
|
||||
*/
|
||||
foreach(insertTargetEntryCell, originalQuery->targetList)
|
||||
{
|
||||
TargetEntry *oldInsertTargetEntry = lfirst(insertTargetEntryCell);
|
||||
TargetEntry *newInsertTargetEntry = NULL;
|
||||
Var *newInsertVar = NULL;
|
||||
TargetEntry *newSubqueryTargetEntry = NULL;
|
||||
List *targetVarList = NULL;
|
||||
int targetVarCount = 0;
|
||||
AttrNumber originalAttrNo = get_attnum(insertRelationId,
|
||||
oldInsertTargetEntry->resname);
|
||||
|
||||
|
||||
/* see transformInsertRow() for the details */
|
||||
if (IsA(oldInsertTargetEntry->expr, ArrayRef) ||
|
||||
IsA(oldInsertTargetEntry->expr, FieldStore))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("cannot plan distributed INSERT INTO .. SELECT query"),
|
||||
errhint("Do not use array references and field stores "
|
||||
"on the INSERT target list.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* It is safe to pull Var clause and ignore the coercions since that
|
||||
* are already going to be added on the workers implicitly.
|
||||
*/
|
||||
targetVarList = pull_var_clause((Node *) oldInsertTargetEntry->expr,
|
||||
PVC_RECURSE_AGGREGATES,
|
||||
PVC_RECURSE_PLACEHOLDERS);
|
||||
targetVarCount = list_length(targetVarList);
|
||||
|
||||
/* a single INSERT target entry cannot have more than one Var */
|
||||
Assert(targetVarCount <= 1);
|
||||
|
||||
if (targetVarCount == 1)
|
||||
{
|
||||
Var *oldInsertVar = (Var *) linitial(targetVarList);
|
||||
TargetEntry *oldSubqueryTle = list_nth(subquery->targetList,
|
||||
oldInsertVar->varattno - 1);
|
||||
|
||||
newSubqueryTargetEntry = copyObject(oldSubqueryTle);
|
||||
|
||||
newSubqueryTargetEntry->resno = resno;
|
||||
newSubqueryTargetlist = lappend(newSubqueryTargetlist,
|
||||
newSubqueryTargetEntry);
|
||||
|
||||
updatedSubqueryEntryCount++;
|
||||
}
|
||||
else
|
||||
{
|
||||
newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr,
|
||||
resno,
|
||||
oldInsertTargetEntry->resname,
|
||||
oldInsertTargetEntry->resjunk);
|
||||
newSubqueryTargetlist = lappend(newSubqueryTargetlist,
|
||||
newSubqueryTargetEntry);
|
||||
}
|
||||
|
||||
newInsertVar = makeVar(insertTableId, originalAttrNo,
|
||||
exprType(
|
||||
(Node *) newSubqueryTargetEntry->expr),
|
||||
exprTypmod(
|
||||
(Node *) newSubqueryTargetEntry->expr),
|
||||
exprCollation(
|
||||
(Node *) newSubqueryTargetEntry->expr),
|
||||
0);
|
||||
newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar, originalAttrNo,
|
||||
oldInsertTargetEntry->resname,
|
||||
oldInsertTargetEntry->resjunk);
|
||||
|
||||
newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry);
|
||||
resno++;
|
||||
}
|
||||
|
||||
/*
|
||||
* if there are any remaining target list entries (i.e., GROUP BY column not on the
|
||||
* target list of subquery), update the remaining resnos.
|
||||
*/
|
||||
subqueryTargetLength = list_length(subquery->targetList);
|
||||
if (subqueryTargetLength != updatedSubqueryEntryCount)
|
||||
{
|
||||
int targetEntryIndex = updatedSubqueryEntryCount;
|
||||
|
||||
for (; targetEntryIndex < subqueryTargetLength; ++targetEntryIndex)
|
||||
{
|
||||
TargetEntry *oldSubqueryTle = list_nth(subquery->targetList,
|
||||
targetEntryIndex);
|
||||
TargetEntry *newSubqueryTargetEntry = copyObject(oldSubqueryTle);
|
||||
|
||||
Assert(newSubqueryTargetEntry->resjunk == true);
|
||||
|
||||
newSubqueryTargetEntry->resno = resno;
|
||||
newSubqueryTargetlist = lappend(newSubqueryTargetlist,
|
||||
newSubqueryTargetEntry);
|
||||
|
||||
resno++;
|
||||
}
|
||||
}
|
||||
|
||||
originalQuery->targetList = newInsertTargetlist;
|
||||
subquery->targetList = newSubqueryTargetlist;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InsertSelectQuery returns true when the input query
|
||||
* is INSERT INTO ... SELECT kind of query.
|
||||
*/
|
||||
static bool
|
||||
InsertSelectQuery(Query *query)
|
||||
{
|
||||
CmdType commandType = query->commandType;
|
||||
List *rangeTableList = query->rtable;
|
||||
RangeTblEntry *subqueryRte = NULL;
|
||||
Query *subquery = NULL;
|
||||
|
||||
if (commandType != CMD_INSERT)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
rangeTableList = query->rtable;
|
||||
if (list_length(rangeTableList) < 2)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
subqueryRte = lsecond(query->rtable);
|
||||
subquery = subqueryRte->subquery;
|
||||
if (subquery == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* test/src/depase_shard_query.c
|
||||
*
|
||||
* This file contains functions to exercise deparsing of INSERT .. SELECT queries
|
||||
* for distributed tables.
|
||||
*
|
||||
* Copyright (c) 2014-2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "c.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "nodes/nodes.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "nodes/value.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/array.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(deparse_shard_query_test);
|
||||
|
||||
|
||||
Datum
|
||||
deparse_shard_query_test(PG_FUNCTION_ARGS)
|
||||
{
|
||||
text *queryString = PG_GETARG_TEXT_P(0);
|
||||
|
||||
char *queryStringChar = text_to_cstring(queryString);
|
||||
List *parseTreeList = pg_parse_query(queryStringChar);
|
||||
ListCell *parseTreeCell = NULL;
|
||||
|
||||
foreach(parseTreeCell, parseTreeList)
|
||||
{
|
||||
Node *parsetree = (Node *) lfirst(parseTreeCell);
|
||||
ListCell *queryTreeCell = NULL;
|
||||
|
||||
List *queryTreeList = pg_analyze_and_rewrite(parsetree, queryStringChar,
|
||||
NULL, 0);
|
||||
|
||||
foreach(queryTreeCell, queryTreeList)
|
||||
{
|
||||
Query *query = lfirst(queryTreeCell);
|
||||
StringInfo buffer = makeStringInfo();
|
||||
|
||||
ReorderInsertSelectTargetListsIfExists(query);
|
||||
deparse_shard_query(query, InvalidOid, 0, buffer);
|
||||
|
||||
elog(INFO, "query: %s", buffer->data);
|
||||
}
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* test/src/test_helper_functions.c
|
||||
* le * test/src/test_helper_functions.c
|
||||
*
|
||||
* This file contains helper functions used in many Citus tests.
|
||||
*
|
||||
|
|
|
@ -33,5 +33,6 @@ extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
|||
MultiExecutorType taskExecutorType,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
|
||||
extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery);
|
||||
|
||||
#endif /* MULTI_ROUTER_PLANNER_H */
|
||||
|
|
|
@ -0,0 +1,428 @@
|
|||
--
|
||||
-- MULTI_DEPARSE_SHARD_QUERY
|
||||
--
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13100000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13100000;
|
||||
CREATE FUNCTION deparse_shard_query_test(text)
|
||||
RETURNS VOID
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
-- create the first table
|
||||
CREATE TABLE raw_events_1
|
||||
(tenant_id bigint,
|
||||
value_1 int,
|
||||
value_2 int,
|
||||
value_3 float,
|
||||
value_4 bigint,
|
||||
value_5 text,
|
||||
value_6 int DEfAULT 10,
|
||||
value_7 int,
|
||||
event_at date DEfAULT now()
|
||||
);
|
||||
SELECT master_create_distributed_table('raw_events_1', 'tenant_id', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('raw_events_1', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- create the first table
|
||||
CREATE TABLE raw_events_2
|
||||
(tenant_id bigint,
|
||||
value_1 int,
|
||||
value_2 int,
|
||||
value_3 float,
|
||||
value_4 bigint,
|
||||
value_5 text,
|
||||
value_6 float DEfAULT (random()*100)::float,
|
||||
value_7 int,
|
||||
event_at date DEfAULT now()
|
||||
);
|
||||
SELECT master_create_distributed_table('raw_events_2', 'tenant_id', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('raw_events_2', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE aggregated_events
|
||||
(tenant_id bigint,
|
||||
sum_value_1 bigint,
|
||||
average_value_2 float,
|
||||
average_value_3 float,
|
||||
sum_value_4 bigint,
|
||||
sum_value_5 float,
|
||||
average_value_6 int,
|
||||
rollup_hour date);
|
||||
SELECT master_create_distributed_table('aggregated_events', 'tenant_id', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('aggregated_events', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- start with very simple examples on a single table
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1
|
||||
SELECT * FROM raw_events_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_1, value_2, value_3, value_4, value_5, value_6, value_7, event_at) SELECT tenant_id, value_1, value_2, value_3, value_4, value_5, value_6, value_7, event_at FROM public.raw_events_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(tenant_id, value_4)
|
||||
SELECT
|
||||
tenant_id, value_4
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, event_at) SELECT tenant_id, value_4, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- now that shuffle columns a bit on a single table
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
|
||||
SELECT
|
||||
value_2::text, value_5::int, tenant_id, value_4
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_2, value_4, value_5, value_6, event_at) SELECT tenant_id, (value_5)::integer AS value_5, value_4, (value_2)::text AS value_2, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- same test on two different tables
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
|
||||
SELECT
|
||||
value_2::text, value_5::int, tenant_id, value_4
|
||||
FROM
|
||||
raw_events_2;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_2, value_4, value_5, value_6, event_at) SELECT tenant_id, (value_5)::integer AS value_5, value_4, (value_2)::text AS value_2, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_2
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- lets do some simple aggregations
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (tenant_id, rollup_hour, sum_value_1, average_value_3, average_value_6, sum_value_4)
|
||||
SELECT
|
||||
tenant_id, date_trunc(\'hour\', event_at) , sum(value_1), avg(value_3), avg(value_6), sum(value_4)
|
||||
FROM
|
||||
raw_events_1
|
||||
GROUP BY
|
||||
tenant_id, date_trunc(\'hour\', event_at)
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, average_value_3, sum_value_4, average_value_6, rollup_hour) SELECT tenant_id, sum(value_1) AS sum, avg(value_3) AS avg, sum(value_4) AS sum, avg(value_6) AS avg, date_trunc('hour'::text, (event_at)::timestamp with time zone) AS date_trunc FROM public.raw_events_1 GROUP BY tenant_id, (date_trunc('hour'::text, (event_at)::timestamp with time zone))
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- also some subqueries, JOINS with a complicated target lists
|
||||
-- a simple JOIN
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1 (value_3, tenant_id)
|
||||
SELECT
|
||||
raw_events_2.value_3, raw_events_1.tenant_id
|
||||
FROM
|
||||
raw_events_1, raw_events_2
|
||||
WHERE
|
||||
raw_events_1.tenant_id = raw_events_2.tenant_id;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_3, value_6, event_at) SELECT raw_events_1.tenant_id, raw_events_2.value_3, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1, public.raw_events_2 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id)
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- join with group by
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1 (value_3, tenant_id)
|
||||
SELECT
|
||||
max(raw_events_2.value_3), avg(raw_events_1.value_3)
|
||||
FROM
|
||||
raw_events_1, raw_events_2
|
||||
WHERE
|
||||
raw_events_1.tenant_id = raw_events_2.tenant_id GROUP BY raw_events_1.event_at
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_3, value_6, event_at) SELECT avg(raw_events_1.value_3) AS avg, max(raw_events_2.value_3) AS max, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1, public.raw_events_2 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id) GROUP BY raw_events_1.event_at
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- a more complicated JOIN
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_4, tenant_id)
|
||||
SELECT
|
||||
max(r1.value_4), r3.tenant_id
|
||||
FROM
|
||||
raw_events_1 r1, raw_events_2 r2, raw_events_1 r3
|
||||
WHERE
|
||||
r1.tenant_id = r2.tenant_id AND r2.tenant_id = r3.tenant_id
|
||||
GROUP BY
|
||||
r1.value_1, r3.tenant_id, r2.event_at
|
||||
ORDER BY
|
||||
r2.event_at DESC;
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_4) SELECT r3.tenant_id, max(r1.value_4) AS max FROM public.raw_events_1 r1, public.raw_events_2 r2, public.raw_events_1 r3 WHERE ((r1.tenant_id = r2.tenant_id) AND (r2.tenant_id = r3.tenant_id)) GROUP BY r1.value_1, r3.tenant_id, r2.event_at ORDER BY r2.event_at DESC
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- queries with CTEs are supported
|
||||
SELECT deparse_shard_query_test('
|
||||
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
|
||||
INSERT INTO aggregated_events (rollup_hour, sum_value_5, tenant_id)
|
||||
SELECT
|
||||
event_at, sum(value_5::int), tenant_id
|
||||
FROM
|
||||
raw_events_1
|
||||
GROUP BY
|
||||
event_at, tenant_id;
|
||||
');
|
||||
INFO: query: WITH first_tenant AS (SELECT raw_events_1.event_at, raw_events_1.value_5, raw_events_1.tenant_id FROM public.raw_events_1) INSERT INTO public.aggregated_events (tenant_id, sum_value_5, rollup_hour) SELECT tenant_id, sum((value_5)::integer) AS sum, event_at FROM public.raw_events_1 GROUP BY event_at, tenant_id
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
|
||||
INSERT INTO aggregated_events (sum_value_5, tenant_id)
|
||||
SELECT
|
||||
sum(value_5::int), tenant_id
|
||||
FROM
|
||||
raw_events_1
|
||||
GROUP BY
|
||||
event_at, tenant_id;
|
||||
');
|
||||
INFO: query: WITH first_tenant AS (SELECT raw_events_1.event_at, raw_events_1.value_5, raw_events_1.tenant_id FROM public.raw_events_1) INSERT INTO public.aggregated_events (tenant_id, sum_value_5) SELECT tenant_id, sum((value_5)::integer) AS sum FROM public.raw_events_1 GROUP BY event_at, tenant_id
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_1, sum_value_5, tenant_id)
|
||||
WITH RECURSIVE hierarchy as (
|
||||
SELECT value_1, 1 AS LEVEL, tenant_id
|
||||
FROM raw_events_1
|
||||
WHERE tenant_id = 1
|
||||
UNION
|
||||
SELECT re.value_2, (h.level+1), re.tenant_id
|
||||
FROM hierarchy h JOIN raw_events_1 re
|
||||
ON (h.tenant_id = re.tenant_id AND
|
||||
h.value_1 = re.value_6))
|
||||
SELECT * FROM hierarchy WHERE LEVEL <= 2;
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) WITH RECURSIVE hierarchy AS (SELECT raw_events_1.value_1, 1 AS level, raw_events_1.tenant_id FROM public.raw_events_1 WHERE (raw_events_1.tenant_id = 1) UNION SELECT re.value_2, (h.level + 1), re.tenant_id FROM (hierarchy h JOIN public.raw_events_1 re ON (((h.tenant_id = re.tenant_id) AND (h.value_1 = re.value_6))))) SELECT tenant_id, value_1, level FROM hierarchy WHERE (level <= 2)
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_1)
|
||||
SELECT
|
||||
DISTINCT value_1
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (sum_value_1) SELECT DISTINCT value_1 FROM public.raw_events_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- many filters suffled
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (sum_value_5, sum_value_1, tenant_id)
|
||||
SELECT value_3, value_2, tenant_id
|
||||
FROM raw_events_1
|
||||
WHERE (value_5 like \'%s\' or value_5 like \'%a\') and (tenant_id = 1) and (value_6 < 3000 or value_3 > 8000);
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) SELECT tenant_id, value_2, value_3 FROM public.raw_events_1 WHERE (((value_5 ~~ '%s'::text) OR (value_5 ~~ '%a'::text)) AND (tenant_id = 1) AND ((value_6 < 3000) OR (value_3 > (8000)::double precision)))
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (sum_value_5, tenant_id)
|
||||
SELECT rank() OVER (PARTITION BY tenant_id ORDER BY value_6), tenant_id
|
||||
FROM raw_events_1
|
||||
WHERE event_at = now();
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_5) SELECT tenant_id, rank() OVER (PARTITION BY tenant_id ORDER BY value_6) AS rank FROM public.raw_events_1 WHERE (event_at = now())
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (sum_value_5, tenant_id, sum_value_4)
|
||||
SELECT random(), int4eq(1, max(value_1))::int, value_6
|
||||
FROM raw_events_1
|
||||
WHERE event_at = now()
|
||||
GROUP BY event_at, value_7, value_6;
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_4, sum_value_5) SELECT (int4eq(1, max(value_1)))::integer AS int4eq, value_6, random() AS random FROM public.raw_events_1 WHERE (event_at = now()) GROUP BY event_at, value_7, value_6
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_1, tenant_id)
|
||||
SELECT
|
||||
count(DISTINCT CASE
|
||||
WHEN
|
||||
value_1 > 100
|
||||
THEN
|
||||
tenant_id
|
||||
ELSE
|
||||
value_6
|
||||
END) as c,
|
||||
max(tenant_id)
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1) SELECT max(tenant_id) AS max, count(DISTINCT CASE WHEN (value_1 > 100) THEN tenant_id ELSE (value_6)::bigint END) AS c FROM public.raw_events_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(value_7, value_1, tenant_id)
|
||||
SELECT
|
||||
value_7, value_1, tenant_id
|
||||
FROM
|
||||
(SELECT
|
||||
tenant_id, value_2 as value_7, value_1
|
||||
FROM
|
||||
raw_events_2
|
||||
) as foo
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_1, value_6, value_7, event_at) SELECT tenant_id, value_1, 10 AS value_6, value_7, (now())::date AS event_at FROM (SELECT raw_events_2.tenant_id, raw_events_2.value_2 AS value_7, raw_events_2.value_1 FROM public.raw_events_2) foo
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events(sum_value_1, tenant_id, sum_value_5)
|
||||
SELECT
|
||||
sum(value_1), tenant_id, sum(value_5::bigint)
|
||||
FROM
|
||||
(SELECT
|
||||
raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1
|
||||
FROM
|
||||
raw_events_2, raw_events_1
|
||||
WHERE
|
||||
raw_events_1.tenant_id = raw_events_2.tenant_id
|
||||
) as foo
|
||||
GROUP BY
|
||||
tenant_id, date_trunc(\'hour\', event_at)
|
||||
');
|
||||
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) SELECT tenant_id, sum(value_1) AS sum, sum((value_5)::bigint) AS sum FROM (SELECT raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1 FROM public.raw_events_2, public.raw_events_1 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id)) foo GROUP BY tenant_id, (date_trunc('hour'::text, (event_at)::timestamp with time zone))
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO raw_events_2(tenant_id, value_1, value_2, value_3, value_4)
|
||||
SELECT
|
||||
tenant_id, value_1, value_2, value_3, value_4
|
||||
FROM
|
||||
(SELECT
|
||||
value_2, value_4, tenant_id, value_1, value_3
|
||||
FROM
|
||||
raw_events_1
|
||||
) as foo
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_2 (tenant_id, value_1, value_2, value_3, value_4, value_6, event_at) SELECT tenant_id, value_1, value_2, value_3, value_4, (random() * (100)::double precision) AS value_6, (now())::date AS event_at FROM (SELECT raw_events_1.value_2, raw_events_1.value_4, raw_events_1.tenant_id, raw_events_1.value_1, raw_events_1.value_3 FROM public.raw_events_1) foo
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO raw_events_2(tenant_id, value_1, value_4, value_2, value_3)
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
(SELECT
|
||||
value_2, value_4, tenant_id, value_1, value_3
|
||||
FROM
|
||||
raw_events_1
|
||||
) as foo
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_2 (tenant_id, value_1, value_2, value_3, value_4, value_6, event_at) SELECT value_2, value_4, value_1, value_3, tenant_id, (random() * (100)::double precision) AS value_6, (now())::date AS event_at FROM (SELECT raw_events_1.value_2, raw_events_1.value_4, raw_events_1.tenant_id, raw_events_1.value_1, raw_events_1.value_3 FROM public.raw_events_1) foo
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- use a column multiple times
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
|
||||
SELECT
|
||||
tenant_id, value_7, value_7
|
||||
FROM
|
||||
raw_events_1
|
||||
ORDER BY
|
||||
value_2, value_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, value_7, event_at) SELECT tenant_id, value_7, 10 AS value_6, value_7, (now())::date AS event_at FROM public.raw_events_1 ORDER BY value_2, value_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- test dropped table as well
|
||||
ALTER TABLE raw_events_1 DROP COLUMN value_5;
|
||||
NOTICE: using one-phase commit for distributed DDL commands
|
||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
|
||||
SELECT
|
||||
tenant_id, value_7, value_4
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, value_7, event_at) SELECT tenant_id, value_4, 10 AS value_6, value_7, (now())::date AS event_at FROM public.raw_events_1
|
||||
deparse_shard_query_test
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
|
@ -32,6 +32,7 @@ test: multi_load_data
|
|||
# ----------
|
||||
# Miscellaneous tests to check our query planning behavior
|
||||
# ----------
|
||||
test: multi_deparse_shard_query
|
||||
test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery
|
||||
test: multi_explain
|
||||
test: multi_subquery
|
||||
|
|
|
@ -0,0 +1,304 @@
|
|||
--
|
||||
-- MULTI_DEPARSE_SHARD_QUERY
|
||||
--
|
||||
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13100000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13100000;
|
||||
|
||||
CREATE FUNCTION deparse_shard_query_test(text)
|
||||
RETURNS VOID
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
-- create the first table
|
||||
CREATE TABLE raw_events_1
|
||||
(tenant_id bigint,
|
||||
value_1 int,
|
||||
value_2 int,
|
||||
value_3 float,
|
||||
value_4 bigint,
|
||||
value_5 text,
|
||||
value_6 int DEfAULT 10,
|
||||
value_7 int,
|
||||
event_at date DEfAULT now()
|
||||
);
|
||||
|
||||
SELECT master_create_distributed_table('raw_events_1', 'tenant_id', 'hash');
|
||||
SELECT master_create_worker_shards('raw_events_1', 4, 1);
|
||||
|
||||
-- create the first table
|
||||
CREATE TABLE raw_events_2
|
||||
(tenant_id bigint,
|
||||
value_1 int,
|
||||
value_2 int,
|
||||
value_3 float,
|
||||
value_4 bigint,
|
||||
value_5 text,
|
||||
value_6 float DEfAULT (random()*100)::float,
|
||||
value_7 int,
|
||||
event_at date DEfAULT now()
|
||||
);
|
||||
|
||||
SELECT master_create_distributed_table('raw_events_2', 'tenant_id', 'hash');
|
||||
SELECT master_create_worker_shards('raw_events_2', 4, 1);
|
||||
|
||||
CREATE TABLE aggregated_events
|
||||
(tenant_id bigint,
|
||||
sum_value_1 bigint,
|
||||
average_value_2 float,
|
||||
average_value_3 float,
|
||||
sum_value_4 bigint,
|
||||
sum_value_5 float,
|
||||
average_value_6 int,
|
||||
rollup_hour date);
|
||||
|
||||
SELECT master_create_distributed_table('aggregated_events', 'tenant_id', 'hash');
|
||||
SELECT master_create_worker_shards('aggregated_events', 4, 1);
|
||||
|
||||
|
||||
-- start with very simple examples on a single table
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1
|
||||
SELECT * FROM raw_events_1;
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(tenant_id, value_4)
|
||||
SELECT
|
||||
tenant_id, value_4
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
|
||||
-- now that shuffle columns a bit on a single table
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
|
||||
SELECT
|
||||
value_2::text, value_5::int, tenant_id, value_4
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
|
||||
-- same test on two different tables
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
|
||||
SELECT
|
||||
value_2::text, value_5::int, tenant_id, value_4
|
||||
FROM
|
||||
raw_events_2;
|
||||
');
|
||||
|
||||
-- lets do some simple aggregations
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (tenant_id, rollup_hour, sum_value_1, average_value_3, average_value_6, sum_value_4)
|
||||
SELECT
|
||||
tenant_id, date_trunc(\'hour\', event_at) , sum(value_1), avg(value_3), avg(value_6), sum(value_4)
|
||||
FROM
|
||||
raw_events_1
|
||||
GROUP BY
|
||||
tenant_id, date_trunc(\'hour\', event_at)
|
||||
');
|
||||
|
||||
|
||||
-- also some subqueries, JOINS with a complicated target lists
|
||||
-- a simple JOIN
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1 (value_3, tenant_id)
|
||||
SELECT
|
||||
raw_events_2.value_3, raw_events_1.tenant_id
|
||||
FROM
|
||||
raw_events_1, raw_events_2
|
||||
WHERE
|
||||
raw_events_1.tenant_id = raw_events_2.tenant_id;
|
||||
');
|
||||
|
||||
-- join with group by
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1 (value_3, tenant_id)
|
||||
SELECT
|
||||
max(raw_events_2.value_3), avg(raw_events_1.value_3)
|
||||
FROM
|
||||
raw_events_1, raw_events_2
|
||||
WHERE
|
||||
raw_events_1.tenant_id = raw_events_2.tenant_id GROUP BY raw_events_1.event_at
|
||||
');
|
||||
|
||||
-- a more complicated JOIN
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_4, tenant_id)
|
||||
SELECT
|
||||
max(r1.value_4), r3.tenant_id
|
||||
FROM
|
||||
raw_events_1 r1, raw_events_2 r2, raw_events_1 r3
|
||||
WHERE
|
||||
r1.tenant_id = r2.tenant_id AND r2.tenant_id = r3.tenant_id
|
||||
GROUP BY
|
||||
r1.value_1, r3.tenant_id, r2.event_at
|
||||
ORDER BY
|
||||
r2.event_at DESC;
|
||||
');
|
||||
|
||||
|
||||
-- queries with CTEs are supported
|
||||
SELECT deparse_shard_query_test('
|
||||
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
|
||||
INSERT INTO aggregated_events (rollup_hour, sum_value_5, tenant_id)
|
||||
SELECT
|
||||
event_at, sum(value_5::int), tenant_id
|
||||
FROM
|
||||
raw_events_1
|
||||
GROUP BY
|
||||
event_at, tenant_id;
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
|
||||
INSERT INTO aggregated_events (sum_value_5, tenant_id)
|
||||
SELECT
|
||||
sum(value_5::int), tenant_id
|
||||
FROM
|
||||
raw_events_1
|
||||
GROUP BY
|
||||
event_at, tenant_id;
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_1, sum_value_5, tenant_id)
|
||||
WITH RECURSIVE hierarchy as (
|
||||
SELECT value_1, 1 AS LEVEL, tenant_id
|
||||
FROM raw_events_1
|
||||
WHERE tenant_id = 1
|
||||
UNION
|
||||
SELECT re.value_2, (h.level+1), re.tenant_id
|
||||
FROM hierarchy h JOIN raw_events_1 re
|
||||
ON (h.tenant_id = re.tenant_id AND
|
||||
h.value_1 = re.value_6))
|
||||
SELECT * FROM hierarchy WHERE LEVEL <= 2;
|
||||
');
|
||||
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_1)
|
||||
SELECT
|
||||
DISTINCT value_1
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
|
||||
|
||||
-- many filters suffled
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (sum_value_5, sum_value_1, tenant_id)
|
||||
SELECT value_3, value_2, tenant_id
|
||||
FROM raw_events_1
|
||||
WHERE (value_5 like \'%s\' or value_5 like \'%a\') and (tenant_id = 1) and (value_6 < 3000 or value_3 > 8000);
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (sum_value_5, tenant_id)
|
||||
SELECT rank() OVER (PARTITION BY tenant_id ORDER BY value_6), tenant_id
|
||||
FROM raw_events_1
|
||||
WHERE event_at = now();
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events (sum_value_5, tenant_id, sum_value_4)
|
||||
SELECT random(), int4eq(1, max(value_1))::int, value_6
|
||||
FROM raw_events_1
|
||||
WHERE event_at = now()
|
||||
GROUP BY event_at, value_7, value_6;
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO aggregated_events (sum_value_1, tenant_id)
|
||||
SELECT
|
||||
count(DISTINCT CASE
|
||||
WHEN
|
||||
value_1 > 100
|
||||
THEN
|
||||
tenant_id
|
||||
ELSE
|
||||
value_6
|
||||
END) as c,
|
||||
max(tenant_id)
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(value_7, value_1, tenant_id)
|
||||
SELECT
|
||||
value_7, value_1, tenant_id
|
||||
FROM
|
||||
(SELECT
|
||||
tenant_id, value_2 as value_7, value_1
|
||||
FROM
|
||||
raw_events_2
|
||||
) as foo
|
||||
');
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO aggregated_events(sum_value_1, tenant_id, sum_value_5)
|
||||
SELECT
|
||||
sum(value_1), tenant_id, sum(value_5::bigint)
|
||||
FROM
|
||||
(SELECT
|
||||
raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1
|
||||
FROM
|
||||
raw_events_2, raw_events_1
|
||||
WHERE
|
||||
raw_events_1.tenant_id = raw_events_2.tenant_id
|
||||
) as foo
|
||||
GROUP BY
|
||||
tenant_id, date_trunc(\'hour\', event_at)
|
||||
');
|
||||
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO raw_events_2(tenant_id, value_1, value_2, value_3, value_4)
|
||||
SELECT
|
||||
tenant_id, value_1, value_2, value_3, value_4
|
||||
FROM
|
||||
(SELECT
|
||||
value_2, value_4, tenant_id, value_1, value_3
|
||||
FROM
|
||||
raw_events_1
|
||||
) as foo
|
||||
');
|
||||
|
||||
|
||||
SELECT deparse_shard_query_test(E'
|
||||
INSERT INTO raw_events_2(tenant_id, value_1, value_4, value_2, value_3)
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
(SELECT
|
||||
value_2, value_4, tenant_id, value_1, value_3
|
||||
FROM
|
||||
raw_events_1
|
||||
) as foo
|
||||
');
|
||||
|
||||
|
||||
-- use a column multiple times
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
|
||||
SELECT
|
||||
tenant_id, value_7, value_7
|
||||
FROM
|
||||
raw_events_1
|
||||
ORDER BY
|
||||
value_2, value_1;
|
||||
');
|
||||
|
||||
-- test dropped table as well
|
||||
ALTER TABLE raw_events_1 DROP COLUMN value_5;
|
||||
|
||||
SELECT deparse_shard_query_test('
|
||||
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
|
||||
SELECT
|
||||
tenant_id, value_7, value_4
|
||||
FROM
|
||||
raw_events_1;
|
||||
');
|
Loading…
Reference in New Issue