Fix queries with repartition joins and group by unique column (#3157)

Postgres doesn't require you to add all columns that are in the target list to
the GROUP BY when you group by a unique column (or columns). It even actively
removes these group by clauses when you do.

This is normally fine, but for repartition joins it is not. The reason for this
is that the temporary tables don't have these primary key columns. So when the
worker executes the query it will complain that it is missing columns in the
group by.

This PR fixes that by adding an ANY_VALUE aggregate around each variable in
the target list that is not contained in the GROUP BY or in an aggregate.
This is done only for repartition joins.

The ANY_VALUE aggregate chooses the value from an undefined row in the
group.
pull/3163/head
Jelte Fennema 2019-11-08 15:36:18 +01:00 committed by GitHub
parent 02b359623f
commit 9fb897a074
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 560 additions and 4 deletions

View File

@ -1847,6 +1847,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
*/
Var *column = NULL;
TargetEntry *columnTargetEntry = NULL;
Aggref *newMasterAggregate = NULL;
/* worker aggregate and original aggregate have the same return type */
Oid workerReturnType = exprType((Node *) originalAggregate);
@ -1857,7 +1858,16 @@ MasterAggregateExpression(Aggref *originalAggregate,
Oid aggregateFunctionId = AggregateFunctionOid(aggregateName, workerReturnType);
Oid masterReturnType = get_func_rettype(aggregateFunctionId);
Aggref *newMasterAggregate = copyObject(originalAggregate);
/*
* If the aggregate's declared return type is anyelement, its actual
* return type is determined by the type of its argument. So we replace
* it with the argument type in that case.
*/
if (masterReturnType == ANYELEMENTOID)
{
masterReturnType = workerReturnType;
}
newMasterAggregate = copyObject(originalAggregate);
newMasterAggregate->aggdistinct = NULL;
newMasterAggregate->aggfnoid = aggregateFunctionId;
newMasterAggregate->aggtype = masterReturnType;
@ -2968,7 +2978,8 @@ AggregateFunctionOid(const char *functionName, Oid inputType)
if (argumentCount == 1)
{
/* check if input type and found value type match */
if (procForm->proargtypes.values[0] == inputType)
if (procForm->proargtypes.values[0] == inputType ||
procForm->proargtypes.values[0] == ANYELEMENTOID)
{
#if PG_VERSION_NUM < 120000
functionOid = HeapTupleGetOid(heapTuple);

View File

@ -24,6 +24,7 @@
#include "access/nbtree.h"
#include "access/skey.h"
#include "access/xlog.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_am.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
@ -203,6 +204,7 @@ static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex,
static uint32 FinalTargetEntryCount(List *targetEntryList);
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
ShardInterval *secondInterval);
static Node * AddAnyValueAggregates(Node *node, void *context);
/*
@ -593,6 +595,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
Node *havingQual = NULL;
bool hasDistinctOn = false;
List *distinctClause = NIL;
bool isRepartitionJoin = false;
/* we start building jobs from below the collect node */
Assert(!CitusIsA(multiNode, MultiCollect));
@ -623,6 +626,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
if (CitusIsA(job, MapMergeJob))
{
MapMergeJob *mapMergeJob = (MapMergeJob *) job;
isRepartitionJoin = true;
if (mapMergeJob->reduceQuery)
{
updateColumnAttributes = false;
@ -672,6 +676,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
/* build group clauses */
groupClauseList = QueryGroupClauseList(multiNode);
/* build the where clause list using select predicates */
selectClauseList = QuerySelectClauseList(multiNode);
@ -683,6 +688,23 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
UpdateAllColumnAttributes(havingQual, rangeTableList, dependedJobList);
}
/*
* Group by on primary key allows all columns to appear in the target
* list, but after re-partitioning we will be querying an intermediate
* table that does not have the primary key. We therefore wrap all the
* columns that do not appear in the GROUP BY in an any_value aggregate.
*/
if (groupClauseList != NIL && isRepartitionJoin)
{
targetList = (List *) expression_tree_mutator((Node *) targetList,
AddAnyValueAggregates,
groupClauseList);
havingQual = expression_tree_mutator((Node *) havingQual,
AddAnyValueAggregates,
groupClauseList);
}
/*
* Build the From/Where construct. We keep the where-clause list implicitly
* AND'd, since both partition and join pruning depends on the clauses being
@ -955,6 +977,63 @@ TargetEntryList(List *expressionList)
}
/*
* AddAnyValueAggregates wraps all vars that do not appear in the GROUP BY
* clause or are inside an aggregate function in an any_value aggregate
* function. This is needed for repartition joins because primary keys are not
* present on intermediate tables.
*
* The function is used as an expression_tree_mutator callback: it is applied
* to the target list and the HAVING qual of the job query, with the query's
* group clause list passed through the opaque context pointer.
*/
static Node *
AddAnyValueAggregates(Node *node, void *context)
{
List *groupClauseList = context;
if (node == NULL)
{
return node;
}
if (IsA(node, Var))
{
/*
* A bare Var at this point is neither grouped on nor inside an
* aggregate (both cases are intercepted below), so wrap it in
* any_value to make the worker query valid.
*/
Var *var = (Var *) node;
Aggref *agg = makeNode(Aggref);
agg->aggfnoid = CitusAnyValueFunctionId();
agg->aggtype = var->vartype;
agg->args = list_make1(makeTargetEntry((Expr *) var, 1, NULL, false));
agg->aggkind = AGGKIND_NORMAL;
agg->aggtranstype = InvalidOid;
agg->aggargtypes = list_make1_oid(var->vartype);
agg->aggsplit = AGGSPLIT_SIMPLE;
return (Node *) agg;
}
if (IsA(node, TargetEntry))
{
TargetEntry *targetEntry = (TargetEntry *) node;
/*
* Stop searching this part of the tree if the targetEntry is part of
* the group by clause.
*/
if (targetEntry->ressortgroupref != 0)
{
SortGroupClause *sortGroupClause = NULL;
foreach_ptr(sortGroupClause, groupClauseList)
{
/* a matching tleSortGroupRef means this entry is grouped on */
if (sortGroupClause->tleSortGroupRef == targetEntry->ressortgroupref)
{
return node;
}
}
}
}
if (IsA(node, Aggref))
{
/* expressions already inside an aggregate need no wrapping */
return node;
}
/* recurse into any other node type, visiting its children */
return expression_tree_mutator(node, AddAnyValueAggregates, context);
}
/*
* QueryGroupClauseList extracts the group clause list from the logical plan. If
* no grouping clauses exist, the function returns an empty list.
@ -1489,6 +1568,7 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
distinctClause = NIL;
}
/*
* Build the From/Where construct. We keep the where-clause list implicitly
* AND'd, since both partition and join pruning depends on the clauses being

View File

@ -8,3 +8,5 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS
-- we don't maintain replication factor of reference tables anymore and just
-- use -1 instead.
UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntype = 0;
#include "udfs/any_value/9.1-1.sql"

View File

@ -0,0 +1,14 @@
-- State transition function for the any_value aggregate: keeps the first
-- non-NULL value it sees and ignores all subsequent input rows.
CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement )
RETURNS anyelement AS $$
SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END;
$$ LANGUAGE SQL STABLE;
-- any_value picks an arbitrary row's value from each group; the combinefunc
-- allows it to be used with partial (parallel) aggregation.
CREATE AGGREGATE pg_catalog.any_value (
sfunc = pg_catalog.any_value_agg,
combinefunc = pg_catalog.any_value_agg,
basetype = anyelement,
stype = anyelement
);
COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS
'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.';

View File

@ -0,0 +1,14 @@
-- State transition function for the any_value aggregate: keeps the first
-- non-NULL value it sees and ignores all subsequent input rows.
CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement )
RETURNS anyelement AS $$
SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END;
$$ LANGUAGE SQL STABLE;
-- any_value picks an arbitrary row's value from each group; the combinefunc
-- allows it to be used with partial (parallel) aggregation.
CREATE AGGREGATE pg_catalog.any_value (
sfunc = pg_catalog.any_value_agg,
combinefunc = pg_catalog.any_value_agg,
basetype = anyelement,
stype = anyelement
);
COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS
'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.';

View File

@ -136,6 +136,7 @@ typedef struct MetadataCacheData
Oid readIntermediateResultFuncId;
Oid extraDataContainerFuncId;
Oid workerHashFunctionId;
Oid anyValueFunctionId;
Oid textSendAsJsonbFunctionId;
Oid extensionOwner;
Oid binaryCopyFormatId;
@ -2216,6 +2217,21 @@ CitusWorkerHashFunctionId(void)
}
/*
* CitusAnyValueFunctionId returns the OID of the pg_catalog.any_value
* aggregate function, caching the lookup result in the metadata cache.
*/
Oid
CitusAnyValueFunctionId(void)
{
if (MetadataCache.anyValueFunctionId != InvalidOid)
{
return MetadataCache.anyValueFunctionId;
}

/* first call: resolve the one-argument any_value aggregate and cache it */
MetadataCache.anyValueFunctionId =
FunctionOid("pg_catalog", "any_value", 1);

return MetadataCache.anyValueFunctionId;
}
/* return oid of the citus_text_send_as_jsonb(text) function */
Oid
CitusTextSendAsJsonbFunctionId(void)

View File

@ -188,6 +188,7 @@ extern Oid CitusCopyFormatTypeId(void);
extern Oid CitusReadIntermediateResultFuncId(void);
extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusWorkerHashFunctionId(void);
extern Oid CitusAnyValueFunctionId(void);
extern Oid CitusTextSendAsJsonbFunctionId(void);
extern Oid PgTableVisibleFuncId(void);
extern Oid CitusTableVisibleFuncId(void);

View File

@ -75,7 +75,8 @@ typedef enum
AGGREGATE_HLL_ADD = 16,
AGGREGATE_HLL_UNION = 17,
AGGREGATE_TOPN_ADD_AGG = 18,
AGGREGATE_TOPN_UNION_AGG = 19
AGGREGATE_TOPN_UNION_AGG = 19,
AGGREGATE_ANY_VALUE = 20
} AggregateType;
@ -122,7 +123,8 @@ static const char *const AggregateNames[] = {
"json_agg", "json_object_agg",
"bit_and", "bit_or", "bool_and", "bool_or", "every",
"hll_add_agg", "hll_union_agg",
"topn_add_agg", "topn_union_agg"
"topn_add_agg", "topn_union_agg",
"any_value"
};

View File

@ -7,6 +7,38 @@
-- executor here, as we cannot run repartition jobs with real time executor.
SET citus.next_shard_id TO 690000;
SET citus.enable_unique_job_ids TO off;
create schema repartition_join;
DROP TABLE IF EXISTS repartition_join.order_line;
NOTICE: table "order_line" does not exist, skipping
CREATE TABLE order_line (
ol_w_id int NOT NULL,
ol_d_id int NOT NULL,
ol_o_id int NOT NULL,
ol_number int NOT NULL,
ol_i_id int NOT NULL,
ol_quantity decimal(2,0) NOT NULL,
PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number)
);
DROP TABLE IF EXISTS repartition_join.stock;
NOTICE: table "stock" does not exist, skipping
CREATE TABLE stock (
s_w_id int NOT NULL,
s_i_id int NOT NULL,
s_quantity decimal(4,0) NOT NULL,
PRIMARY KEY (s_w_id,s_i_id)
);
SELECT create_distributed_table('order_line','ol_w_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('stock','s_w_id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
SET client_min_messages TO DEBUG4;
SET citus.task_executor_type TO 'task-tracker';
@ -187,6 +219,315 @@ DEBUG: completed cleanup query for job 5
-----------+------------+-------
(0 rows)
-- Check that grouping by primary key allows o_shippriority to be in the target list
SELECT
o_orderkey, o_shippriority, count(*)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority
GROUP BY
o_orderkey
ORDER BY
o_orderkey;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0008.task_000003".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000003".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000003 "pg_merge_job_0007.task_000003" JOIN pg_merge_job_0008.task_000003 "pg_merge_job_0008.task_000003" ON (("pg_merge_job_0007.task_000003".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000003".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000003".intermediate_column_8_0"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0008.task_000006".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000006".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000006 "pg_merge_job_0007.task_000006" JOIN pg_merge_job_0008.task_000006 "pg_merge_job_0008.task_000006" ON (("pg_merge_job_0007.task_000006".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000006".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000006".intermediate_column_8_0"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT "pg_merge_job_0008.task_000009".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000009".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000009 "pg_merge_job_0007.task_000009" JOIN pg_merge_job_0008.task_000009 "pg_merge_job_0008.task_000009" ON (("pg_merge_job_0007.task_000009".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000009".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000009".intermediate_column_8_0"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT "pg_merge_job_0008.task_000012".intermediate_column_8_0 AS o_orderkey, any_value("pg_merge_job_0008.task_000012".intermediate_column_8_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0007.task_000012 "pg_merge_job_0007.task_000012" JOIN pg_merge_job_0008.task_000012 "pg_merge_job_0008.task_000012" ON (("pg_merge_job_0007.task_000012".intermediate_column_7_0 OPERATOR(pg_catalog.=) "pg_merge_job_0008.task_000012".intermediate_column_8_1))) WHERE true GROUP BY "pg_merge_job_0008.task_000012".intermediate_column_8_0"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 12
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: completed cleanup query for job 9
DEBUG: completed cleanup query for job 9
DEBUG: completed cleanup query for job 7
DEBUG: completed cleanup query for job 7
DEBUG: completed cleanup query for job 8
DEBUG: completed cleanup query for job 8
o_orderkey | o_shippriority | count
------------+----------------+-------
(0 rows)
-- Check that grouping by primary key allows o_shippriority to be in the target
-- list
-- Postgres removes o_shippriority from the group by clause here
SELECT
o_orderkey, o_shippriority, count(*)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority
GROUP BY
o_orderkey, o_shippriority
ORDER BY
o_orderkey;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0011.task_000003".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000003".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000003 "pg_merge_job_0010.task_000003" JOIN pg_merge_job_0011.task_000003 "pg_merge_job_0011.task_000003" ON (("pg_merge_job_0010.task_000003".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000003".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000003".intermediate_column_11_0"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0011.task_000006".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000006".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000006 "pg_merge_job_0010.task_000006" JOIN pg_merge_job_0011.task_000006 "pg_merge_job_0011.task_000006" ON (("pg_merge_job_0010.task_000006".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000006".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000006".intermediate_column_11_0"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT "pg_merge_job_0011.task_000009".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000009".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000009 "pg_merge_job_0010.task_000009" JOIN pg_merge_job_0011.task_000009 "pg_merge_job_0011.task_000009" ON (("pg_merge_job_0010.task_000009".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000009".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000009".intermediate_column_11_0"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT "pg_merge_job_0011.task_000012".intermediate_column_11_0 AS o_orderkey, any_value("pg_merge_job_0011.task_000012".intermediate_column_11_1) AS o_shippriority, count(*) AS count FROM (pg_merge_job_0010.task_000012 "pg_merge_job_0010.task_000012" JOIN pg_merge_job_0011.task_000012 "pg_merge_job_0011.task_000012" ON (("pg_merge_job_0010.task_000012".intermediate_column_10_0 OPERATOR(pg_catalog.=) "pg_merge_job_0011.task_000012".intermediate_column_11_1))) WHERE true GROUP BY "pg_merge_job_0011.task_000012".intermediate_column_11_0"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 12
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: completed cleanup query for job 12
DEBUG: completed cleanup query for job 12
DEBUG: completed cleanup query for job 10
DEBUG: completed cleanup query for job 10
DEBUG: completed cleanup query for job 11
DEBUG: completed cleanup query for job 11
o_orderkey | o_shippriority | count
------------+----------------+-------
(0 rows)
-- Check that calling any_value manually works as well
SELECT
o_orderkey, any_value(o_shippriority)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority
GROUP BY
o_orderkey, o_shippriority
ORDER BY
o_orderkey;
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT l_suppkey FROM lineitem_290000 lineitem WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT l_suppkey FROM lineitem_290001 lineitem WHERE true"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290002 orders WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290003 orders WHERE true"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0014.task_000003".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000003".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000003".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000003 "pg_merge_job_0013.task_000003" JOIN pg_merge_job_0014.task_000003 "pg_merge_job_0014.task_000003" ON (("pg_merge_job_0013.task_000003".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000003".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000003".intermediate_column_14_0"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0014.task_000006".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000006".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000006".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000006 "pg_merge_job_0013.task_000006" JOIN pg_merge_job_0014.task_000006 "pg_merge_job_0014.task_000006" ON (("pg_merge_job_0013.task_000006".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000006".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000006".intermediate_column_14_0"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT "pg_merge_job_0014.task_000009".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000009".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000009".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000009 "pg_merge_job_0013.task_000009" JOIN pg_merge_job_0014.task_000009 "pg_merge_job_0014.task_000009" ON (("pg_merge_job_0013.task_000009".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000009".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000009".intermediate_column_14_0"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT "pg_merge_job_0014.task_000012".intermediate_column_14_0 AS o_orderkey, any_value("pg_merge_job_0014.task_000012".intermediate_column_14_1) AS any_value, any_value("pg_merge_job_0014.task_000012".intermediate_column_14_1) AS worker_column_3 FROM (pg_merge_job_0013.task_000012 "pg_merge_job_0013.task_000012" JOIN pg_merge_job_0014.task_000012 "pg_merge_job_0014.task_000012" ON (("pg_merge_job_0013.task_000012".intermediate_column_13_0 OPERATOR(pg_catalog.=) "pg_merge_job_0014.task_000012".intermediate_column_14_1))) WHERE true GROUP BY "pg_merge_job_0014.task_000012".intermediate_column_14_0"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 3
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 12
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 12
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: completed cleanup query for job 15
DEBUG: completed cleanup query for job 15
DEBUG: completed cleanup query for job 13
DEBUG: completed cleanup query for job 13
DEBUG: completed cleanup query for job 14
DEBUG: completed cleanup query for job 14
o_orderkey | any_value
------------+-----------
(0 rows)
-- Check that grouping by primary key allows s_quantity to be in the having
-- list
-- Postgres removes s_quantity from the group by clause here
select s_i_id
from stock, order_line
where ol_i_id=s_i_id
group by s_i_id, s_w_id, s_quantity
having s_quantity > random()
;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690004 stock WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690005 stock WHERE true"
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690006 stock WHERE true"
DEBUG: generated sql query for task 4
DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690007 stock WHERE true"
DEBUG: assigned task 1 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT ol_i_id FROM order_line_690000 order_line WHERE true"
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT ol_i_id FROM order_line_690001 order_line WHERE true"
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT ol_i_id FROM order_line_690002 order_line WHERE true"
DEBUG: generated sql query for task 4
DETAIL: query string: "SELECT ol_i_id FROM order_line_690003 order_line WHERE true"
DEBUG: assigned task 1 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0016.task_000005".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000005".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000005 "pg_merge_job_0016.task_000005" JOIN pg_merge_job_0017.task_000005 "pg_merge_job_0017.task_000005" ON (("pg_merge_job_0017.task_000005".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000005".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000005".intermediate_column_16_0, "pg_merge_job_0016.task_000005".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0016.task_000010".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000010".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000010 "pg_merge_job_0016.task_000010" JOIN pg_merge_job_0017.task_000010 "pg_merge_job_0017.task_000010" ON (("pg_merge_job_0017.task_000010".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000010".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000010".intermediate_column_16_0, "pg_merge_job_0016.task_000010".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT "pg_merge_job_0016.task_000015".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000015".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000015 "pg_merge_job_0016.task_000015" JOIN pg_merge_job_0017.task_000015 "pg_merge_job_0017.task_000015" ON (("pg_merge_job_0017.task_000015".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000015".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000015".intermediate_column_16_0, "pg_merge_job_0016.task_000015".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT "pg_merge_job_0016.task_000020".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000020".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000020 "pg_merge_job_0016.task_000020" JOIN pg_merge_job_0017.task_000020 "pg_merge_job_0017.task_000020" ON (("pg_merge_job_0017.task_000020".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000020".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000020".intermediate_column_16_0, "pg_merge_job_0016.task_000020".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 20
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 20
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: completed cleanup query for job 18
DEBUG: completed cleanup query for job 18
DEBUG: completed cleanup query for job 16
DEBUG: completed cleanup query for job 16
DEBUG: completed cleanup query for job 17
DEBUG: completed cleanup query for job 17
s_i_id
--------
(0 rows)
-- Reset client logging level to its previous value
SET client_min_messages TO NOTICE;
COMMIT;
drop schema repartition_join;

View File

@ -10,6 +10,29 @@
SET citus.next_shard_id TO 690000;
SET citus.enable_unique_job_ids TO off;
create schema repartition_join;
DROP TABLE IF EXISTS repartition_join.order_line;
CREATE TABLE order_line (
ol_w_id int NOT NULL,
ol_d_id int NOT NULL,
ol_o_id int NOT NULL,
ol_number int NOT NULL,
ol_i_id int NOT NULL,
ol_quantity decimal(2,0) NOT NULL,
PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number)
);
DROP TABLE IF EXISTS repartition_join.stock;
CREATE TABLE stock (
s_w_id int NOT NULL,
s_i_id int NOT NULL,
s_quantity decimal(4,0) NOT NULL,
PRIMARY KEY (s_w_id,s_i_id)
);
SELECT create_distributed_table('order_line','ol_w_id');
SELECT create_distributed_table('stock','s_w_id');
BEGIN;
SET client_min_messages TO DEBUG4;
SET citus.task_executor_type TO 'task-tracker';
@ -52,7 +75,59 @@ GROUP BY
ORDER BY
l_partkey, o_orderkey;
-- Check that grouping by primary key allows o_shippriority to be in the target list
SELECT
o_orderkey, o_shippriority, count(*)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority
GROUP BY
o_orderkey
ORDER BY
o_orderkey;
-- Check that grouping by primary key allows o_shippriority to be in the target
-- list
-- Postgres removes o_shippriority from the group by clause here
SELECT
o_orderkey, o_shippriority, count(*)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority
GROUP BY
o_orderkey, o_shippriority
ORDER BY
o_orderkey;
-- Check that calling any_value manually works as well
SELECT
o_orderkey, any_value(o_shippriority)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority
GROUP BY
o_orderkey, o_shippriority
ORDER BY
o_orderkey;
-- Check that grouping by primary key allows s_quantity to be in the having
-- list
-- Postgres removes s_quantity from the group by clause here
select s_i_id
from stock, order_line
where ol_i_id=s_i_id
group by s_i_id, s_w_id, s_quantity
having s_quantity > random()
;
-- Reset client logging level to its previous value
SET client_min_messages TO NOTICE;
COMMIT;
drop schema repartition_join;