mirror of https://github.com/citusdata/citus.git
Fix FROM ONLY queries on partitioned tables
parent
9c08ab49df
commit
4b49cb112f
|
@ -7057,9 +7057,10 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
|
|||
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||
|
||||
/* use schema and table name from the remote alias */
|
||||
appendStringInfoString(buf,
|
||||
generate_fragment_name(fragmentSchemaName,
|
||||
fragmentTableName));
|
||||
appendStringInfo(buf, "%s%s",
|
||||
only_marker(rte),
|
||||
generate_fragment_name(fragmentSchemaName,
|
||||
fragmentTableName));
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -7115,9 +7115,10 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
|
|||
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||
|
||||
/* use schema and table name from the remote alias */
|
||||
appendStringInfoString(buf,
|
||||
generate_fragment_name(fragmentSchemaName,
|
||||
fragmentTableName));
|
||||
appendStringInfo(buf, "%s%s",
|
||||
only_marker(rte),
|
||||
generate_fragment_name(fragmentSchemaName,
|
||||
fragmentTableName));
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -164,30 +164,29 @@ distributed_planner(Query *parse,
|
|||
.boundParams = boundParams,
|
||||
};
|
||||
|
||||
if (fastPathRouterQuery)
|
||||
{
|
||||
/*
|
||||
* We need to copy the parse tree because the FastPathPlanner modifies
|
||||
* it. In the next branch we do the same for other distributed queries
|
||||
* too, but for those it needs to be done AFTER calling
|
||||
* AssignRTEIdentities.
|
||||
*/
|
||||
planContext.originalQuery = copyObject(parse);
|
||||
}
|
||||
else if (needsDistributedPlanning)
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
/*
|
||||
* standard_planner scribbles on its input, but for deparsing we need the
|
||||
* unmodified form. Note that before copying we call
|
||||
* AssignRTEIdentities, which is needed because these identities need
|
||||
* to be present in the copied query too.
|
||||
* unmodified form. Before copying we call AssignRTEIdentities to be able
|
||||
* to match RTEs in the rewritten query tree with those in the original
|
||||
* tree.
|
||||
*/
|
||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||
|
||||
planContext.originalQuery = copyObject(parse);
|
||||
|
||||
bool setPartitionedTablesInherited = false;
|
||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||
setPartitionedTablesInherited);
|
||||
/*
|
||||
* When there are partitioned tables (not applicable to fast path),
|
||||
* pretend that they are regular tables to avoid unnecessary work
|
||||
* in standard_planner.
|
||||
*/
|
||||
if (!fastPathRouterQuery)
|
||||
{
|
||||
bool setPartitionedTablesInherited = false;
|
||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||
setPartitionedTablesInherited);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -446,7 +445,7 @@ AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
|
|||
{
|
||||
Assert(rangeTableEntry->rtekind == RTE_RELATION);
|
||||
|
||||
rangeTableEntry->values_lists = list_make1_int(rteIdentifier);
|
||||
rangeTableEntry->values_lists = list_make2_int(rteIdentifier, rangeTableEntry->inh);
|
||||
}
|
||||
|
||||
|
||||
|
@ -457,12 +456,24 @@ GetRTEIdentity(RangeTblEntry *rte)
|
|||
Assert(rte->rtekind == RTE_RELATION);
|
||||
Assert(rte->values_lists != NIL);
|
||||
Assert(IsA(rte->values_lists, IntList));
|
||||
Assert(list_length(rte->values_lists) == 1);
|
||||
Assert(list_length(rte->values_lists) == 2);
|
||||
|
||||
return linitial_int(rte->values_lists);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetOriginalInh gets the original value of the inheritance flag set by
|
||||
* AssignRTEIdentity. The planner resets this flag in the rewritten query,
|
||||
* but we need it during deparsing.
|
||||
*/
|
||||
bool
|
||||
GetOriginalInh(RangeTblEntry *rte)
|
||||
{
|
||||
return lsecond_int(rte->values_lists);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetQueryLockMode returns the necessary lock mode to be acquired for the
|
||||
* given query. (See comment written in RangeTblEntry->rellockmode)
|
||||
|
|
|
@ -1530,6 +1530,7 @@ MultiTableNodeList(List *tableEntryList, List *rangeTableList)
|
|||
tableNode->partitionColumn = partitionColumn;
|
||||
tableNode->alias = rangeTableEntry->alias;
|
||||
tableNode->referenceNames = rangeTableEntry->eref;
|
||||
tableNode->includePartitions = GetOriginalInh(rangeTableEntry);
|
||||
|
||||
tableNodeList = lappend(tableNodeList, tableNode);
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@
|
|||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/multi_logical_planner.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/log_utils.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
|
@ -738,6 +739,8 @@ BaseRangeTableList(MultiNode *multiNode)
|
|||
rangeTableEntry->eref = multiTable->referenceNames;
|
||||
rangeTableEntry->alias = multiTable->alias;
|
||||
rangeTableEntry->relid = multiTable->relationId;
|
||||
rangeTableEntry->inh = multiTable->includePartitions;
|
||||
|
||||
SetRangeTblExtraData(rangeTableEntry, CITUS_RTE_RELATION, NULL, NULL,
|
||||
list_make1_int(multiTable->rangeTableId),
|
||||
NIL, NIL, NIL, NIL);
|
||||
|
@ -1463,6 +1466,7 @@ ConstructCallingRTE(RangeTblEntry *rangeTableEntry, List *dependentJobList)
|
|||
callingRTE->rtekind = RTE_RELATION;
|
||||
callingRTE->eref = rangeTableEntry->eref;
|
||||
callingRTE->relid = rangeTableEntry->relid;
|
||||
callingRTE->inh = rangeTableEntry->inh;
|
||||
}
|
||||
else if (rangeTableKind == CITUS_RTE_REMOTE_QUERY)
|
||||
{
|
||||
|
|
|
@ -237,6 +237,7 @@ extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
|
|||
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
|
||||
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
|
||||
extern int GetRTEIdentity(RangeTblEntry *rte);
|
||||
extern bool GetOriginalInh(RangeTblEntry *rte);
|
||||
extern LOCKMODE GetQueryLockMode(Query *query);
|
||||
extern int32 BlessRecordExpression(Expr *expr);
|
||||
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
|
||||
|
|
|
@ -76,7 +76,7 @@ typedef struct MultiTreeRoot
|
|||
|
||||
|
||||
/*
|
||||
* MultiTable represents a partitioned table in a logical query plan. Note that
|
||||
* MultiTable represents a distributed table in a logical query plan. Note that
|
||||
* this node does not represent a query operator, and differs from the nodes
|
||||
* that follow in that sense.
|
||||
*/
|
||||
|
@ -89,6 +89,7 @@ typedef struct MultiTable
|
|||
Alias *alias;
|
||||
Alias *referenceNames;
|
||||
Query *subquery; /* this field is only valid for non-relation subquery types */
|
||||
bool includePartitions;
|
||||
} MultiTable;
|
||||
|
||||
|
||||
|
|
|
@ -84,9 +84,9 @@ NOTICE: renaming the new table to collation_tests.test_collate_pushed_down_aggr
|
|||
SET citus.log_remote_commands TO true;
|
||||
SELECT ALL MIN((lower(CAST(test_collate_pushed_down_aggregate.a AS VARCHAR)) COLLATE "C"))
|
||||
FROM ONLY test_collate_pushed_down_aggregate;
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
min
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -74,6 +74,22 @@ SELECT * FROM partitioning_hash_test ORDER BY 1;
|
|||
4 | 4
|
||||
(4 rows)
|
||||
|
||||
-- should not return results when only querying parent
|
||||
SELECT * FROM ONLY partitioning_test ORDER BY 1;
|
||||
id | time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * FROM ONLY partitioning_test WHERE id = 1;
|
||||
id | time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * FROM ONLY partitioning_test a JOIN partitioning_hash_test b ON (a.id = b.subid) ORDER BY 1;
|
||||
id | time | id | subid
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- see partitioned table and its partitions are distributed
|
||||
SELECT
|
||||
logicalrelid
|
||||
|
@ -378,11 +394,15 @@ SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-0
|
|||
(12 rows)
|
||||
|
||||
-- test UPDATE
|
||||
-- UPDATE partitioned table
|
||||
-- (1) UPDATE partitioned table
|
||||
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
|
||||
-- UPDATE partition directly
|
||||
-- (2) UPDATE partition directly
|
||||
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
|
||||
-- see the data is updated
|
||||
-- (3) UPDATE only the parent (noop)
|
||||
UPDATE ONLY partitioning_test SET time = '2013-09-09' WHERE id = 7;
|
||||
-- (4) DELETE from only the parent (noop)
|
||||
DELETE FROM ONLY partitioning_test WHERE id = 7;
|
||||
-- see that only (1) and (2) had an effect
|
||||
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
|
||||
id | time
|
||||
---------------------------------------------------------------------
|
||||
|
@ -1061,6 +1081,54 @@ ORDER BY types;
|
|||
3 | 25
|
||||
(4 rows)
|
||||
|
||||
-- subquery with UNIONs on partitioned table, but only scan (empty) parent for some
|
||||
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
|
||||
FROM
|
||||
(SELECT *, random()
|
||||
FROM
|
||||
(SELECT
|
||||
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
|
||||
FROM
|
||||
(SELECT
|
||||
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
|
||||
FROM(
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 0 AS event
|
||||
FROM
|
||||
partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (1, 2) )
|
||||
UNION
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 1 AS event
|
||||
FROM
|
||||
partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (3, 4) )
|
||||
UNION
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 2 AS event
|
||||
FROM
|
||||
ONLY partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (5, 6) )
|
||||
UNION
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 3 AS event
|
||||
FROM
|
||||
ONLY partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (1, 6))) t1
|
||||
GROUP BY "t1"."user_id") AS t) "q"
|
||||
) AS final_query
|
||||
GROUP BY types
|
||||
ORDER BY types;
|
||||
types | sumofeventtype
|
||||
---------------------------------------------------------------------
|
||||
0 | 43
|
||||
1 | 44
|
||||
(2 rows)
|
||||
|
||||
-- UNION and JOIN on both partitioned and regular tables
|
||||
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
|
||||
FROM
|
||||
|
|
|
@ -45,6 +45,11 @@ SELECT * FROM partitioning_test ORDER BY 1;
|
|||
|
||||
SELECT * FROM partitioning_hash_test ORDER BY 1;
|
||||
|
||||
-- should not return results when only querying parent
|
||||
SELECT * FROM ONLY partitioning_test ORDER BY 1;
|
||||
SELECT * FROM ONLY partitioning_test WHERE id = 1;
|
||||
SELECT * FROM ONLY partitioning_test a JOIN partitioning_hash_test b ON (a.id = b.subid) ORDER BY 1;
|
||||
|
||||
-- see partitioned table and its partitions are distributed
|
||||
SELECT
|
||||
logicalrelid
|
||||
|
@ -236,13 +241,19 @@ INSERT INTO partitioning_test_2012 SELECT * FROM partitioning_test WHERE time >=
|
|||
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-01' ORDER BY 1;
|
||||
|
||||
-- test UPDATE
|
||||
-- UPDATE partitioned table
|
||||
-- (1) UPDATE partitioned table
|
||||
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
|
||||
|
||||
-- UPDATE partition directly
|
||||
-- (2) UPDATE partition directly
|
||||
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
|
||||
|
||||
-- see the data is updated
|
||||
-- (3) UPDATE only the parent (noop)
|
||||
UPDATE ONLY partitioning_test SET time = '2013-09-09' WHERE id = 7;
|
||||
|
||||
-- (4) DELETE from only the parent (noop)
|
||||
DELETE FROM ONLY partitioning_test WHERE id = 7;
|
||||
|
||||
-- see that only (1) and (2) had an effect
|
||||
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
|
||||
|
||||
-- UPDATE that tries to move a row to a non-existing partition (this should fail)
|
||||
|
@ -674,6 +685,50 @@ FROM
|
|||
GROUP BY types
|
||||
ORDER BY types;
|
||||
|
||||
-- subquery with UNIONs on partitioned table, but only scan (empty) parent for some
|
||||
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
|
||||
FROM
|
||||
(SELECT *, random()
|
||||
FROM
|
||||
(SELECT
|
||||
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
|
||||
FROM
|
||||
(SELECT
|
||||
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
|
||||
FROM(
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 0 AS event
|
||||
FROM
|
||||
partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (1, 2) )
|
||||
UNION
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 1 AS event
|
||||
FROM
|
||||
partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (3, 4) )
|
||||
UNION
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 2 AS event
|
||||
FROM
|
||||
ONLY partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (5, 6) )
|
||||
UNION
|
||||
(SELECT
|
||||
"events"."user_id", "events"."time", 3 AS event
|
||||
FROM
|
||||
ONLY partitioned_events_table as "events"
|
||||
WHERE
|
||||
event_type IN (1, 6))) t1
|
||||
GROUP BY "t1"."user_id") AS t) "q"
|
||||
) AS final_query
|
||||
GROUP BY types
|
||||
ORDER BY types;
|
||||
|
||||
|
||||
-- UNION and JOIN on both partitioned and regular tables
|
||||
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
|
||||
FROM
|
||||
|
|
Loading…
Reference in New Issue