Merge pull request #4865 from citusdata/marcocitus/fix-from-only

Fix FROM ONLY queries on partitioned tables
pull/4717/head^2
Marco Slot 2021-04-28 14:17:12 +02:00 committed by GitHub
commit 6a050ab6b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 177 additions and 34 deletions

View File

@ -7057,7 +7057,8 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
/* use schema and table name from the remote alias */ /* use schema and table name from the remote alias */
appendStringInfoString(buf, appendStringInfo(buf, "%s%s",
only_marker(rte),
generate_fragment_name(fragmentSchemaName, generate_fragment_name(fragmentSchemaName,
fragmentTableName)); fragmentTableName));
break; break;

View File

@ -7115,7 +7115,8 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
/* use schema and table name from the remote alias */ /* use schema and table name from the remote alias */
appendStringInfoString(buf, appendStringInfo(buf, "%s%s",
only_marker(rte),
generate_fragment_name(fragmentSchemaName, generate_fragment_name(fragmentSchemaName,
fragmentTableName)); fragmentTableName));
break; break;

View File

@ -164,31 +164,30 @@ distributed_planner(Query *parse,
.boundParams = boundParams, .boundParams = boundParams,
}; };
if (fastPathRouterQuery) if (needsDistributedPlanning)
{
/*
* We need to copy the parse tree because the FastPathPlanner modifies
* it. In the next branch we do the same for other distributed queries
* too, but for those it needs to be done AFTER calling
* AssignRTEIdentities.
*/
planContext.originalQuery = copyObject(parse);
}
else if (needsDistributedPlanning)
{ {
/* /*
* standard_planner scribbles on it's input, but for deparsing we need the * standard_planner scribbles on it's input, but for deparsing we need the
* unmodified form. Note that before copying we call * unmodified form. Before copying we call AssignRTEIdentities to be able
* AssignRTEIdentities, which is needed because these identities need * to match RTEs in the rewritten query tree with those in the original
* to be present in the copied query too. * tree.
*/ */
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
planContext.originalQuery = copyObject(parse); planContext.originalQuery = copyObject(parse);
/*
* When there are partitioned tables (not applicable to fast path),
* pretend that they are regular tables to avoid unnecessary work
* in standard_planner.
*/
if (!fastPathRouterQuery)
{
bool setPartitionedTablesInherited = false; bool setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList, AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited); setPartitionedTablesInherited);
} }
}
/* /*
* Make sure that we hide shard names on the Citus MX worker nodes. See comments in * Make sure that we hide shard names on the Citus MX worker nodes. See comments in
@ -446,7 +445,7 @@ AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
{ {
Assert(rangeTableEntry->rtekind == RTE_RELATION); Assert(rangeTableEntry->rtekind == RTE_RELATION);
rangeTableEntry->values_lists = list_make1_int(rteIdentifier); rangeTableEntry->values_lists = list_make2_int(rteIdentifier, rangeTableEntry->inh);
} }
@ -457,12 +456,24 @@ GetRTEIdentity(RangeTblEntry *rte)
Assert(rte->rtekind == RTE_RELATION); Assert(rte->rtekind == RTE_RELATION);
Assert(rte->values_lists != NIL); Assert(rte->values_lists != NIL);
Assert(IsA(rte->values_lists, IntList)); Assert(IsA(rte->values_lists, IntList));
Assert(list_length(rte->values_lists) == 1); Assert(list_length(rte->values_lists) == 2);
return linitial_int(rte->values_lists); return linitial_int(rte->values_lists);
} }
/*
* GetOriginalInh gets the original value of the inheritance flag set by
* AssignRTEIdentity. The planner resets this flag in the rewritten query,
* but we need it during deparsing.
*/
bool
GetOriginalInh(RangeTblEntry *rte)
{
return lsecond_int(rte->values_lists);
}
/* /*
* GetQueryLockMode returns the necessary lock mode to be acquired for the * GetQueryLockMode returns the necessary lock mode to be acquired for the
* given query. (See comment written in RangeTblEntry->rellockmode) * given query. (See comment written in RangeTblEntry->rellockmode)

View File

@ -1530,6 +1530,7 @@ MultiTableNodeList(List *tableEntryList, List *rangeTableList)
tableNode->partitionColumn = partitionColumn; tableNode->partitionColumn = partitionColumn;
tableNode->alias = rangeTableEntry->alias; tableNode->alias = rangeTableEntry->alias;
tableNode->referenceNames = rangeTableEntry->eref; tableNode->referenceNames = rangeTableEntry->eref;
tableNode->includePartitions = GetOriginalInh(rangeTableEntry);
tableNodeList = lappend(tableNodeList, tableNode); tableNodeList = lappend(tableNodeList, tableNode);
} }

View File

@ -45,6 +45,7 @@
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
@ -738,6 +739,8 @@ BaseRangeTableList(MultiNode *multiNode)
rangeTableEntry->eref = multiTable->referenceNames; rangeTableEntry->eref = multiTable->referenceNames;
rangeTableEntry->alias = multiTable->alias; rangeTableEntry->alias = multiTable->alias;
rangeTableEntry->relid = multiTable->relationId; rangeTableEntry->relid = multiTable->relationId;
rangeTableEntry->inh = multiTable->includePartitions;
SetRangeTblExtraData(rangeTableEntry, CITUS_RTE_RELATION, NULL, NULL, SetRangeTblExtraData(rangeTableEntry, CITUS_RTE_RELATION, NULL, NULL,
list_make1_int(multiTable->rangeTableId), list_make1_int(multiTable->rangeTableId),
NIL, NIL, NIL, NIL); NIL, NIL, NIL, NIL);
@ -1463,6 +1466,7 @@ ConstructCallingRTE(RangeTblEntry *rangeTableEntry, List *dependentJobList)
callingRTE->rtekind = RTE_RELATION; callingRTE->rtekind = RTE_RELATION;
callingRTE->eref = rangeTableEntry->eref; callingRTE->eref = rangeTableEntry->eref;
callingRTE->relid = rangeTableEntry->relid; callingRTE->relid = rangeTableEntry->relid;
callingRTE->inh = rangeTableEntry->inh;
} }
else if (rangeTableKind == CITUS_RTE_REMOTE_QUERY) else if (rangeTableKind == CITUS_RTE_REMOTE_QUERY)
{ {

View File

@ -237,6 +237,7 @@ extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern int GetRTEIdentity(RangeTblEntry *rte); extern int GetRTEIdentity(RangeTblEntry *rte);
extern bool GetOriginalInh(RangeTblEntry *rte);
extern LOCKMODE GetQueryLockMode(Query *query); extern LOCKMODE GetQueryLockMode(Query *query);
extern int32 BlessRecordExpression(Expr *expr); extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);

View File

@ -76,7 +76,7 @@ typedef struct MultiTreeRoot
/* /*
* MultiTable represents a partitioned table in a logical query plan. Note that * MultiTable represents a distributed table in a logical query plan. Note that
* this node does not represent a query operator, and differs from the nodes * this node does not represent a query operator, and differs from the nodes
* that follow in that sense. * that follow in that sense.
*/ */
@ -89,6 +89,7 @@ typedef struct MultiTable
Alias *alias; Alias *alias;
Alias *referenceNames; Alias *referenceNames;
Query *subquery; /* this field is only valid for non-relation subquery types */ Query *subquery; /* this field is only valid for non-relation subquery types */
bool includePartitions;
} MultiTable; } MultiTable;

