From 9fb897a074d122ede92462f3b440adde5b1de29e Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 8 Nov 2019 15:36:18 +0100 Subject: [PATCH] Fix queries with repartition joins and group by unique column (#3157) Postgres doesn't require you to add all columns that are in the target list to the GROUP BY when you group by a unique column (or columns). It even actively removes these group by clauses when you do. This is normally fine, but for repartition joins it is not. The reason for this is that the temporary tables don't have these primary key columns. So when the worker executes the query it will complain that it is missing columns in the group by. This PR fixes that by adding an ANY_VALUE aggregate around each variable in the target list that does is not contained in the group by or in an aggregate. This is done only for repartition joins. The ANY_VALUE aggregate chooses the value from an undefined row in the group. --- .../planner/multi_logical_optimizer.c | 15 +- .../planner/multi_physical_planner.c | 80 ++++ .../distributed/sql/citus--9.0-2--9.1-1.sql | 2 + .../distributed/sql/udfs/any_value/9.1-1.sql | 14 + .../distributed/sql/udfs/any_value/latest.sql | 14 + .../distributed/utils/metadata_cache.c | 16 + src/include/distributed/metadata_cache.h | 1 + .../distributed/multi_logical_optimizer.h | 6 +- .../multi_repartition_join_planning.out | 341 ++++++++++++++++++ .../sql/multi_repartition_join_planning.sql | 75 ++++ 10 files changed, 560 insertions(+), 4 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/any_value/9.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/any_value/latest.sql diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 948c10fae..0da294be9 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1847,6 +1847,7 @@ MasterAggregateExpression(Aggref *originalAggregate, */ Var *column = NULL; TargetEntry *columnTargetEntry = NULL; + Aggref *newMasterAggregate = NULL; /* worker aggregate and original aggregate have the same return type */ Oid workerReturnType = exprType((Node *) originalAggregate); @@ -1857,7 +1858,16 @@ MasterAggregateExpression(Aggref *originalAggregate, Oid aggregateFunctionId = AggregateFunctionOid(aggregateName, workerReturnType); Oid masterReturnType = get_func_rettype(aggregateFunctionId); - Aggref *newMasterAggregate = copyObject(originalAggregate); + /* + * If return type aggregate is anyelement, it's actual return type is + * determined on the type of its argument. So we replace it with the + * argument type in that case. + */ + if (masterReturnType == ANYELEMENTOID) + { + masterReturnType = workerReturnType; + } + newMasterAggregate = copyObject(originalAggregate); newMasterAggregate->aggdistinct = NULL; newMasterAggregate->aggfnoid = aggregateFunctionId; newMasterAggregate->aggtype = masterReturnType; @@ -2968,7 +2978,8 @@ AggregateFunctionOid(const char *functionName, Oid inputType) if (argumentCount == 1) { /* check if input type and found value type match */ - if (procForm->proargtypes.values[0] == inputType) + if (procForm->proargtypes.values[0] == inputType || + procForm->proargtypes.values[0] == ANYELEMENTOID) { #if PG_VERSION_NUM < 120000 functionOid = HeapTupleGetOid(heapTuple); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index aa50b3e01..9da71ec31 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -24,6 +24,7 @@ #include "access/nbtree.h" #include "access/skey.h" #include "access/xlog.h" +#include "catalog/pg_aggregate.h" #include "catalog/pg_am.h" #include "catalog/pg_operator.h" #include "catalog/pg_type.h" @@ -203,6 +204,7 @@ static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex, static uint32 FinalTargetEntryCount(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); +static Node * AddAnyValueAggregates(Node *node, void *context); /* @@ -593,6 +595,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) Node *havingQual = NULL; bool hasDistinctOn = false; List *distinctClause = NIL; + bool isRepartitionJoin = false; /* we start building jobs from below the collect node */ Assert(!CitusIsA(multiNode, MultiCollect)); @@ -623,6 +626,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) if (CitusIsA(job, MapMergeJob)) { MapMergeJob *mapMergeJob = (MapMergeJob *) job; + isRepartitionJoin = true; if (mapMergeJob->reduceQuery) { updateColumnAttributes = false; @@ -672,6 +676,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) /* build group clauses */ groupClauseList = QueryGroupClauseList(multiNode); + /* build the where clause list using select predicates */ selectClauseList = QuerySelectClauseList(multiNode); @@ -683,6 +688,23 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) UpdateAllColumnAttributes(havingQual, rangeTableList, dependedJobList); } + /* + * Group by on primary key allows all columns to appear in the target + * list, but after re-partitioning we will be querying an intermediate + * table that does not have the primary key. We therefore wrap all the + * columns that do not appear in the GROUP BY in an any_value aggregate. + */ + if (groupClauseList != NIL && isRepartitionJoin) + { + targetList = (List *) expression_tree_mutator((Node *) targetList, + AddAnyValueAggregates, + groupClauseList); + + havingQual = expression_tree_mutator((Node *) havingQual, + AddAnyValueAggregates, + groupClauseList); + } + /* * Build the From/Where construct. We keep the where-clause list implicitly * AND'd, since both partition and join pruning depends on the clauses being @@ -955,6 +977,63 @@ TargetEntryList(List *expressionList) } +/* + * AddAnyValueAggregates wraps all vars that do not apear in the GROUP BY + * clause or are inside an aggregate function in an any_value aggregate + * function. This is needed for repartition joins because primary keys are not + * present on intermediate tables. + */ +static Node * +AddAnyValueAggregates(Node *node, void *context) +{ + List *groupClauseList = context; + if (node == NULL) + { + return node; + } + + if (IsA(node, Var)) + { + Var *var = (Var *) node; + Aggref *agg = makeNode(Aggref); + agg->aggfnoid = CitusAnyValueFunctionId(); + agg->aggtype = var->vartype; + agg->args = list_make1(makeTargetEntry((Expr *) var, 1, NULL, false)); + agg->aggkind = AGGKIND_NORMAL; + agg->aggtranstype = InvalidOid; + agg->aggargtypes = list_make1_oid(var->vartype); + agg->aggsplit = AGGSPLIT_SIMPLE; + return (Node *) agg; + } + if (IsA(node, TargetEntry)) + { + TargetEntry *targetEntry = (TargetEntry *) node; + + + /* + * Stop searching this part of the tree if the targetEntry is part of + * the group by clause. + */ + if (targetEntry->ressortgroupref != 0) + { + SortGroupClause *sortGroupClause = NULL; + foreach_ptr(sortGroupClause, groupClauseList) + { + if (sortGroupClause->tleSortGroupRef == targetEntry->ressortgroupref) + { + return node; + } + } + } + } + if (IsA(node, Aggref)) + { + return node; + } + return expression_tree_mutator(node, AddAnyValueAggregates, context); +} + + /* * QueryGroupClauseList extracts the group clause list from the logical plan. If * no grouping clauses exist, the function returns an empty list. @@ -1489,6 +1568,7 @@ BuildSubqueryJobQuery(MultiNode *multiNode) distinctClause = NIL; } + /* * Build the From/Where construct. We keep the where-clause list implicitly * AND'd, since both partition and join pruning depends on the clauses being diff --git a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql index 4da3f4560..ef538bb3e 100644 --- a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql +++ b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql @@ -8,3 +8,5 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS -- we don't maintain replication factor of reference tables anymore and just -- use -1 instead. UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntype = 0; + +#include "udfs/any_value/9.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/any_value/9.1-1.sql b/src/backend/distributed/sql/udfs/any_value/9.1-1.sql new file mode 100644 index 000000000..7eb9fdb25 --- /dev/null +++ b/src/backend/distributed/sql/udfs/any_value/9.1-1.sql @@ -0,0 +1,14 @@ +CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement ) +RETURNS anyelement AS $$ + SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END; +$$ LANGUAGE SQL STABLE; + +CREATE AGGREGATE pg_catalog.any_value ( + sfunc = pg_catalog.any_value_agg, + combinefunc = pg_catalog.any_value_agg, + basetype = anyelement, + stype = anyelement +); +COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS + 'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.'; + diff --git a/src/backend/distributed/sql/udfs/any_value/latest.sql b/src/backend/distributed/sql/udfs/any_value/latest.sql new file mode 100644 index 000000000..7eb9fdb25 --- /dev/null +++ b/src/backend/distributed/sql/udfs/any_value/latest.sql @@ -0,0 +1,14 @@ +CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement ) +RETURNS anyelement AS $$ + SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END; +$$ LANGUAGE SQL STABLE; + +CREATE AGGREGATE pg_catalog.any_value ( + sfunc = pg_catalog.any_value_agg, + combinefunc = pg_catalog.any_value_agg, + basetype = anyelement, + stype = anyelement +); +COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS + 'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.'; + diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 7a75c919a..9f952e4ea 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -136,6 +136,7 @@ typedef struct MetadataCacheData Oid readIntermediateResultFuncId; Oid extraDataContainerFuncId; Oid workerHashFunctionId; + Oid anyValueFunctionId; Oid textSendAsJsonbFunctionId; Oid extensionOwner; Oid binaryCopyFormatId; @@ -2216,6 +2217,21 @@ CitusWorkerHashFunctionId(void) } +/* return oid of the any_value aggregate function */ +Oid +CitusAnyValueFunctionId(void) +{ + if (MetadataCache.anyValueFunctionId == InvalidOid) + { + const int argCount = 1; + MetadataCache.anyValueFunctionId = + FunctionOid("pg_catalog", "any_value", argCount); + } + + return MetadataCache.anyValueFunctionId; +} + + /* return oid of the citus_text_send_as_jsonb(text) function */ Oid CitusTextSendAsJsonbFunctionId(void) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f590a6035..5b5b746f8 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -188,6 +188,7 @@ extern Oid CitusCopyFormatTypeId(void); extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void); +extern Oid CitusAnyValueFunctionId(void); extern Oid CitusTextSendAsJsonbFunctionId(void); extern Oid PgTableVisibleFuncId(void); extern Oid CitusTableVisibleFuncId(void); diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 7ef725b57..46c7e422d 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -75,7 +75,8 @@ typedef enum AGGREGATE_HLL_ADD = 16, AGGREGATE_HLL_UNION = 17, AGGREGATE_TOPN_ADD_AGG = 18, - AGGREGATE_TOPN_UNION_AGG = 19 + AGGREGATE_TOPN_UNION_AGG = 19, + AGGREGATE_ANY_VALUE = 20 } AggregateType; @@ -122,7 +123,8 @@ static const char *const AggregateNames[] = { "json_agg", "json_object_agg", "bit_and", "bit_or", "bool_and", "bool_or", "every", "hll_add_agg", "hll_union_agg", - "topn_add_agg", "topn_union_agg" + "topn_add_agg", "topn_union_agg", + "any_value" }; diff --git a/src/test/regress/expected/multi_repartition_join_planning.out b/src/test/regress/expected/multi_repartition_join_planning.out index 3b34f42f0..5fd294540 100644 --- a/src/test/regress/expected/multi_repartition_join_planning.out +++ b/src/test/regress/expected/multi_repartition_join_planning.out @@ -7,6 +7,38 @@ -- executor here, as we cannot run repartition jobs with real time executor. SET citus.next_shard_id TO 690000; SET citus.enable_unique_job_ids TO off; +create schema repartition_join; +DROP TABLE IF EXISTS repartition_join.order_line; +NOTICE: table "order_line" does not exist, skipping +CREATE TABLE order_line ( + ol_w_id int NOT NULL, + ol_d_id int NOT NULL, + ol_o_id int NOT NULL, + ol_number int NOT NULL, + ol_i_id int NOT NULL, + ol_quantity decimal(2,0) NOT NULL, + PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number) +); +DROP TABLE IF EXISTS repartition_join.stock; +NOTICE: table "stock" does not exist, skipping +CREATE TABLE stock ( + s_w_id int NOT NULL, + s_i_id int NOT NULL, + s_quantity decimal(4,0) NOT NULL, + PRIMARY KEY (s_w_id,s_i_id) +); +SELECT create_distributed_table('order_line','ol_w_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('stock','s_w_id'); + create_distributed_table +-------------------------- + +(1 row) + BEGIN; SET client_min_messages TO DEBUG4; SET citus.task_executor_type TO 'task-tracker'; @@ -187,6 +219,315 @@ DEBUG: completed cleanup query for job 5 -----------+------------+------- (0 rows) +-- Check that grouping by primary key allows o_shippriority to be in the target list +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey +ORDER BY + o_orderkey; +DEBUG: Router planner does not support append-partitioned tables. +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000003".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000003".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000003 "pg_merge_job_0007.task_000003" JOIN pg_merge_job_0008.task_000003 "pg_merge_job_0008.task_000003" ON (("pg_merge_job_0007.task_000003".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000003".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000003".intermediate_column_8_0" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000006".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000006".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000006 "pg_merge_job_0007.task_000006" JOIN pg_merge_job_0008.task_000006 "pg_merge_job_0008.task_000006" ON (("pg_merge_job_0007.task_000006".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000006".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000006".intermediate_column_8_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000009".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000009".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000009 "pg_merge_job_0007.task_000009" JOIN pg_merge_job_0008.task_000009 "pg_merge_job_0008.task_000009" ON (("pg_merge_job_0007.task_000009".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000009".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000009".intermediate_column_8_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000012".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000012".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000012 "pg_merge_job_0007.task_000012" JOIN pg_merge_job_0008.task_000012 "pg_merge_job_0008.task_000012" ON (("pg_merge_job_0007.task_000012".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000012".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000012".intermediate_column_8_0" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: completed cleanup query for job 9 +DEBUG: completed cleanup query for job 9 +DEBUG: completed cleanup query for job 7 +DEBUG: completed cleanup query for job 7 +DEBUG: completed cleanup query for job 8 +DEBUG: completed cleanup query for job 8 + o_orderkey | o_shippriority | count +------------+----------------+------- +(0 rows) + +-- Check that grouping by primary key allows o_shippriority to be in the target +-- list +-- Postgres removes o_shippriority from the group by clause here +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; +DEBUG: Router planner does not support append-partitioned tables. +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000003".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000003".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000003 "pg_merge_job_0010.task_000003" JOIN pg_merge_job_0011.task_000003 "pg_merge_job_0011.task_000003" ON (("pg_merge_job_0010.task_000003".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000003".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000003".intermediate_column_11_0" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000006".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000006".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000006 "pg_merge_job_0010.task_000006" JOIN pg_merge_job_0011.task_000006 "pg_merge_job_0011.task_000006" ON (("pg_merge_job_0010.task_000006".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000006".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000006".intermediate_column_11_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000009".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000009".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000009 "pg_merge_job_0010.task_000009" JOIN pg_merge_job_0011.task_000009 "pg_merge_job_0011.task_000009" ON (("pg_merge_job_0010.task_000009".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000009".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000009".intermediate_column_11_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000012".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000012".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000012 "pg_merge_job_0010.task_000012" JOIN pg_merge_job_0011.task_000012 "pg_merge_job_0011.task_000012" ON (("pg_merge_job_0010.task_000012".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000012".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000012".intermediate_column_11_0" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 12 to node localhost:57638 +DEBUG: completed cleanup query for job 12 +DEBUG: completed cleanup query for job 12 +DEBUG: completed cleanup query for job 10 +DEBUG: completed cleanup query for job 10 +DEBUG: completed cleanup query for job 11 +DEBUG: completed cleanup query for job 11 + o_orderkey | o_shippriority | count +------------+----------------+------- +(0 rows) + +-- Check that calling any_value manually works as well +SELECT + o_orderkey, any_value(o_shippriority) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; +DEBUG: Router planner does not support append-partitioned tables. +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000003".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000003".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000003".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000003 "pg_merge_job_0013.task_000003" JOIN pg_merge_job_0014.task_000003 "pg_merge_job_0014.task_000003" ON (("pg_merge_job_0013.task_000003".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000003".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000003".intermediate_column_14_0" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000006".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000006".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000006".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000006 "pg_merge_job_0013.task_000006" JOIN pg_merge_job_0014.task_000006 "pg_merge_job_0014.task_000006" ON (("pg_merge_job_0013.task_000006".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000006".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000006".intermediate_column_14_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000009".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000009".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000009".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000009 "pg_merge_job_0013.task_000009" JOIN pg_merge_job_0014.task_000009 "pg_merge_job_0014.task_000009" ON (("pg_merge_job_0013.task_000009".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000009".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000009".intermediate_column_14_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000012".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000012".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000012".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000012 "pg_merge_job_0013.task_000012" JOIN pg_merge_job_0014.task_000012 "pg_merge_job_0014.task_000012" ON (("pg_merge_job_0013.task_000012".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000012".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000012".intermediate_column_14_0" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: completed cleanup query for job 15 +DEBUG: completed cleanup query for job 15 +DEBUG: completed cleanup query for job 13 +DEBUG: completed cleanup query for job 13 +DEBUG: completed cleanup query for job 14 +DEBUG: completed cleanup query for job 14 + o_orderkey | any_value +------------+----------- +(0 rows) + +-- Check that grouping by primary key allows s_quantity to be in the having +-- list +-- Postgres removes s_quantity from the group by clause here +select s_i_id + from stock, order_line + where ol_i_id=s_i_id + group by s_i_id, s_w_id, s_quantity + having s_quantity > random() +; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690004 stock WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690005 stock WHERE true" +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690006 stock WHERE true" +DEBUG: generated sql query for task 4 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690007 stock WHERE true" +DEBUG: assigned task 1 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690000 order_line WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690001 order_line WHERE true" +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690002 order_line WHERE true" +DEBUG: generated sql query for task 4 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690003 order_line WHERE true" +DEBUG: assigned task 1 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000005".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000005".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000005 "pg_merge_job_0016.task_000005" JOIN pg_merge_job_0017.task_000005 "pg_merge_job_0017.task_000005" ON (("pg_merge_job_0017.task_000005".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000005".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000005".intermediate_column_16_0, "pg_merge_job_0016.task_000005".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000010".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000010".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000010 "pg_merge_job_0016.task_000010" JOIN pg_merge_job_0017.task_000010 "pg_merge_job_0017.task_000010" ON (("pg_merge_job_0017.task_000010".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000010".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000010".intermediate_column_16_0, "pg_merge_job_0016.task_000010".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000015".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000015".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000015 "pg_merge_job_0016.task_000015" JOIN pg_merge_job_0017.task_000015 "pg_merge_job_0017.task_000015" ON (("pg_merge_job_0017.task_000015".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000015".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000015".intermediate_column_16_0, "pg_merge_job_0016.task_000015".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000020".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000020".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000020 "pg_merge_job_0016.task_000020" JOIN pg_merge_job_0017.task_000020 "pg_merge_job_0017.task_000020" ON (("pg_merge_job_0017.task_000020".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000020".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000020".intermediate_column_16_0, "pg_merge_job_0016.task_000020".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 20 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 20 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 12 to node localhost:57638 +DEBUG: completed cleanup query for job 18 +DEBUG: completed cleanup query for job 18 +DEBUG: completed cleanup query for job 16 +DEBUG: completed cleanup query for job 16 +DEBUG: completed cleanup query for job 17 +DEBUG: completed cleanup query for job 17 + s_i_id +-------- +(0 rows) + -- Reset client logging level to its previous value SET client_min_messages TO NOTICE; COMMIT; +drop schema repartition_join; diff --git a/src/test/regress/sql/multi_repartition_join_planning.sql b/src/test/regress/sql/multi_repartition_join_planning.sql index 82c74e56b..a7bd65e7f 100644 --- a/src/test/regress/sql/multi_repartition_join_planning.sql +++ b/src/test/regress/sql/multi_repartition_join_planning.sql @@ -10,6 +10,29 @@ SET citus.next_shard_id TO 690000; SET citus.enable_unique_job_ids TO off; +create schema repartition_join; +DROP TABLE IF EXISTS repartition_join.order_line; +CREATE TABLE order_line ( + ol_w_id int NOT NULL, + ol_d_id int NOT NULL, + ol_o_id int NOT NULL, + ol_number int NOT NULL, + ol_i_id int NOT NULL, + ol_quantity decimal(2,0) NOT NULL, + PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number) +); + +DROP TABLE IF EXISTS repartition_join.stock; +CREATE TABLE stock ( + s_w_id int NOT NULL, + s_i_id int NOT NULL, + s_quantity decimal(4,0) NOT NULL, + PRIMARY KEY (s_w_id,s_i_id) +); + +SELECT create_distributed_table('order_line','ol_w_id'); +SELECT create_distributed_table('stock','s_w_id'); + BEGIN; SET client_min_messages TO DEBUG4; SET citus.task_executor_type TO 'task-tracker'; @@ -52,7 +75,59 @@ GROUP BY ORDER BY l_partkey, o_orderkey; +-- Check that grouping by primary key allows o_shippriority to be in the target list +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey +ORDER BY + o_orderkey; + +-- Check that grouping by primary key allows o_shippriority to be in the target +-- list +-- Postgres removes o_shippriority from the group by clause here +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; + +-- Check that calling any_value manually works as well +SELECT + o_orderkey, any_value(o_shippriority) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; + + +-- Check that grouping by primary key allows s_quantity to be in the having +-- list +-- Postgres removes s_quantity from the group by clause here + +select s_i_id + from stock, order_line + where ol_i_id=s_i_id + group by s_i_id, s_w_id, s_quantity + having s_quantity > random() +; + -- Reset client logging level to its previous value SET client_min_messages TO NOTICE; COMMIT; + +drop schema repartition_join;