diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 948c10fae..0da294be9 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1847,6 +1847,7 @@ MasterAggregateExpression(Aggref *originalAggregate, */ Var *column = NULL; TargetEntry *columnTargetEntry = NULL; + Aggref *newMasterAggregate = NULL; /* worker aggregate and original aggregate have the same return type */ Oid workerReturnType = exprType((Node *) originalAggregate); @@ -1857,7 +1858,16 @@ MasterAggregateExpression(Aggref *originalAggregate, Oid aggregateFunctionId = AggregateFunctionOid(aggregateName, workerReturnType); Oid masterReturnType = get_func_rettype(aggregateFunctionId); - Aggref *newMasterAggregate = copyObject(originalAggregate); + /* + * If the return type of the aggregate is anyelement, its actual return type is + * determined by the type of its argument. So we replace it with the + * argument type in that case. 
+ */ + if (masterReturnType == ANYELEMENTOID) + { + masterReturnType = workerReturnType; + } + newMasterAggregate = copyObject(originalAggregate); newMasterAggregate->aggdistinct = NULL; newMasterAggregate->aggfnoid = aggregateFunctionId; newMasterAggregate->aggtype = masterReturnType; @@ -2968,7 +2978,8 @@ AggregateFunctionOid(const char *functionName, Oid inputType) if (argumentCount == 1) { /* check if input type and found value type match */ - if (procForm->proargtypes.values[0] == inputType) + if (procForm->proargtypes.values[0] == inputType || + procForm->proargtypes.values[0] == ANYELEMENTOID) { #if PG_VERSION_NUM < 120000 functionOid = HeapTupleGetOid(heapTuple); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index aa50b3e01..9da71ec31 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -24,6 +24,7 @@ #include "access/nbtree.h" #include "access/skey.h" #include "access/xlog.h" +#include "catalog/pg_aggregate.h" #include "catalog/pg_am.h" #include "catalog/pg_operator.h" #include "catalog/pg_type.h" @@ -203,6 +204,7 @@ static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex, static uint32 FinalTargetEntryCount(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); +static Node * AddAnyValueAggregates(Node *node, void *context); /* @@ -593,6 +595,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) Node *havingQual = NULL; bool hasDistinctOn = false; List *distinctClause = NIL; + bool isRepartitionJoin = false; /* we start building jobs from below the collect node */ Assert(!CitusIsA(multiNode, MultiCollect)); @@ -623,6 +626,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) if (CitusIsA(job, MapMergeJob)) { MapMergeJob *mapMergeJob = (MapMergeJob *) job; + isRepartitionJoin = true; 
+ if (mapMergeJob->reduceQuery) { updateColumnAttributes = false; @@ -672,6 +676,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) /* build group clauses */ groupClauseList = QueryGroupClauseList(multiNode); + /* build the where clause list using select predicates */ selectClauseList = QuerySelectClauseList(multiNode); @@ -683,6 +688,23 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) UpdateAllColumnAttributes(havingQual, rangeTableList, dependedJobList); } + /* + * Group by on primary key allows all columns to appear in the target + * list, but after re-partitioning we will be querying an intermediate + * table that does not have the primary key. We therefore wrap all the + * columns that do not appear in the GROUP BY in an any_value aggregate. + */ + if (groupClauseList != NIL && isRepartitionJoin) + { + targetList = (List *) expression_tree_mutator((Node *) targetList, + AddAnyValueAggregates, + groupClauseList); + + havingQual = expression_tree_mutator((Node *) havingQual, + AddAnyValueAggregates, + groupClauseList); + } + /* * Build the From/Where construct. We keep the where-clause list implicitly * AND'd, since both partition and join pruning depends on the clauses being @@ -955,6 +977,63 @@ TargetEntryList(List *expressionList) } + +/* + * AddAnyValueAggregates wraps all vars that do not appear in the GROUP BY + * clause or are inside an aggregate function in an any_value aggregate + * function. This is needed for repartition joins because primary keys are not + * present on intermediate tables. 
+ */ +static Node * +AddAnyValueAggregates(Node *node, void *context) +{ + List *groupClauseList = context; + if (node == NULL) + { + return node; + } + + if (IsA(node, Var)) + { + Var *var = (Var *) node; + Aggref *agg = makeNode(Aggref); + agg->aggfnoid = CitusAnyValueFunctionId(); + agg->aggtype = var->vartype; + agg->args = list_make1(makeTargetEntry((Expr *) var, 1, NULL, false)); + agg->aggkind = AGGKIND_NORMAL; + agg->aggtranstype = InvalidOid; + agg->aggargtypes = list_make1_oid(var->vartype); + agg->aggsplit = AGGSPLIT_SIMPLE; + return (Node *) agg; + } + if (IsA(node, TargetEntry)) + { + TargetEntry *targetEntry = (TargetEntry *) node; + + + /* + * Stop searching this part of the tree if the targetEntry is part of + * the group by clause. + */ + if (targetEntry->ressortgroupref != 0) + { + SortGroupClause *sortGroupClause = NULL; + foreach_ptr(sortGroupClause, groupClauseList) + { + if (sortGroupClause->tleSortGroupRef == targetEntry->ressortgroupref) + { + return node; + } + } + } + } + if (IsA(node, Aggref)) + { + return node; + } + return expression_tree_mutator(node, AddAnyValueAggregates, context); +} + + /* * QueryGroupClauseList extracts the group clause list from the logical plan. If * no grouping clauses exist, the function returns an empty list. @@ -1489,6 +1568,7 @@ BuildSubqueryJobQuery(MultiNode *multiNode) distinctClause = NIL; } + /* * Build the From/Where construct. We keep the where-clause list implicitly * AND'd, since both partition and join pruning depends on the clauses being diff --git a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql index 4da3f4560..ef538bb3e 100644 --- a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql +++ b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql @@ -8,3 +8,5 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS -- we don't maintain replication factor of reference tables anymore and just -- use -1 instead. 
UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntype = 0; + +#include "udfs/any_value/9.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/any_value/9.1-1.sql b/src/backend/distributed/sql/udfs/any_value/9.1-1.sql new file mode 100644 index 000000000..7eb9fdb25 --- /dev/null +++ b/src/backend/distributed/sql/udfs/any_value/9.1-1.sql @@ -0,0 +1,14 @@ +CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement ) +RETURNS anyelement AS $$ + SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END; +$$ LANGUAGE SQL STABLE; + +CREATE AGGREGATE pg_catalog.any_value ( + sfunc = pg_catalog.any_value_agg, + combinefunc = pg_catalog.any_value_agg, + basetype = anyelement, + stype = anyelement +); +COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS + 'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.'; + diff --git a/src/backend/distributed/sql/udfs/any_value/latest.sql b/src/backend/distributed/sql/udfs/any_value/latest.sql new file mode 100644 index 000000000..7eb9fdb25 --- /dev/null +++ b/src/backend/distributed/sql/udfs/any_value/latest.sql @@ -0,0 +1,14 @@ +CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement ) +RETURNS anyelement AS $$ + SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END; +$$ LANGUAGE SQL STABLE; + +CREATE AGGREGATE pg_catalog.any_value ( + sfunc = pg_catalog.any_value_agg, + combinefunc = pg_catalog.any_value_agg, + basetype = anyelement, + stype = anyelement +); +COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS + 'Returns the value of any row in the group. 
It is mostly useful when you know there will be only 1 element.'; + diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 7a75c919a..9f952e4ea 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -136,6 +136,7 @@ typedef struct MetadataCacheData Oid readIntermediateResultFuncId; Oid extraDataContainerFuncId; Oid workerHashFunctionId; + Oid anyValueFunctionId; Oid textSendAsJsonbFunctionId; Oid extensionOwner; Oid binaryCopyFormatId; @@ -2216,6 +2217,21 @@ CitusWorkerHashFunctionId(void) } +/* return oid of the any_value aggregate function */ +Oid +CitusAnyValueFunctionId(void) +{ + if (MetadataCache.anyValueFunctionId == InvalidOid) + { + const int argCount = 1; + MetadataCache.anyValueFunctionId = + FunctionOid("pg_catalog", "any_value", argCount); + } + + return MetadataCache.anyValueFunctionId; +} + + /* return oid of the citus_text_send_as_jsonb(text) function */ Oid CitusTextSendAsJsonbFunctionId(void) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f590a6035..5b5b746f8 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -188,6 +188,7 @@ extern Oid CitusCopyFormatTypeId(void); extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void); +extern Oid CitusAnyValueFunctionId(void); extern Oid CitusTextSendAsJsonbFunctionId(void); extern Oid PgTableVisibleFuncId(void); extern Oid CitusTableVisibleFuncId(void); diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 7ef725b57..46c7e422d 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -75,7 +75,8 @@ typedef enum AGGREGATE_HLL_ADD = 16, AGGREGATE_HLL_UNION = 17, 
AGGREGATE_TOPN_ADD_AGG = 18, - AGGREGATE_TOPN_UNION_AGG = 19 + AGGREGATE_TOPN_UNION_AGG = 19, + AGGREGATE_ANY_VALUE = 20 } AggregateType; @@ -122,7 +123,8 @@ static const char *const AggregateNames[] = { "json_agg", "json_object_agg", "bit_and", "bit_or", "bool_and", "bool_or", "every", "hll_add_agg", "hll_union_agg", - "topn_add_agg", "topn_union_agg" + "topn_add_agg", "topn_union_agg", + "any_value" }; diff --git a/src/test/regress/expected/multi_repartition_join_planning.out b/src/test/regress/expected/multi_repartition_join_planning.out index 3b34f42f0..5fd294540 100644 --- a/src/test/regress/expected/multi_repartition_join_planning.out +++ b/src/test/regress/expected/multi_repartition_join_planning.out @@ -7,6 +7,38 @@ -- executor here, as we cannot run repartition jobs with real time executor. SET citus.next_shard_id TO 690000; SET citus.enable_unique_job_ids TO off; +create schema repartition_join; +DROP TABLE IF EXISTS repartition_join.order_line; +NOTICE: table "order_line" does not exist, skipping +CREATE TABLE order_line ( + ol_w_id int NOT NULL, + ol_d_id int NOT NULL, + ol_o_id int NOT NULL, + ol_number int NOT NULL, + ol_i_id int NOT NULL, + ol_quantity decimal(2,0) NOT NULL, + PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number) +); +DROP TABLE IF EXISTS repartition_join.stock; +NOTICE: table "stock" does not exist, skipping +CREATE TABLE stock ( + s_w_id int NOT NULL, + s_i_id int NOT NULL, + s_quantity decimal(4,0) NOT NULL, + PRIMARY KEY (s_w_id,s_i_id) +); +SELECT create_distributed_table('order_line','ol_w_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('stock','s_w_id'); + create_distributed_table +-------------------------- + +(1 row) + BEGIN; SET client_min_messages TO DEBUG4; SET citus.task_executor_type TO 'task-tracker'; @@ -187,6 +219,315 @@ DEBUG: completed cleanup query for job 5 -----------+------------+------- (0 rows) +-- Check that grouping by primary key allows o_shippriority to 
be in the target list +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey +ORDER BY + o_orderkey; +DEBUG: Router planner does not support append-partitioned tables. +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000003".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000003".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000003 "pg_merge_job_0007.task_000003" JOIN pg_merge_job_0008.task_000003 "pg_merge_job_0008.task_000003" ON 
(("pg_merge_job_0007.task_000003".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000003".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000003".intermediate_column_8_0" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000006".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000006".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000006 "pg_merge_job_0007.task_000006" JOIN pg_merge_job_0008.task_000006 "pg_merge_job_0008.task_000006" ON (("pg_merge_job_0007.task_000006".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000006".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000006".intermediate_column_8_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000009".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000009".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000009 "pg_merge_job_0007.task_000009" JOIN pg_merge_job_0008.task_000009 "pg_merge_job_0008.task_000009" ON (("pg_merge_job_0007.task_000009".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000009".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000009".intermediate_column_8_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0008.task_000012".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000012".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000012 "pg_merge_job_0007.task_000012" JOIN pg_merge_job_0008.task_000012 "pg_merge_job_0008.task_000012" ON (("pg_merge_job_0007.task_000012".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000012".intermediate_column_8_1))) WHERE true GROUP BY 
"pg_merge_job_0008.task_000012".intermediate_column_8_0" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: completed cleanup query for job 9 +DEBUG: completed cleanup query for job 9 +DEBUG: completed cleanup query for job 7 +DEBUG: completed cleanup query for job 7 +DEBUG: completed cleanup query for job 8 +DEBUG: completed cleanup query for job 8 + o_orderkey | o_shippriority | count +------------+----------------+------- +(0 rows) + +-- Check that grouping by primary key allows o_shippriority to be in the target +-- list +-- Postgres removes o_shippriority from the group by clause here +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; +DEBUG: Router planner does not support append-partitioned tables. 
+DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000003".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000003".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000003 "pg_merge_job_0010.task_000003" JOIN pg_merge_job_0011.task_000003 "pg_merge_job_0011.task_000003" ON (("pg_merge_job_0010.task_000003".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000003".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000003".intermediate_column_11_0" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT 
"pg_merge_job_0011.task_000006".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000006".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000006 "pg_merge_job_0010.task_000006" JOIN pg_merge_job_0011.task_000006 "pg_merge_job_0011.task_000006" ON (("pg_merge_job_0010.task_000006".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000006".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000006".intermediate_column_11_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000009".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000009".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000009 "pg_merge_job_0010.task_000009" JOIN pg_merge_job_0011.task_000009 "pg_merge_job_0011.task_000009" ON (("pg_merge_job_0010.task_000009".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000009".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000009".intermediate_column_11_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0011.task_000012".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000012".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000012 "pg_merge_job_0010.task_000012" JOIN pg_merge_job_0011.task_000012 "pg_merge_job_0011.task_000012" ON (("pg_merge_job_0010.task_000012".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000012".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000012".intermediate_column_11_0" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 6 
+DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 12 to node localhost:57638 +DEBUG: completed cleanup query for job 12 +DEBUG: completed cleanup query for job 12 +DEBUG: completed cleanup query for job 10 +DEBUG: completed cleanup query for job 10 +DEBUG: completed cleanup query for job 11 +DEBUG: completed cleanup query for job 11 + o_orderkey | o_shippriority | count +------------+----------------+------- +(0 rows) + +-- Check that calling any_value manually works as well +SELECT + o_orderkey, any_value(o_shippriority) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; +DEBUG: Router planner does not support append-partitioned tables. 
+DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true" +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000003".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000003".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000003".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000003 "pg_merge_job_0013.task_000003" JOIN pg_merge_job_0014.task_000003 "pg_merge_job_0014.task_000003" ON (("pg_merge_job_0013.task_000003".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000003".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000003".intermediate_column_14_0" +DEBUG: 
generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000006".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000006".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000006".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000006 "pg_merge_job_0013.task_000006" JOIN pg_merge_job_0014.task_000006 "pg_merge_job_0014.task_000006" ON (("pg_merge_job_0013.task_000006".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000006".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000006".intermediate_column_14_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000009".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000009".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000009".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000009 "pg_merge_job_0013.task_000009" JOIN pg_merge_job_0014.task_000009 "pg_merge_job_0014.task_000009" ON (("pg_merge_job_0013.task_000009".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000009".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000009".intermediate_column_14_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0014.task_000012".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000012".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000012".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000012 "pg_merge_job_0013.task_000012" JOIN pg_merge_job_0014.task_000012 "pg_merge_job_0014.task_000012" ON (("pg_merge_job_0013.task_000012".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000012".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000012".intermediate_column_14_0" 
+DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: completed cleanup query for job 15 +DEBUG: completed cleanup query for job 15 +DEBUG: completed cleanup query for job 13 +DEBUG: completed cleanup query for job 13 +DEBUG: completed cleanup query for job 14 +DEBUG: completed cleanup query for job 14 + o_orderkey | any_value +------------+----------- +(0 rows) + +-- Check that grouping by primary key allows s_quantity to be in the having +-- list +-- Postgres removes s_quantity from the group by clause here +select s_i_id + from stock, order_line + where ol_i_id=s_i_id + group by s_i_id, s_w_id, s_quantity + having s_quantity > random() +; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690004 stock WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690005 stock WHERE true" +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690006 stock WHERE true" +DEBUG: generated sql query for task 4 +DETAIL: query string: 
"SELECT s_i_id, s_w_id, s_quantity FROM stock_690007 stock WHERE true" +DEBUG: assigned task 1 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57638 +DEBUG: generated sql query for task 1 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690000 order_line WHERE true" +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690001 order_line WHERE true" +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690002 order_line WHERE true" +DEBUG: generated sql query for task 4 +DETAIL: query string: "SELECT ol_i_id FROM order_line_690003 order_line WHERE true" +DEBUG: assigned task 1 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000005".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000005".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000005 
"pg_merge_job_0016.task_000005" JOIN pg_merge_job_0017.task_000005 "pg_merge_job_0017.task_000005" ON (("pg_merge_job_0017.task_000005".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000005".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000005".intermediate_column_16_0, "pg_merge_job_0016.task_000005".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000010".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000010".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000010 "pg_merge_job_0016.task_000010" JOIN pg_merge_job_0017.task_000010 "pg_merge_job_0017.task_000010" ON (("pg_merge_job_0017.task_000010".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000010".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000010".intermediate_column_16_0, "pg_merge_job_0016.task_000010".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000015".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000015".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000015 "pg_merge_job_0016.task_000015" JOIN pg_merge_job_0017.task_000015 "pg_merge_job_0017.task_000015" ON 
(("pg_merge_job_0017.task_000015".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000015".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000015".intermediate_column_16_0, "pg_merge_job_0016.task_000015".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000020".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000020".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000020 "pg_merge_job_0016.task_000020" JOIN pg_merge_job_0017.task_000020 "pg_merge_job_0017.task_000020" ON (("pg_merge_job_0017.task_000020".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000020".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000020".intermediate_column_16_0, "pg_merge_job_0016.task_000020".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 20 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating 
dependency on merge taskId 20 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 12 to node localhost:57638 +DEBUG: completed cleanup query for job 18 +DEBUG: completed cleanup query for job 18 +DEBUG: completed cleanup query for job 16 +DEBUG: completed cleanup query for job 16 +DEBUG: completed cleanup query for job 17 +DEBUG: completed cleanup query for job 17 + s_i_id +-------- +(0 rows) + -- Reset client logging level to its previous value SET client_min_messages TO NOTICE; COMMIT; +drop schema repartition_join; diff --git a/src/test/regress/sql/multi_repartition_join_planning.sql b/src/test/regress/sql/multi_repartition_join_planning.sql index 82c74e56b..a7bd65e7f 100644 --- a/src/test/regress/sql/multi_repartition_join_planning.sql +++ b/src/test/regress/sql/multi_repartition_join_planning.sql @@ -10,6 +10,29 @@ SET citus.next_shard_id TO 690000; SET citus.enable_unique_job_ids TO off; +create schema repartition_join; +DROP TABLE IF EXISTS repartition_join.order_line; +CREATE TABLE order_line ( + ol_w_id int NOT NULL, + ol_d_id int NOT NULL, + ol_o_id int NOT NULL, + ol_number int NOT NULL, + ol_i_id int NOT NULL, + ol_quantity decimal(2,0) NOT NULL, + PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number) +); + +DROP TABLE IF EXISTS repartition_join.stock; +CREATE TABLE stock ( + s_w_id int NOT NULL, + s_i_id int NOT NULL, + s_quantity decimal(4,0) NOT NULL, + PRIMARY KEY (s_w_id,s_i_id) +); + +SELECT create_distributed_table('order_line','ol_w_id'); +SELECT create_distributed_table('stock','s_w_id'); + BEGIN; SET client_min_messages TO DEBUG4; SET citus.task_executor_type TO 'task-tracker'; @@ -52,7 +75,59 @@ GROUP BY ORDER BY l_partkey, o_orderkey; +-- Check that grouping by primary key allows o_shippriority to be in the target list +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY 
+ o_orderkey +ORDER BY + o_orderkey; + +-- Check that grouping by primary key allows o_shippriority to be in the target +-- list +-- Postgres removes o_shippriority from the group by clause here +SELECT + o_orderkey, o_shippriority, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; + +-- Check that calling any_value manually works as well +SELECT + o_orderkey, any_value(o_shippriority) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority +GROUP BY + o_orderkey, o_shippriority +ORDER BY + o_orderkey; + + +-- Check that grouping by primary key allows s_quantity to be in the having +-- list +-- Postgres removes s_quantity from the group by clause here + +select s_i_id + from stock, order_line + where ol_i_id=s_i_id + group by s_i_id, s_w_id, s_quantity + having s_quantity > random() +; + -- Reset client logging level to its previous value SET client_min_messages TO NOTICE; COMMIT; + +drop schema repartition_join;