View File

@ -84,9 +84,9 @@ NOTICE: renaming the new table to collation_tests.test_collate_pushed_down_aggr
SET citus.log_remote_commands TO true; SET citus.log_remote_commands TO true;
SELECT ALL MIN((lower(CAST(test_collate_pushed_down_aggregate.a AS VARCHAR)) COLLATE "C")) SELECT ALL MIN((lower(CAST(test_collate_pushed_down_aggregate.a AS VARCHAR)) COLLATE "C"))
FROM ONLY test_collate_pushed_down_aggregate; FROM ONLY test_collate_pushed_down_aggregate;
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
min min
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -74,6 +74,22 @@ SELECT * FROM partitioning_hash_test ORDER BY 1;
4 | 4 4 | 4
(4 rows) (4 rows)
-- should not return results when only querying parent
SELECT * FROM ONLY partitioning_test ORDER BY 1;
id | time
---------------------------------------------------------------------
(0 rows)
SELECT * FROM ONLY partitioning_test WHERE id = 1;
id | time
---------------------------------------------------------------------
(0 rows)
SELECT * FROM ONLY partitioning_test a JOIN partitioning_hash_test b ON (a.id = b.subid) ORDER BY 1;
id | time | id | subid
---------------------------------------------------------------------
(0 rows)
-- see partitioned table and its partitions are distributed -- see partitioned table and its partitions are distributed
SELECT SELECT
logicalrelid logicalrelid
@ -378,11 +394,15 @@ SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-0
(12 rows) (12 rows)
-- test UPDATE -- test UPDATE
-- UPDATE partitioned table -- (1) UPDATE partitioned table
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7; UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
-- UPDATE partition directly -- (2) UPDATE partition directly
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8; UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
-- see the data is updated -- (3) UPDATE only the parent (noop)
UPDATE ONLY partitioning_test SET time = '2013-09-09' WHERE id = 7;
-- (4) DELETE from only the parent (noop)
DELETE FROM ONLY partitioning_test WHERE id = 7;
-- see that only (1) and (2) had an effect
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1; SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
id | time id | time
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1061,6 +1081,54 @@ ORDER BY types;
3 | 25 3 | 25
(4 rows) (4 rows)
-- subquery with UNIONs on partitioned table, but only scan (empty) parent for some
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT *, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM(
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (1, 2) )
UNION
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (3, 4) )
UNION
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
ONLY partitioned_events_table as "events"
WHERE
event_type IN (5, 6) )
UNION
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
ONLY partitioned_events_table as "events"
WHERE
event_type IN (1, 6))) t1
GROUP BY "t1"."user_id") AS t) "q"
) AS final_query
GROUP BY types
ORDER BY types;
types | sumofeventtype
---------------------------------------------------------------------
0 | 43
1 | 44
(2 rows)
-- UNION and JOIN on both partitioned and regular tables -- UNION and JOIN on both partitioned and regular tables
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM FROM

View File

@ -45,6 +45,11 @@ SELECT * FROM partitioning_test ORDER BY 1;
SELECT * FROM partitioning_hash_test ORDER BY 1; SELECT * FROM partitioning_hash_test ORDER BY 1;
-- should not return results when only querying parent
SELECT * FROM ONLY partitioning_test ORDER BY 1;
SELECT * FROM ONLY partitioning_test WHERE id = 1;
SELECT * FROM ONLY partitioning_test a JOIN partitioning_hash_test b ON (a.id = b.subid) ORDER BY 1;
-- see partitioned table and its partitions are distributed -- see partitioned table and its partitions are distributed
SELECT SELECT
logicalrelid logicalrelid
@ -236,13 +241,19 @@ INSERT INTO partitioning_test_2012 SELECT * FROM partitioning_test WHERE time >=
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-01' ORDER BY 1; SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-01' ORDER BY 1;
-- test UPDATE -- test UPDATE
-- UPDATE partitioned table -- (1) UPDATE partitioned table
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7; UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
-- UPDATE partition directly -- (2) UPDATE partition directly
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8; UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
-- see the data is updated -- (3) UPDATE only the parent (noop)
UPDATE ONLY partitioning_test SET time = '2013-09-09' WHERE id = 7;
-- (4) DELETE from only the parent (noop)
DELETE FROM ONLY partitioning_test WHERE id = 7;
-- see that only (1) and (2) had an effect
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1; SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
-- UPDATE that tries to move a row to a non-existing partition (this should fail) -- UPDATE that tries to move a row to a non-existing partition (this should fail)
@ -674,6 +685,50 @@ FROM
GROUP BY types GROUP BY types
ORDER BY types; ORDER BY types;
-- subquery with UNIONs on partitioned table, but only scan (empty) parent for some
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT *, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM(
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (1, 2) )
UNION
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (3, 4) )
UNION
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
ONLY partitioned_events_table as "events"
WHERE
event_type IN (5, 6) )
UNION
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
ONLY partitioned_events_table as "events"
WHERE
event_type IN (1, 6))) t1
GROUP BY "t1"."user_id") AS t) "q"
) AS final_query
GROUP BY types
ORDER BY types;
-- UNION and JOIN on both partitioned and regular tables -- UNION and JOIN on both partitioned and regular tables
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM FROM